1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
14 // Constants for standard AMQP 0-9-1 exchange types.
16 ExchangeDirect = "direct"
17 ExchangeFanout = "fanout"
18 ExchangeTopic = "topic"
19 ExchangeHeaders = "headers"
23 // ErrClosed is returned when the channel or connection is not open
24 ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}
26 // ErrChannelMax is returned when Connection.Channel has been called enough
27 // times that all channel IDs have been exhausted in the client or the
29 ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}
31 // ErrSASL is returned from Dial when the authentication mechanism could not
33 ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}
35 // ErrCredentials is returned when the authenticated client is not authorized
37 ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}
39 // ErrVhost is returned when the authenticated user is not permitted to
40 // access the requested Vhost.
41 ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}
// ErrSyntax is a hard protocol error, indicating an unsupported protocol,
// implementation or encoding.
45 ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}
47 // ErrFrame is returned when the protocol frame cannot be read from the
48 // server, indicating an unsupported protocol or unsupported frame type.
49 ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}
51 // ErrCommandInvalid is returned when the server sends an unexpected response
52 // to this requested message type. This indicates a bug in this client.
53 ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}
55 // ErrUnexpectedFrame is returned when something other than a method or
56 // heartbeat frame is delivered to the Connection, indicating a bug in the
58 ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}
60 // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
61 ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
64 // Error captures the code and reason a channel or connection has been closed
67 Code int // constant code from the specification
68 Reason string // description of the error
69 Server bool // true when initiated from the server, false when from this library
70 Recover bool // true when this error can be recovered by retrying later or with different parameters
73 func newError(code uint16, text string) *Error {
77 Recover: isSoftExceptionCode(int(code)),
82 func (e Error) Error() string {
83 return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
86 // Used by header frames to capture routing and header information
87 type properties struct {
88 ContentType string // MIME content type
89 ContentEncoding string // MIME content encoding
90 Headers Table // Application or header exchange table
91 DeliveryMode uint8 // queue implementation use - Transient (1) or Persistent (2)
92 Priority uint8 // queue implementation use - 0 to 9
93 CorrelationId string // application use - correlation identifier
94 ReplyTo string // application use - address to to reply to (ex: RPC)
95 Expiration string // implementation use - message expiration spec
96 MessageId string // application use - message identifier
97 Timestamp time.Time // application use - message timestamp
98 Type string // application use - message type name
99 UserId string // application use - creating user id
100 AppId string // application use - creating application
101 reserved1 string // was cluster-id - process for buffer consumption
104 // DeliveryMode. Transient means higher throughput but messages will not be
105 // restored on broker restart. The delivery mode of publishings is unrelated
106 // to the durability of the queues they reside on. Transient messages will
107 // not be restored to durable queues, persistent messages will be restored to
108 // durable queues and lost on non-durable queues during server restart.
110 // This remains typed as uint8 to match Publishing.DeliveryMode. Other
111 // delivery modes specific to custom queue implementations are not enumerated
// The property flags are an array of bits that indicate the presence or
// absence of each property value in sequence. The bits are ordered from most
// significant to least significant - bit 15 indicates the first property.
122 flagContentType = 0x8000
123 flagContentEncoding = 0x4000
125 flagDeliveryMode = 0x1000
126 flagPriority = 0x0800
127 flagCorrelationId = 0x0400
129 flagExpiration = 0x0100
130 flagMessageId = 0x0080
131 flagTimestamp = 0x0040
135 flagReserved1 = 0x0004
138 // Queue captures the current server state of the queue on the server returned
139 // from Channel.QueueDeclare or Channel.QueueInspect.
141 Name string // server confirmed or generated name
142 Messages int // count of messages not awaiting acknowledgment
143 Consumers int // number of consumers receiving deliveries
146 // Publishing captures the client message sent to the server. The fields
147 // outside of the Headers table included in this struct mirror the underlying
148 // fields in the content frame. They use native types for convenience and
150 type Publishing struct {
151 // Application or exchange specific fields,
152 // the headers exchange will inspect this field.
156 ContentType string // MIME content type
157 ContentEncoding string // MIME content encoding
158 DeliveryMode uint8 // Transient (0 or 1) or Persistent (2)
159 Priority uint8 // 0 to 9
160 CorrelationId string // correlation identifier
ReplyTo string // address to reply to (ex: RPC)
162 Expiration string // message expiration spec
163 MessageId string // message identifier
164 Timestamp time.Time // message timestamp
165 Type string // message type name
166 UserId string // creating user id - ex: "guest"
167 AppId string // creating application id
169 // The application specific payload of the message
// Blocking notifies the server's TCP flow control of the Connection. A server
// that hits a memory or disk alarm will block all connections until the
// resources are reclaimed. Use NotifyBlock on the Connection to receive these
// notifications.
type Blocking struct {
	// TCP pushback active/inactive on server
	Active bool
	// Server reason for activation
	Reason string
}
// Confirmation notifies the acknowledgment or negative acknowledgment of a
// publishing identified by its delivery tag. Use NotifyPublish on the Channel
// to consume these events.
type Confirmation struct {
	// A 1 based counter of publishings from when the channel was put in Confirm mode
	DeliveryTag uint64
	// True when the server successfully received the publishing
	Ack bool
}
// Decimal matches the AMQP decimal type. Scale is the number of decimal
// digits, e.g. Scale == 2, Value == 12345, Decimal == 123.45
192 type Decimal struct {
197 // Table stores user supplied fields of the following types:
213 // []interface{} - containing above types
215 // Functions taking a table will immediately fail when the table contains a
216 // value of an unsupported type.
218 // The caller must be specific in which precision of integer it wishes to
221 // Use a type assertion when reading values from a table for type conversion.
223 // RabbitMQ expects int32 for integer values.
225 type Table map[string]interface{}
227 func validateField(f interface{}) error {
228 switch fv := f.(type) {
229 case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
233 for _, v := range fv {
234 if err := validateField(v); err != nil {
235 return fmt.Errorf("in array %s", err)
241 for k, v := range fv {
242 if err := validateField(v); err != nil {
243 return fmt.Errorf("table field %q %s", k, err)
249 return fmt.Errorf("value %t not supported", f)
252 // Validate returns and error if any Go types in the table are incompatible with AMQP types.
253 func (t Table) Validate() error {
254 return validateField(t)
257 // Heap interface for maintaining delivery tags
260 func (set tagSet) Len() int { return len(set) }
261 func (set tagSet) Less(i, j int) bool { return (set)[i] < (set)[j] }
262 func (set tagSet) Swap(i, j int) { (set)[i], (set)[j] = (set)[j], (set)[i] }
263 func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) }
264 func (set *tagSet) Pop() interface{} {
265 val := (*set)[len(*set)-1]
266 *set = (*set)[:len(*set)-1]
270 type message interface {
271 id() (uint16, uint16)
273 read(io.Reader) error
274 write(io.Writer) error
277 type messageWithContent interface {
279 getContent() (properties, []byte)
280 setContent(properties, []byte)
284 The base interface implemented as:
288 All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects
291 0 1 3 7 size+7 size+8
292 +------+---------+-------------+ +------------+ +-----------+
293 | type | channel | size | | payload | | frame-end |
294 +------+---------+-------------+ +------------+ +-----------+
295 octet short long size octets octet
299 1. Read the header and check the frame type and channel.
300 2. Depending on the frame type, we read the payload and process it.
301 3. Read the frame end octet.
303 In realistic implementations where performance is a concern, we would use
304 “read-ahead buffering” or “gathering reads” to avoid doing three separate
305 system calls to read a frame.
308 type frame interface {
309 write(io.Writer) error
// protocolHeader implements the frame interface for Connection RPC.
type protocolHeader struct{}

// write sends the eight octets 'A' 'M' 'Q' 'P' 0 0 9 1 to w in a single Write
// call and returns that call's error.
func (protocolHeader) write(w io.Writer) error {
	preamble := [...]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1}
	_, err := w.Write(preamble[:])
	return err
}

// channel always panics: the protocol header is only valid as the initial
// handshake.
func (protocolHeader) channel() uint16 {
	panic("only valid as initial handshake")
}
334 Method frames carry the high-level protocol commands (which we call "methods").
335 One method frame carries one command. The method frame payload has this format:
338 +----------+-----------+-------------- - -
339 | class-id | method-id | arguments...
340 +----------+-----------+-------------- - -
343 To process a method frame, we:
344 1. Read the method frame payload.
345 2. Unpack it into a structure. A given method always has the same structure,
346 so we can unpack the method rapidly. 3. Check that the method is allowed in
348 4. Check that the method arguments are valid.
349 5. Execute the method.
351 Method frame bodies are constructed as a list of AMQP data fields (bits,
352 integers, strings and string tables). The marshalling code is trivially
353 generated directly from the protocol specifications, and can be very rapid.
355 type methodFrame struct {
362 func (f *methodFrame) channel() uint16 { return f.ChannelId }
365 Heartbeating is a technique designed to undo one of TCP/IP's features, namely
366 its ability to recover from a broken physical connection by closing only after
367 a quite long time-out. In some scenarios we need to know very rapidly if a
368 peer is disconnected or not responding for other reasons (e.g. it is looping).
369 Since heartbeating can be done at a low level, we implement this as a special
370 type of frame that peers exchange at the transport level, rather than as a
373 type heartbeatFrame struct {
377 func (f *heartbeatFrame) channel() uint16 { return f.ChannelId }
380 Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally
381 defined as carrying content. When a peer sends such a method frame, it always
382 follows it with a content header and zero or more content body frames.
384 A content header frame has this format:
387 +----------+--------+-----------+----------------+------------- - -
388 | class-id | weight | body size | property flags | property list...
389 +----------+--------+-----------+----------------+------------- - -
390 short short long long short remainder...
392 We place content body in distinct frames (rather than including it in the
393 method) so that AMQP may support "zero copy" techniques in which content is
394 never marshalled or encoded. We place the content properties in their own
395 frame so that recipients can selectively discard contents they do not want to
398 type headerFrame struct {
403 Properties properties
406 func (f *headerFrame) channel() uint16 { return f.ChannelId }
409 Content is the application data we carry from client-to-client via the AMQP
410 server. Content is, roughly speaking, a set of properties plus a binary data
411 part. The set of allowed properties are defined by the Basic class, and these
412 form the "content header frame". The data can be any size, and MAY be broken
413 into several (or many) chunks, each forming a "content body frame".
415 Looking at the frames for a specific channel, as they pass on the wire, we
416 might see something like this:
419 [method] [header] [body] [body]
423 type bodyFrame struct {
428 func (f *bodyFrame) channel() uint16 { return f.ChannelId }