barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / streadway / amqp / types.go
1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
5
6 package amqp
7
8 import (
9         "fmt"
10         "io"
11         "time"
12 )
13
14 // Constants for standard AMQP 0-9-1 exchange types.
15 const (
16         ExchangeDirect  = "direct"
17         ExchangeFanout  = "fanout"
18         ExchangeTopic   = "topic"
19         ExchangeHeaders = "headers"
20 )
21
22 var (
23         // ErrClosed is returned when the channel or connection is not open
24         ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}
25
26         // ErrChannelMax is returned when Connection.Channel has been called enough
27         // times that all channel IDs have been exhausted in the client or the
28         // server.
29         ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}
30
31         // ErrSASL is returned from Dial when the authentication mechanism could not
32         // be negoated.
33         ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}
34
35         // ErrCredentials is returned when the authenticated client is not authorized
36         // to any vhost.
37         ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}
38
39         // ErrVhost is returned when the authenticated user is not permitted to
40         // access the requested Vhost.
41         ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}
42
43         // ErrSyntax is hard protocol error, indicating an unsupported protocol,
44         // implementation or encoding.
45         ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}
46
47         // ErrFrame is returned when the protocol frame cannot be read from the
48         // server, indicating an unsupported protocol or unsupported frame type.
49         ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}
50
51         // ErrCommandInvalid is returned when the server sends an unexpected response
52         // to this requested message type. This indicates a bug in this client.
53         ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}
54
55         // ErrUnexpectedFrame is returned when something other than a method or
56         // heartbeat frame is delivered to the Connection, indicating a bug in the
57         // client.
58         ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}
59
60         // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
61         ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
62 )
63
64 // Error captures the code and reason a channel or connection has been closed
65 // by the server.
66 type Error struct {
67         Code    int    // constant code from the specification
68         Reason  string // description of the error
69         Server  bool   // true when initiated from the server, false when from this library
70         Recover bool   // true when this error can be recovered by retrying later or with different parameters
71 }
72
73 func newError(code uint16, text string) *Error {
74         return &Error{
75                 Code:    int(code),
76                 Reason:  text,
77                 Recover: isSoftExceptionCode(int(code)),
78                 Server:  true,
79         }
80 }
81
82 func (e Error) Error() string {
83         return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
84 }
85
86 // Used by header frames to capture routing and header information
87 type properties struct {
88         ContentType     string    // MIME content type
89         ContentEncoding string    // MIME content encoding
90         Headers         Table     // Application or header exchange table
91         DeliveryMode    uint8     // queue implementation use - Transient (1) or Persistent (2)
92         Priority        uint8     // queue implementation use - 0 to 9
93         CorrelationId   string    // application use - correlation identifier
94         ReplyTo         string    // application use - address to to reply to (ex: RPC)
95         Expiration      string    // implementation use - message expiration spec
96         MessageId       string    // application use - message identifier
97         Timestamp       time.Time // application use - message timestamp
98         Type            string    // application use - message type name
99         UserId          string    // application use - creating user id
100         AppId           string    // application use - creating application
101         reserved1       string    // was cluster-id - process for buffer consumption
102 }
103
104 // DeliveryMode.  Transient means higher throughput but messages will not be
105 // restored on broker restart.  The delivery mode of publishings is unrelated
106 // to the durability of the queues they reside on.  Transient messages will
107 // not be restored to durable queues, persistent messages will be restored to
108 // durable queues and lost on non-durable queues during server restart.
109 //
110 // This remains typed as uint8 to match Publishing.DeliveryMode.  Other
111 // delivery modes specific to custom queue implementations are not enumerated
112 // here.
113 const (
114         Transient  uint8 = 1
115         Persistent uint8 = 2
116 )
117
118 // The property flags are an array of bits that indicate the presence or
119 // absence of each property value in sequence.  The bits are ordered from most
120 // high to low - bit 15 indicates the first property.
121 const (
122         flagContentType     = 0x8000
123         flagContentEncoding = 0x4000
124         flagHeaders         = 0x2000
125         flagDeliveryMode    = 0x1000
126         flagPriority        = 0x0800
127         flagCorrelationId   = 0x0400
128         flagReplyTo         = 0x0200
129         flagExpiration      = 0x0100
130         flagMessageId       = 0x0080
131         flagTimestamp       = 0x0040
132         flagType            = 0x0020
133         flagUserId          = 0x0010
134         flagAppId           = 0x0008
135         flagReserved1       = 0x0004
136 )
137
138 // Queue captures the current server state of the queue on the server returned
139 // from Channel.QueueDeclare or Channel.QueueInspect.
140 type Queue struct {
141         Name      string // server confirmed or generated name
142         Messages  int    // count of messages not awaiting acknowledgment
143         Consumers int    // number of consumers receiving deliveries
144 }
145
146 // Publishing captures the client message sent to the server.  The fields
147 // outside of the Headers table included in this struct mirror the underlying
148 // fields in the content frame.  They use native types for convenience and
149 // efficiency.
150 type Publishing struct {
151         // Application or exchange specific fields,
152         // the headers exchange will inspect this field.
153         Headers Table
154
155         // Properties
156         ContentType     string    // MIME content type
157         ContentEncoding string    // MIME content encoding
158         DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
159         Priority        uint8     // 0 to 9
160         CorrelationId   string    // correlation identifier
161         ReplyTo         string    // address to to reply to (ex: RPC)
162         Expiration      string    // message expiration spec
163         MessageId       string    // message identifier
164         Timestamp       time.Time // message timestamp
165         Type            string    // message type name
166         UserId          string    // creating user id - ex: "guest"
167         AppId           string    // creating application id
168
169         // The application specific payload of the message
170         Body []byte
171 }
172
173 // Blocking notifies the server's TCP flow control of the Connection.  When a
174 // server hits a memory or disk alarm it will block all connections until the
175 // resources are reclaimed.  Use NotifyBlock on the Connection to receive these
176 // events.
177 type Blocking struct {
178         Active bool   // TCP pushback active/inactive on server
179         Reason string // Server reason for activation
180 }
181
182 // Confirmation notifies the acknowledgment or negative acknowledgement of a
183 // publishing identified by its delivery tag.  Use NotifyPublish on the Channel
184 // to consume these events.
185 type Confirmation struct {
186         DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode
187         Ack         bool   // True when the server successfully received the publishing
188 }
189
190 // Decimal matches the AMQP decimal type.  Scale is the number of decimal
191 // digits Scale == 2, Value == 12345, Decimal == 123.45
192 type Decimal struct {
193         Scale uint8
194         Value int32
195 }
196
197 // Table stores user supplied fields of the following types:
198 //
199 //   bool
200 //   byte
201 //   float32
202 //   float64
203 //   int
204 //   int16
205 //   int32
206 //   int64
207 //   nil
208 //   string
209 //   time.Time
210 //   amqp.Decimal
211 //   amqp.Table
212 //   []byte
213 //   []interface{} - containing above types
214 //
215 // Functions taking a table will immediately fail when the table contains a
216 // value of an unsupported type.
217 //
218 // The caller must be specific in which precision of integer it wishes to
219 // encode.
220 //
221 // Use a type assertion when reading values from a table for type conversion.
222 //
223 // RabbitMQ expects int32 for integer values.
224 //
225 type Table map[string]interface{}
226
227 func validateField(f interface{}) error {
228         switch fv := f.(type) {
229         case nil, bool, byte, int, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
230                 return nil
231
232         case []interface{}:
233                 for _, v := range fv {
234                         if err := validateField(v); err != nil {
235                                 return fmt.Errorf("in array %s", err)
236                         }
237                 }
238                 return nil
239
240         case Table:
241                 for k, v := range fv {
242                         if err := validateField(v); err != nil {
243                                 return fmt.Errorf("table field %q %s", k, err)
244                         }
245                 }
246                 return nil
247         }
248
249         return fmt.Errorf("value %t not supported", f)
250 }
251
252 // Validate returns and error if any Go types in the table are incompatible with AMQP types.
253 func (t Table) Validate() error {
254         return validateField(t)
255 }
256
257 // Heap interface for maintaining delivery tags
258 type tagSet []uint64
259
260 func (set tagSet) Len() int              { return len(set) }
261 func (set tagSet) Less(i, j int) bool    { return (set)[i] < (set)[j] }
262 func (set tagSet) Swap(i, j int)         { (set)[i], (set)[j] = (set)[j], (set)[i] }
263 func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) }
264 func (set *tagSet) Pop() interface{} {
265         val := (*set)[len(*set)-1]
266         *set = (*set)[:len(*set)-1]
267         return val
268 }
269
270 type message interface {
271         id() (uint16, uint16)
272         wait() bool
273         read(io.Reader) error
274         write(io.Writer) error
275 }
276
277 type messageWithContent interface {
278         message
279         getContent() (properties, []byte)
280         setContent(properties, []byte)
281 }
282
283 /*
284 The base interface implemented as:
285
286 2.3.5  frame Details
287
288 All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects
289 malformed frames:
290
291   0      1         3             7                  size+7 size+8
292   +------+---------+-------------+  +------------+  +-----------+
293   | type | channel |     size    |  |  payload   |  | frame-end |
294   +------+---------+-------------+  +------------+  +-----------+
295    octet   short         long         size octets       octet
296
297 To read a frame, we:
298
299  1. Read the header and check the frame type and channel.
300  2. Depending on the frame type, we read the payload and process it.
301  3. Read the frame end octet.
302
303 In realistic implementations where performance is a concern, we would use
304 “read-ahead buffering” or “gathering reads” to avoid doing three separate
305 system calls to read a frame.
306
307 */
308 type frame interface {
309         write(io.Writer) error
310         channel() uint16
311 }
312
313 type reader struct {
314         r io.Reader
315 }
316
317 type writer struct {
318         w io.Writer
319 }
320
321 // Implements the frame interface for Connection RPC
322 type protocolHeader struct{}
323
324 func (protocolHeader) write(w io.Writer) error {
325         _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
326         return err
327 }
328
329 func (protocolHeader) channel() uint16 {
330         panic("only valid as initial handshake")
331 }
332
333 /*
334 Method frames carry the high-level protocol commands (which we call "methods").
335 One method frame carries one command.  The method frame payload has this format:
336
337   0          2           4
338   +----------+-----------+-------------- - -
339   | class-id | method-id | arguments...
340   +----------+-----------+-------------- - -
341      short      short    ...
342
343 To process a method frame, we:
344  1. Read the method frame payload.
345  2. Unpack it into a structure.  A given method always has the same structure,
346  so we can unpack the method rapidly.  3. Check that the method is allowed in
347  the current context.
348  4. Check that the method arguments are valid.
349  5. Execute the method.
350
351 Method frame bodies are constructed as a list of AMQP data fields (bits,
352 integers, strings and string tables).  The marshalling code is trivially
353 generated directly from the protocol specifications, and can be very rapid.
354 */
355 type methodFrame struct {
356         ChannelId uint16
357         ClassId   uint16
358         MethodId  uint16
359         Method    message
360 }
361
362 func (f *methodFrame) channel() uint16 { return f.ChannelId }
363
364 /*
365 Heartbeating is a technique designed to undo one of TCP/IP's features, namely
366 its ability to recover from a broken physical connection by closing only after
367 a quite long time-out.  In some scenarios we need to know very rapidly if a
368 peer is disconnected or not responding for other reasons (e.g. it is looping).
369 Since heartbeating can be done at a low level, we implement this as a special
370 type of frame that peers exchange at the transport level, rather than as a
371 class method.
372 */
373 type heartbeatFrame struct {
374         ChannelId uint16
375 }
376
377 func (f *heartbeatFrame) channel() uint16 { return f.ChannelId }
378
379 /*
380 Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally
381 defined as carrying content.  When a peer sends such a method frame, it always
382 follows it with a content header and zero or more content body frames.
383
384 A content header frame has this format:
385
386     0          2        4           12               14
387     +----------+--------+-----------+----------------+------------- - -
388     | class-id | weight | body size | property flags | property list...
389     +----------+--------+-----------+----------------+------------- - -
390       short     short    long long       short        remainder...
391
392 We place content body in distinct frames (rather than including it in the
393 method) so that AMQP may support "zero copy" techniques in which content is
394 never marshalled or encoded.  We place the content properties in their own
395 frame so that recipients can selectively discard contents they do not want to
396 process
397 */
398 type headerFrame struct {
399         ChannelId  uint16
400         ClassId    uint16
401         weight     uint16
402         Size       uint64
403         Properties properties
404 }
405
406 func (f *headerFrame) channel() uint16 { return f.ChannelId }
407
408 /*
409 Content is the application data we carry from client-to-client via the AMQP
410 server.  Content is, roughly speaking, a set of properties plus a binary data
411 part.  The set of allowed properties are defined by the Basic class, and these
412 form the "content header frame".  The data can be any size, and MAY be broken
413 into several (or many) chunks, each forming a "content body frame".
414
415 Looking at the frames for a specific channel, as they pass on the wire, we
416 might see something like this:
417
418                 [method]
419                 [method] [header] [body] [body]
420                 [method]
421                 ...
422 */
423 type bodyFrame struct {
424         ChannelId uint16
425         Body      []byte
426 }
427
428 func (f *bodyFrame) channel() uint16 { return f.ChannelId }