1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
13 var errDeliveryNotInitialized = errors.New("delivery not initialized")
15 // Acknowledger notifies the server of successful or failed consumption of
16 // delivieries via identifier found in the Delivery.DeliveryTag field.
18 // Applications can provide mock implementations in tests of Delivery handlers.
19 type Acknowledger interface {
20 Ack(tag uint64, multiple bool) error
21 Nack(tag uint64, multiple bool, requeue bool) error
22 Reject(tag uint64, requeue bool) error
25 // Delivery captures the fields for a previously delivered message resident in
26 // a queue to be delivered by the server to a consumer from Channel.Consume or
28 type Delivery struct {
29 Acknowledger Acknowledger // the channel from which this delivery arrived
31 Headers Table // Application or header exchange table
34 ContentType string // MIME content type
35 ContentEncoding string // MIME content encoding
36 DeliveryMode uint8 // queue implementation use - non-persistent (1) or persistent (2)
37 Priority uint8 // queue implementation use - 0 to 9
38 CorrelationId string // application use - correlation identifier
39 ReplyTo string // application use - address to reply to (ex: RPC)
40 Expiration string // implementation use - message expiration spec
41 MessageId string // application use - message identifier
42 Timestamp time.Time // application use - message timestamp
43 Type string // application use - message type name
44 UserId string // application use - creating user - should be authenticated user
45 AppId string // application use - creating application id
47 // Valid only with Channel.Consume
50 // Valid only with Channel.Get
55 Exchange string // basic.publish exchange
56 RoutingKey string // basic.publish routing key
61 func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
62 props, body := msg.getContent()
65 Acknowledger: channel,
67 Headers: props.Headers,
68 ContentType: props.ContentType,
69 ContentEncoding: props.ContentEncoding,
70 DeliveryMode: props.DeliveryMode,
71 Priority: props.Priority,
72 CorrelationId: props.CorrelationId,
73 ReplyTo: props.ReplyTo,
74 Expiration: props.Expiration,
75 MessageId: props.MessageId,
76 Timestamp: props.Timestamp,
84 // Properties for the delivery types
85 switch m := msg.(type) {
87 delivery.ConsumerTag = m.ConsumerTag
88 delivery.DeliveryTag = m.DeliveryTag
89 delivery.Redelivered = m.Redelivered
90 delivery.Exchange = m.Exchange
91 delivery.RoutingKey = m.RoutingKey
94 delivery.MessageCount = m.MessageCount
95 delivery.DeliveryTag = m.DeliveryTag
96 delivery.Redelivered = m.Redelivered
97 delivery.Exchange = m.Exchange
98 delivery.RoutingKey = m.RoutingKey
105 Ack delegates an acknowledgement through the Acknowledger interface that the
106 client or server has finished work on a delivery.
108 All deliveries in AMQP must be acknowledged. If you called Channel.Consume
109 with autoAck true then the server will be automatically ack each message and
110 this method should not be called. Otherwise, you must call Delivery.Ack after
111 you have successfully processed this delivery.
113 When multiple is true, this delivery and all prior unacknowledged deliveries
114 on the same channel will be acknowledged. This is useful for batch processing
117 An error will indicate that the acknowledge could not be delivered to the
118 channel it was sent from.
120 Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
121 delivery that is not automatically acknowledged.
123 func (d Delivery) Ack(multiple bool) error {
124 if d.Acknowledger == nil {
125 return errDeliveryNotInitialized
127 return d.Acknowledger.Ack(d.DeliveryTag, multiple)
131 Reject delegates a negatively acknowledgement through the Acknowledger interface.
133 When requeue is true, queue this message to be delivered to a consumer on a
134 different channel. When requeue is false or the server is unable to queue this
135 message, it will be dropped.
137 If you are batch processing deliveries, and your server supports it, prefer
140 Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
141 delivery that is not automatically acknowledged.
143 func (d Delivery) Reject(requeue bool) error {
144 if d.Acknowledger == nil {
145 return errDeliveryNotInitialized
147 return d.Acknowledger.Reject(d.DeliveryTag, requeue)
151 Nack negatively acknowledge the delivery of message(s) identified by the
152 delivery tag from either the client or server.
154 When multiple is true, nack messages up to and including delivered messages up
155 until the delivery tag delivered on the same channel.
157 When requeue is true, request the server to deliver this message to a different
158 consumer. If it is not possible or requeue is false, the message will be
159 dropped or delivered to a server configured dead-letter queue.
161 This method must not be used to select or requeue messages the client wishes
162 not to handle, rather it is to inform the server that the client is incapable
163 of handling this message at this time.
165 Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
166 delivery that is not automatically acknowledged.
168 func (d Delivery) Nack(multiple, requeue bool) error {
169 if d.Acknowledger == nil {
170 return errDeliveryNotInitialized
172 return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)