src: Add DMA localagent
[barometer.git] / src / dma / vendor / github.com / streadway / amqp / types.go
diff --git a/src/dma/vendor/github.com/streadway/amqp/types.go b/src/dma/vendor/github.com/streadway/amqp/types.go
new file mode 100644 (file)
index 0000000..ff5ea3c
--- /dev/null
@@ -0,0 +1,427 @@
+// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+// Source code and contact info at http://github.com/streadway/amqp
+
+package amqp
+
+import (
+       "fmt"
+       "io"
+       "time"
+)
+
+// Constants for standard AMQP 0-9-1 exchange types.
+const (
+       ExchangeDirect  = "direct"
+       ExchangeFanout  = "fanout"
+       ExchangeTopic   = "topic"
+       ExchangeHeaders = "headers"
+)
+
+var (
+       // ErrClosed is returned when the channel or connection is not open
+       ErrClosed = &Error{Code: ChannelError, Reason: "channel/connection is not open"}
+
+       // ErrChannelMax is returned when Connection.Channel has been called enough
+       // times that all channel IDs have been exhausted in the client or the
+       // server.
+       ErrChannelMax = &Error{Code: ChannelError, Reason: "channel id space exhausted"}
+
+       // ErrSASL is returned from Dial when the authentication mechanism could not
+       // be negoated.
+       ErrSASL = &Error{Code: AccessRefused, Reason: "SASL could not negotiate a shared mechanism"}
+
+       // ErrCredentials is returned when the authenticated client is not authorized
+       // to any vhost.
+       ErrCredentials = &Error{Code: AccessRefused, Reason: "username or password not allowed"}
+
+       // ErrVhost is returned when the authenticated user is not permitted to
+       // access the requested Vhost.
+       ErrVhost = &Error{Code: AccessRefused, Reason: "no access to this vhost"}
+
+       // ErrSyntax is hard protocol error, indicating an unsupported protocol,
+       // implementation or encoding.
+       ErrSyntax = &Error{Code: SyntaxError, Reason: "invalid field or value inside of a frame"}
+
+       // ErrFrame is returned when the protocol frame cannot be read from the
+       // server, indicating an unsupported protocol or unsupported frame type.
+       ErrFrame = &Error{Code: FrameError, Reason: "frame could not be parsed"}
+
+       // ErrCommandInvalid is returned when the server sends an unexpected response
+       // to this requested message type. This indicates a bug in this client.
+       ErrCommandInvalid = &Error{Code: CommandInvalid, Reason: "unexpected command received"}
+
+       // ErrUnexpectedFrame is returned when something other than a method or
+       // heartbeat frame is delivered to the Connection, indicating a bug in the
+       // client.
+       ErrUnexpectedFrame = &Error{Code: UnexpectedFrame, Reason: "unexpected frame received"}
+
+       // ErrFieldType is returned when writing a message containing a Go type unsupported by AMQP.
+       ErrFieldType = &Error{Code: SyntaxError, Reason: "unsupported table field type"}
+)
+
+// Error captures the code and reason a channel or connection has been closed
+// by the server.
+type Error struct {
+       Code    int    // constant code from the specification
+       Reason  string // description of the error
+       Server  bool   // true when initiated from the server, false when from this library
+       Recover bool   // true when this error can be recovered by retrying later or with different parameters
+}
+
+func newError(code uint16, text string) *Error {
+       return &Error{
+               Code:    int(code),
+               Reason:  text,
+               Recover: isSoftExceptionCode(int(code)),
+               Server:  true,
+       }
+}
+
+func (e Error) Error() string {
+       return fmt.Sprintf("Exception (%d) Reason: %q", e.Code, e.Reason)
+}
+
+// Used by header frames to capture routing and header information
+type properties struct {
+       ContentType     string    // MIME content type
+       ContentEncoding string    // MIME content encoding
+       Headers         Table     // Application or header exchange table
+       DeliveryMode    uint8     // queue implementation use - Transient (1) or Persistent (2)
+       Priority        uint8     // queue implementation use - 0 to 9
+       CorrelationId   string    // application use - correlation identifier
+       ReplyTo         string    // application use - address to to reply to (ex: RPC)
+       Expiration      string    // implementation use - message expiration spec
+       MessageId       string    // application use - message identifier
+       Timestamp       time.Time // application use - message timestamp
+       Type            string    // application use - message type name
+       UserId          string    // application use - creating user id
+       AppId           string    // application use - creating application
+       reserved1       string    // was cluster-id - process for buffer consumption
+}
+
+// DeliveryMode.  Transient means higher throughput but messages will not be
+// restored on broker restart.  The delivery mode of publishings is unrelated
+// to the durability of the queues they reside on.  Transient messages will
+// not be restored to durable queues, persistent messages will be restored to
+// durable queues and lost on non-durable queues during server restart.
+//
+// This remains typed as uint8 to match Publishing.DeliveryMode.  Other
+// delivery modes specific to custom queue implementations are not enumerated
+// here.
+const (
+       Transient  uint8 = 1
+       Persistent uint8 = 2
+)
+
+// The property flags are an array of bits that indicate the presence or
+// absence of each property value in sequence.  The bits are ordered from most
+// high to low - bit 15 indicates the first property.
+const (
+       flagContentType     = 0x8000
+       flagContentEncoding = 0x4000
+       flagHeaders         = 0x2000
+       flagDeliveryMode    = 0x1000
+       flagPriority        = 0x0800
+       flagCorrelationId   = 0x0400
+       flagReplyTo         = 0x0200
+       flagExpiration      = 0x0100
+       flagMessageId       = 0x0080
+       flagTimestamp       = 0x0040
+       flagType            = 0x0020
+       flagUserId          = 0x0010
+       flagAppId           = 0x0008
+       flagReserved1       = 0x0004
+)
+
+// Queue captures the current server state of the queue on the server returned
+// from Channel.QueueDeclare or Channel.QueueInspect.
+type Queue struct {
+       Name      string // server confirmed or generated name
+       Messages  int    // count of messages not awaiting acknowledgment
+       Consumers int    // number of consumers receiving deliveries
+}
+
+// Publishing captures the client message sent to the server.  The fields
+// outside of the Headers table included in this struct mirror the underlying
+// fields in the content frame.  They use native types for convenience and
+// efficiency.
+type Publishing struct {
+       // Application or exchange specific fields,
+       // the headers exchange will inspect this field.
+       Headers Table
+
+       // Properties
+       ContentType     string    // MIME content type
+       ContentEncoding string    // MIME content encoding
+       DeliveryMode    uint8     // Transient (0 or 1) or Persistent (2)
+       Priority        uint8     // 0 to 9
+       CorrelationId   string    // correlation identifier
+       ReplyTo         string    // address to to reply to (ex: RPC)
+       Expiration      string    // message expiration spec
+       MessageId       string    // message identifier
+       Timestamp       time.Time // message timestamp
+       Type            string    // message type name
+       UserId          string    // creating user id - ex: "guest"
+       AppId           string    // creating application id
+
+       // The application specific payload of the message
+       Body []byte
+}
+
+// Blocking notifies the server's TCP flow control of the Connection.  When a
+// server hits a memory or disk alarm it will block all connections until the
+// resources are reclaimed.  Use NotifyBlock on the Connection to receive these
+// events.
+type Blocking struct {
+       Active bool   // TCP pushback active/inactive on server
+       Reason string // Server reason for activation
+}
+
+// Confirmation notifies the acknowledgment or negative acknowledgement of a
+// publishing identified by its delivery tag.  Use NotifyPublish on the Channel
+// to consume these events.
+type Confirmation struct {
+       DeliveryTag uint64 // A 1 based counter of publishings from when the channel was put in Confirm mode
+       Ack         bool   // True when the server successfully received the publishing
+}
+
+// Decimal matches the AMQP decimal type.  Scale is the number of decimal
+// digits Scale == 2, Value == 12345, Decimal == 123.45
+type Decimal struct {
+       Scale uint8
+       Value int32
+}
+
+// Table stores user supplied fields of the following types:
+//
+//   bool
+//   byte
+//   float32
+//   float64
+//   int16
+//   int32
+//   int64
+//   nil
+//   string
+//   time.Time
+//   amqp.Decimal
+//   amqp.Table
+//   []byte
+//   []interface{} - containing above types
+//
+// Functions taking a table will immediately fail when the table contains a
+// value of an unsupported type.
+//
+// The caller must be specific in which precision of integer it wishes to
+// encode.
+//
+// Use a type assertion when reading values from a table for type conversion.
+//
+// RabbitMQ expects int32 for integer values.
+//
+type Table map[string]interface{}
+
+func validateField(f interface{}) error {
+       switch fv := f.(type) {
+       case nil, bool, byte, int16, int32, int64, float32, float64, string, []byte, Decimal, time.Time:
+               return nil
+
+       case []interface{}:
+               for _, v := range fv {
+                       if err := validateField(v); err != nil {
+                               return fmt.Errorf("in array %s", err)
+                       }
+               }
+               return nil
+
+       case Table:
+               for k, v := range fv {
+                       if err := validateField(v); err != nil {
+                               return fmt.Errorf("table field %q %s", k, err)
+                       }
+               }
+               return nil
+       }
+
+       return fmt.Errorf("value %t not supported", f)
+}
+
+// Validate returns and error if any Go types in the table are incompatible with AMQP types.
+func (t Table) Validate() error {
+       return validateField(t)
+}
+
+// Heap interface for maintaining delivery tags
+type tagSet []uint64
+
+func (set tagSet) Len() int              { return len(set) }
+func (set tagSet) Less(i, j int) bool    { return (set)[i] < (set)[j] }
+func (set tagSet) Swap(i, j int)         { (set)[i], (set)[j] = (set)[j], (set)[i] }
+func (set *tagSet) Push(tag interface{}) { *set = append(*set, tag.(uint64)) }
+func (set *tagSet) Pop() interface{} {
+       val := (*set)[len(*set)-1]
+       *set = (*set)[:len(*set)-1]
+       return val
+}
+
+type message interface {
+       id() (uint16, uint16)
+       wait() bool
+       read(io.Reader) error
+       write(io.Writer) error
+}
+
+type messageWithContent interface {
+       message
+       getContent() (properties, []byte)
+       setContent(properties, []byte)
+}
+
+/*
+The base interface implemented as:
+
+2.3.5  frame Details
+
+All frames consist of a header (7 octets), a payload of arbitrary size, and a 'frame-end' octet that detects
+malformed frames:
+
+  0      1         3             7                  size+7 size+8
+  +------+---------+-------------+  +------------+  +-----------+
+  | type | channel |     size    |  |  payload   |  | frame-end |
+  +------+---------+-------------+  +------------+  +-----------+
+   octet   short         long         size octets       octet
+
+To read a frame, we:
+
+ 1. Read the header and check the frame type and channel.
+ 2. Depending on the frame type, we read the payload and process it.
+ 3. Read the frame end octet.
+
+In realistic implementations where performance is a concern, we would use
+“read-ahead buffering” or “gathering reads” to avoid doing three separate
+system calls to read a frame.
+
+*/
+type frame interface {
+       write(io.Writer) error
+       channel() uint16
+}
+
+type reader struct {
+       r io.Reader
+}
+
+type writer struct {
+       w io.Writer
+}
+
+// Implements the frame interface for Connection RPC
+type protocolHeader struct{}
+
+func (protocolHeader) write(w io.Writer) error {
+       _, err := w.Write([]byte{'A', 'M', 'Q', 'P', 0, 0, 9, 1})
+       return err
+}
+
+func (protocolHeader) channel() uint16 {
+       panic("only valid as initial handshake")
+}
+
+/*
+Method frames carry the high-level protocol commands (which we call "methods").
+One method frame carries one command.  The method frame payload has this format:
+
+  0          2           4
+  +----------+-----------+-------------- - -
+  | class-id | method-id | arguments...
+  +----------+-----------+-------------- - -
+     short      short    ...
+
+To process a method frame, we:
+ 1. Read the method frame payload.
+ 2. Unpack it into a structure.  A given method always has the same structure,
+ so we can unpack the method rapidly.  3. Check that the method is allowed in
+ the current context.
+ 4. Check that the method arguments are valid.
+ 5. Execute the method.
+
+Method frame bodies are constructed as a list of AMQP data fields (bits,
+integers, strings and string tables).  The marshalling code is trivially
+generated directly from the protocol specifications, and can be very rapid.
+*/
+type methodFrame struct {
+       ChannelId uint16
+       ClassId   uint16
+       MethodId  uint16
+       Method    message
+}
+
+func (f *methodFrame) channel() uint16 { return f.ChannelId }
+
+/*
+Heartbeating is a technique designed to undo one of TCP/IP's features, namely
+its ability to recover from a broken physical connection by closing only after
+a quite long time-out.  In some scenarios we need to know very rapidly if a
+peer is disconnected or not responding for other reasons (e.g. it is looping).
+Since heartbeating can be done at a low level, we implement this as a special
+type of frame that peers exchange at the transport level, rather than as a
+class method.
+*/
+type heartbeatFrame struct {
+       ChannelId uint16
+}
+
+func (f *heartbeatFrame) channel() uint16 { return f.ChannelId }
+
+/*
+Certain methods (such as Basic.Publish, Basic.Deliver, etc.) are formally
+defined as carrying content.  When a peer sends such a method frame, it always
+follows it with a content header and zero or more content body frames.
+
+A content header frame has this format:
+
+    0          2        4           12               14
+    +----------+--------+-----------+----------------+------------- - -
+    | class-id | weight | body size | property flags | property list...
+    +----------+--------+-----------+----------------+------------- - -
+      short     short    long long       short        remainder...
+
+We place content body in distinct frames (rather than including it in the
+method) so that AMQP may support "zero copy" techniques in which content is
+never marshalled or encoded.  We place the content properties in their own
+frame so that recipients can selectively discard contents they do not want to
+process
+*/
+type headerFrame struct {
+       ChannelId  uint16
+       ClassId    uint16
+       weight     uint16
+       Size       uint64
+       Properties properties
+}
+
+func (f *headerFrame) channel() uint16 { return f.ChannelId }
+
+/*
+Content is the application data we carry from client-to-client via the AMQP
+server.  Content is, roughly speaking, a set of properties plus a binary data
+part.  The set of allowed properties are defined by the Basic class, and these
+form the "content header frame".  The data can be any size, and MAY be broken
+into several (or many) chunks, each forming a "content body frame".
+
+Looking at the frames for a specific channel, as they pass on the wire, we
+might see something like this:
+
+               [method]
+               [method] [header] [body] [body]
+               [method]
+               ...
+*/
+type bodyFrame struct {
+       ChannelId uint16
+       Body      []byte
+}
+
+func (f *bodyFrame) channel() uint16 { return f.ChannelId }