barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / streadway / amqp / channel.go
1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
5
6 package amqp
7
8 import (
9         "reflect"
10         "sync"
11         "sync/atomic"
12 )
13
14 // 0      1         3             7                  size+7 size+8
15 // +------+---------+-------------+  +------------+  +-----------+
16 // | type | channel |     size    |  |  payload   |  | frame-end |
17 // +------+---------+-------------+  +------------+  +-----------+
18 //  octet   short         long         size octets       octet
19 const frameHeaderSize = 1 + 2 + 4 + 1
20
21 /*
22 Channel represents an AMQP channel. Used as a context for valid message
23 exchange.  Errors on methods with this Channel as a receiver means this channel
24 should be discarded and a new channel established.
25
26 */
27 type Channel struct {
28         destructor sync.Once
29         m          sync.Mutex // struct field mutex
30         confirmM   sync.Mutex // publisher confirms state mutex
31         notifyM    sync.RWMutex
32
33         connection *Connection
34
35         rpc       chan message
36         consumers *consumers
37
38         id uint16
39
40         // closed is set to 1 when the channel has been closed - see Channel.send()
41         closed int32
42
43         // true when we will never notify again
44         noNotify bool
45
46         // Channel and Connection exceptions will be broadcast on these listeners.
47         closes []chan *Error
48
49         // Listeners for active=true flow control.  When true is sent to a listener,
50         // publishing should pause until false is sent to listeners.
51         flows []chan bool
52
53         // Listeners for returned publishings for unroutable messages on mandatory
54         // publishings or undeliverable messages on immediate publishings.
55         returns []chan Return
56
57         // Listeners for when the server notifies the client that
58         // a consumer has been cancelled.
59         cancels []chan string
60
61         // Allocated when in confirm mode in order to track publish counter and order confirms
62         confirms   *confirms
63         confirming bool
64
65         // Selects on any errors from shutdown during RPC
66         errors chan *Error
67
68         // State machine that manages frame order, must only be mutated by the connection
69         recv func(*Channel, frame) error
70
71         // Current state for frame re-assembly, only mutated from recv
72         message messageWithContent
73         header  *headerFrame
74         body    []byte
75 }
76
77 // Constructs a new channel with the given framing rules
78 func newChannel(c *Connection, id uint16) *Channel {
79         return &Channel{
80                 connection: c,
81                 id:         id,
82                 rpc:        make(chan message),
83                 consumers:  makeConsumers(),
84                 confirms:   newConfirms(),
85                 recv:       (*Channel).recvMethod,
86                 errors:     make(chan *Error, 1),
87         }
88 }
89
90 // shutdown is called by Connection after the channel has been removed from the
91 // connection registry.
92 func (ch *Channel) shutdown(e *Error) {
93         ch.destructor.Do(func() {
94                 ch.m.Lock()
95                 defer ch.m.Unlock()
96
97                 // Grab an exclusive lock for the notify channels
98                 ch.notifyM.Lock()
99                 defer ch.notifyM.Unlock()
100
101                 // Broadcast abnormal shutdown
102                 if e != nil {
103                         for _, c := range ch.closes {
104                                 c <- e
105                         }
106                 }
107
108                 // Signal that from now on, Channel.send() should call
109                 // Channel.sendClosed()
110                 atomic.StoreInt32(&ch.closed, 1)
111
112                 // Notify RPC if we're selecting
113                 if e != nil {
114                         ch.errors <- e
115                 }
116
117                 ch.consumers.close()
118
119                 for _, c := range ch.closes {
120                         close(c)
121                 }
122
123                 for _, c := range ch.flows {
124                         close(c)
125                 }
126
127                 for _, c := range ch.returns {
128                         close(c)
129                 }
130
131                 for _, c := range ch.cancels {
132                         close(c)
133                 }
134
135                 // Set the slices to nil to prevent the dispatch() range from sending on
136                 // the now closed channels after we release the notifyM mutex
137                 ch.flows = nil
138                 ch.closes = nil
139                 ch.returns = nil
140                 ch.cancels = nil
141
142                 if ch.confirms != nil {
143                         ch.confirms.Close()
144                 }
145
146                 close(ch.errors)
147                 ch.noNotify = true
148         })
149 }
150
151 // send calls Channel.sendOpen() during normal operation.
152 //
153 // After the channel has been closed, send calls Channel.sendClosed(), ensuring
154 // only 'channel.close' is sent to the server.
155 func (ch *Channel) send(msg message) (err error) {
156         // If the channel is closed, use Channel.sendClosed()
157         if atomic.LoadInt32(&ch.closed) == 1 {
158                 return ch.sendClosed(msg)
159         }
160
161         return ch.sendOpen(msg)
162 }
163
164 func (ch *Channel) open() error {
165         return ch.call(&channelOpen{}, &channelOpenOk{})
166 }
167
168 // Performs a request/response call for when the message is not NoWait and is
169 // specified as Synchronous.
170 func (ch *Channel) call(req message, res ...message) error {
171         if err := ch.send(req); err != nil {
172                 return err
173         }
174
175         if req.wait() {
176                 select {
177                 case e, ok := <-ch.errors:
178                         if ok {
179                                 return e
180                         }
181                         return ErrClosed
182
183                 case msg := <-ch.rpc:
184                         if msg != nil {
185                                 for _, try := range res {
186                                         if reflect.TypeOf(msg) == reflect.TypeOf(try) {
187                                                 // *res = *msg
188                                                 vres := reflect.ValueOf(try).Elem()
189                                                 vmsg := reflect.ValueOf(msg).Elem()
190                                                 vres.Set(vmsg)
191                                                 return nil
192                                         }
193                                 }
194                                 return ErrCommandInvalid
195                         }
196                         // RPC channel has been closed without an error, likely due to a hard
197                         // error on the Connection.  This indicates we have already been
198                         // shutdown and if were waiting, will have returned from the errors chan.
199                         return ErrClosed
200                 }
201         }
202
203         return nil
204 }
205
206 func (ch *Channel) sendClosed(msg message) (err error) {
207         // After a 'channel.close' is sent or received the only valid response is
208         // channel.close-ok
209         if _, ok := msg.(*channelCloseOk); ok {
210                 return ch.connection.send(&methodFrame{
211                         ChannelId: ch.id,
212                         Method:    msg,
213                 })
214         }
215
216         return ErrClosed
217 }
218
219 func (ch *Channel) sendOpen(msg message) (err error) {
220         if content, ok := msg.(messageWithContent); ok {
221                 props, body := content.getContent()
222                 class, _ := content.id()
223
224                 // catch client max frame size==0 and server max frame size==0
225                 // set size to length of what we're trying to publish
226                 var size int
227                 if ch.connection.Config.FrameSize > 0 {
228                         size = ch.connection.Config.FrameSize - frameHeaderSize
229                 } else {
230                         size = len(body)
231                 }
232
233                 if err = ch.connection.send(&methodFrame{
234                         ChannelId: ch.id,
235                         Method:    content,
236                 }); err != nil {
237                         return
238                 }
239
240                 if err = ch.connection.send(&headerFrame{
241                         ChannelId:  ch.id,
242                         ClassId:    class,
243                         Size:       uint64(len(body)),
244                         Properties: props,
245                 }); err != nil {
246                         return
247                 }
248
249                 // chunk body into size (max frame size - frame header size)
250                 for i, j := 0, size; i < len(body); i, j = j, j+size {
251                         if j > len(body) {
252                                 j = len(body)
253                         }
254
255                         if err = ch.connection.send(&bodyFrame{
256                                 ChannelId: ch.id,
257                                 Body:      body[i:j],
258                         }); err != nil {
259                                 return
260                         }
261                 }
262         } else {
263                 err = ch.connection.send(&methodFrame{
264                         ChannelId: ch.id,
265                         Method:    msg,
266                 })
267         }
268
269         return
270 }
271
272 // Eventually called via the state machine from the connection's reader
273 // goroutine, so assumes serialized access.
274 func (ch *Channel) dispatch(msg message) {
275         switch m := msg.(type) {
276         case *channelClose:
277                 // lock before sending connection.close-ok
278                 // to avoid unexpected interleaving with basic.publish frames if
279                 // publishing is happening concurrently
280                 ch.m.Lock()
281                 ch.send(&channelCloseOk{})
282                 ch.m.Unlock()
283                 ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))
284
285         case *channelFlow:
286                 ch.notifyM.RLock()
287                 for _, c := range ch.flows {
288                         c <- m.Active
289                 }
290                 ch.notifyM.RUnlock()
291                 ch.send(&channelFlowOk{Active: m.Active})
292
293         case *basicCancel:
294                 ch.notifyM.RLock()
295                 for _, c := range ch.cancels {
296                         c <- m.ConsumerTag
297                 }
298                 ch.notifyM.RUnlock()
299                 ch.consumers.cancel(m.ConsumerTag)
300
301         case *basicReturn:
302                 ret := newReturn(*m)
303                 ch.notifyM.RLock()
304                 for _, c := range ch.returns {
305                         c <- *ret
306                 }
307                 ch.notifyM.RUnlock()
308
309         case *basicAck:
310                 if ch.confirming {
311                         if m.Multiple {
312                                 ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
313                         } else {
314                                 ch.confirms.One(Confirmation{m.DeliveryTag, true})
315                         }
316                 }
317
318         case *basicNack:
319                 if ch.confirming {
320                         if m.Multiple {
321                                 ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
322                         } else {
323                                 ch.confirms.One(Confirmation{m.DeliveryTag, false})
324                         }
325                 }
326
327         case *basicDeliver:
328                 ch.consumers.send(m.ConsumerTag, newDelivery(ch, m))
329                 // TODO log failed consumer and close channel, this can happen when
330                 // deliveries are in flight and a no-wait cancel has happened
331
332         default:
333                 ch.rpc <- msg
334         }
335 }
336
337 func (ch *Channel) transition(f func(*Channel, frame) error) error {
338         ch.recv = f
339         return nil
340 }
341
342 func (ch *Channel) recvMethod(f frame) error {
343         switch frame := f.(type) {
344         case *methodFrame:
345                 if msg, ok := frame.Method.(messageWithContent); ok {
346                         ch.body = make([]byte, 0)
347                         ch.message = msg
348                         return ch.transition((*Channel).recvHeader)
349                 }
350
351                 ch.dispatch(frame.Method) // termination state
352                 return ch.transition((*Channel).recvMethod)
353
354         case *headerFrame:
355                 // drop
356                 return ch.transition((*Channel).recvMethod)
357
358         case *bodyFrame:
359                 // drop
360                 return ch.transition((*Channel).recvMethod)
361         }
362
363         panic("unexpected frame type")
364 }
365
366 func (ch *Channel) recvHeader(f frame) error {
367         switch frame := f.(type) {
368         case *methodFrame:
369                 // interrupt content and handle method
370                 return ch.recvMethod(f)
371
372         case *headerFrame:
373                 // start collecting if we expect body frames
374                 ch.header = frame
375
376                 if frame.Size == 0 {
377                         ch.message.setContent(ch.header.Properties, ch.body)
378                         ch.dispatch(ch.message) // termination state
379                         return ch.transition((*Channel).recvMethod)
380                 }
381                 return ch.transition((*Channel).recvContent)
382
383         case *bodyFrame:
384                 // drop and reset
385                 return ch.transition((*Channel).recvMethod)
386         }
387
388         panic("unexpected frame type")
389 }
390
391 // state after method + header and before the length
392 // defined by the header has been reached
393 func (ch *Channel) recvContent(f frame) error {
394         switch frame := f.(type) {
395         case *methodFrame:
396                 // interrupt content and handle method
397                 return ch.recvMethod(f)
398
399         case *headerFrame:
400                 // drop and reset
401                 return ch.transition((*Channel).recvMethod)
402
403         case *bodyFrame:
404                 ch.body = append(ch.body, frame.Body...)
405
406                 if uint64(len(ch.body)) >= ch.header.Size {
407                         ch.message.setContent(ch.header.Properties, ch.body)
408                         ch.dispatch(ch.message) // termination state
409                         return ch.transition((*Channel).recvMethod)
410                 }
411
412                 return ch.transition((*Channel).recvContent)
413         }
414
415         panic("unexpected frame type")
416 }
417
418 /*
419 Close initiate a clean channel closure by sending a close message with the error
420 code set to '200'.
421
422 It is safe to call this method multiple times.
423
424 */
425 func (ch *Channel) Close() error {
426         defer ch.connection.closeChannel(ch, nil)
427         return ch.call(
428                 &channelClose{ReplyCode: replySuccess},
429                 &channelCloseOk{},
430         )
431 }
432
433 /*
434 NotifyClose registers a listener for when the server sends a channel or
435 connection exception in the form of a Connection.Close or Channel.Close method.
436 Connection exceptions will be broadcast to all open channels and all channels
437 will be closed, where channel exceptions will only be broadcast to listeners to
438 this channel.
439
440 The chan provided will be closed when the Channel is closed and on a
441 graceful close, no error will be sent.
442
443 */
444 func (ch *Channel) NotifyClose(c chan *Error) chan *Error {
445         ch.notifyM.Lock()
446         defer ch.notifyM.Unlock()
447
448         if ch.noNotify {
449                 close(c)
450         } else {
451                 ch.closes = append(ch.closes, c)
452         }
453
454         return c
455 }
456
457 /*
458 NotifyFlow registers a listener for basic.flow methods sent by the server.
459 When `false` is sent on one of the listener channels, all publishers should
460 pause until a `true` is sent.
461
462 The server may ask the producer to pause or restart the flow of Publishings
463 sent by on a channel. This is a simple flow-control mechanism that a server can
464 use to avoid overflowing its queues or otherwise finding itself receiving more
465 messages than it can process. Note that this method is not intended for window
466 control. It does not affect contents returned by basic.get-ok methods.
467
468 When a new channel is opened, it is active (flow is active). Some
469 applications assume that channels are inactive until started. To emulate
470 this behavior a client MAY open the channel, then pause it.
471
472 Publishers should respond to a flow messages as rapidly as possible and the
473 server may disconnect over producing channels that do not respect these
474 messages.
475
476 basic.flow-ok methods will always be returned to the server regardless of
477 the number of listeners there are.
478
479 To control the flow of deliveries from the server, use the Channel.Flow()
480 method instead.
481
482 Note: RabbitMQ will rather use TCP pushback on the network connection instead
483 of sending basic.flow.  This means that if a single channel is producing too
484 much on the same connection, all channels using that connection will suffer,
485 including acknowledgments from deliveries.  Use different Connections if you
486 desire to interleave consumers and producers in the same process to avoid your
487 basic.ack messages from getting rate limited with your basic.publish messages.
488
489 */
490 func (ch *Channel) NotifyFlow(c chan bool) chan bool {
491         ch.notifyM.Lock()
492         defer ch.notifyM.Unlock()
493
494         if ch.noNotify {
495                 close(c)
496         } else {
497                 ch.flows = append(ch.flows, c)
498         }
499
500         return c
501 }
502
503 /*
504 NotifyReturn registers a listener for basic.return methods.  These can be sent
505 from the server when a publish is undeliverable either from the mandatory or
506 immediate flags.
507
508 A return struct has a copy of the Publishing along with some error
509 information about why the publishing failed.
510
511 */
512 func (ch *Channel) NotifyReturn(c chan Return) chan Return {
513         ch.notifyM.Lock()
514         defer ch.notifyM.Unlock()
515
516         if ch.noNotify {
517                 close(c)
518         } else {
519                 ch.returns = append(ch.returns, c)
520         }
521
522         return c
523 }
524
525 /*
526 NotifyCancel registers a listener for basic.cancel methods.  These can be sent
527 from the server when a queue is deleted or when consuming from a mirrored queue
528 where the master has just failed (and was moved to another node).
529
530 The subscription tag is returned to the listener.
531
532 */
533 func (ch *Channel) NotifyCancel(c chan string) chan string {
534         ch.notifyM.Lock()
535         defer ch.notifyM.Unlock()
536
537         if ch.noNotify {
538                 close(c)
539         } else {
540                 ch.cancels = append(ch.cancels, c)
541         }
542
543         return c
544 }
545
546 /*
547 NotifyConfirm calls NotifyPublish and starts a goroutine sending
548 ordered Ack and Nack DeliveryTag to the respective channels.
549
550 For strict ordering, use NotifyPublish instead.
551 */
552 func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
553         confirms := ch.NotifyPublish(make(chan Confirmation, len(ack)+len(nack)))
554
555         go func() {
556                 for c := range confirms {
557                         if c.Ack {
558                                 ack <- c.DeliveryTag
559                         } else {
560                                 nack <- c.DeliveryTag
561                         }
562                 }
563                 close(ack)
564                 if nack != ack {
565                         close(nack)
566                 }
567         }()
568
569         return ack, nack
570 }
571
572 /*
573 NotifyPublish registers a listener for reliable publishing. Receives from this
574 chan for every publish after Channel.Confirm will be in order starting with
575 DeliveryTag 1.
576
577 There will be one and only one Confirmation Publishing starting with the
578 delivery tag of 1 and progressing sequentially until the total number of
579 Publishings have been seen by the server.
580
581 Acknowledgments will be received in the order of delivery from the
582 NotifyPublish channels even if the server acknowledges them out of order.
583
584 The listener chan will be closed when the Channel is closed.
585
586 The capacity of the chan Confirmation must be at least as large as the
587 number of outstanding publishings.  Not having enough buffered chans will
588 create a deadlock if you attempt to perform other operations on the Connection
589 or Channel while confirms are in-flight.
590
591 It's advisable to wait for all Confirmations to arrive before calling
592 Channel.Close() or Connection.Close().
593
594 */
595 func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
596         ch.notifyM.Lock()
597         defer ch.notifyM.Unlock()
598
599         if ch.noNotify {
600                 close(confirm)
601         } else {
602                 ch.confirms.Listen(confirm)
603         }
604
605         return confirm
606
607 }
608
609 /*
610 Qos controls how many messages or how many bytes the server will try to keep on
611 the network for consumers before receiving delivery acks.  The intent of Qos is
612 to make sure the network buffers stay full between the server and client.
613
614 With a prefetch count greater than zero, the server will deliver that many
615 messages to consumers before acknowledgments are received.  The server ignores
616 this option when consumers are started with noAck because no acknowledgments
617 are expected or sent.
618
619 With a prefetch size greater than zero, the server will try to keep at least
620 that many bytes of deliveries flushed to the network before receiving
621 acknowledgments from the consumers.  This option is ignored when consumers are
622 started with noAck.
623
624 When global is true, these Qos settings apply to all existing and future
625 consumers on all channels on the same connection.  When false, the Channel.Qos
626 settings will apply to all existing and future consumers on this channel.
627
628 Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
629 how the global flag is implemented in RabbitMQ, as it differs from the
630 AMQP 0.9.1 specification in that global Qos settings are limited in scope to
631 channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
632
633 To get round-robin behavior between consumers consuming from the same queue on
634 different connections, set the prefetch count to 1, and the next available
635 message on the server will be delivered to the next available consumer.
636
637 If your consumer work time is reasonably consistent and not much greater
638 than two times your network round trip time, you will see significant
639 throughput improvements starting with a prefetch count of 2 or slightly
640 greater as described by benchmarks on RabbitMQ.
641
642 http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
643 */
644 func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {
645         return ch.call(
646                 &basicQos{
647                         PrefetchCount: uint16(prefetchCount),
648                         PrefetchSize:  uint32(prefetchSize),
649                         Global:        global,
650                 },
651                 &basicQosOk{},
652         )
653 }
654
655 /*
656 Cancel stops deliveries to the consumer chan established in Channel.Consume and
657 identified by consumer.
658
659 Only use this method to cleanly stop receiving deliveries from the server and
660 cleanly shut down the consumer chan identified by this tag.  Using this method
661 and waiting for remaining messages to flush from the consumer chan will ensure
662 all messages received on the network will be delivered to the receiver of your
663 consumer chan.
664
665 Continue consuming from the chan Delivery provided by Channel.Consume until the
666 chan closes.
667
668 When noWait is true, do not wait for the server to acknowledge the cancel.
669 Only use this when you are certain there are no deliveries in flight that
670 require an acknowledgment, otherwise they will arrive and be dropped in the
671 client without an ack, and will not be redelivered to other consumers.
672
673 */
674 func (ch *Channel) Cancel(consumer string, noWait bool) error {
675         req := &basicCancel{
676                 ConsumerTag: consumer,
677                 NoWait:      noWait,
678         }
679         res := &basicCancelOk{}
680
681         if err := ch.call(req, res); err != nil {
682                 return err
683         }
684
685         if req.wait() {
686                 ch.consumers.cancel(res.ConsumerTag)
687         } else {
688                 // Potentially could drop deliveries in flight
689                 ch.consumers.cancel(consumer)
690         }
691
692         return nil
693 }
694
695 /*
696 QueueDeclare declares a queue to hold messages and deliver to consumers.
697 Declaring creates a queue if it doesn't already exist, or ensures that an
698 existing queue matches the same parameters.
699
700 Every queue declared gets a default binding to the empty exchange "" which has
701 the type "direct" with the routing key matching the queue's name.  With this
702 default binding, it is possible to publish messages that route directly to
703 this queue by publishing to "" with the routing key of the queue name.
704
705   QueueDeclare("alerts", true, false, false, false, nil)
706   Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
707
708   Delivery       Exchange  Key       Queue
709   -----------------------------------------------
710   key: alerts -> ""     -> alerts -> alerts
711
712 The queue name may be empty, in which case the server will generate a unique name
713 which will be returned in the Name field of Queue struct.
714
715 Durable and Non-Auto-Deleted queues will survive server restarts and remain
716 when there are no remaining consumers or bindings.  Persistent publishings will
717 be restored in this queue on server restart.  These queues are only able to be
718 bound to durable exchanges.
719
720 Non-Durable and Auto-Deleted queues will not be redeclared on server restart
721 and will be deleted by the server after a short time when the last consumer is
722 canceled or the last consumer's channel is closed.  Queues with this lifetime
723 can also be deleted normally with QueueDelete.  These durable queues can only
724 be bound to non-durable exchanges.
725
726 Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
727 server is running regardless of how many consumers.  This lifetime is useful
728 for temporary topologies that may have long delays between consumer activity.
729 These queues can only be bound to non-durable exchanges.
730
731 Durable and Auto-Deleted queues will be restored on server restart, but without
732 active consumers will not survive and be removed.  This Lifetime is unlikely
733 to be useful.
734
735 Exclusive queues are only accessible by the connection that declares them and
736 will be deleted when the connection closes.  Channels on other connections
737 will receive an error when attempting  to declare, bind, consume, purge or
738 delete a queue with the same name.
739
740 When noWait is true, the queue will assume to be declared on the server.  A
741 channel exception will arrive if the conditions are met for existing queues
742 or attempting to modify an existing queue from a different connection.
743
744 When the error return value is not nil, you can assume the queue could not be
745 declared with these parameters, and the channel will be closed.
746
747 */
748 func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
749         if err := args.Validate(); err != nil {
750                 return Queue{}, err
751         }
752
753         req := &queueDeclare{
754                 Queue:      name,
755                 Passive:    false,
756                 Durable:    durable,
757                 AutoDelete: autoDelete,
758                 Exclusive:  exclusive,
759                 NoWait:     noWait,
760                 Arguments:  args,
761         }
762         res := &queueDeclareOk{}
763
764         if err := ch.call(req, res); err != nil {
765                 return Queue{}, err
766         }
767
768         if req.wait() {
769                 return Queue{
770                         Name:      res.Queue,
771                         Messages:  int(res.MessageCount),
772                         Consumers: int(res.ConsumerCount),
773                 }, nil
774         }
775
776         return Queue{Name: name}, nil
777 }
778
779 /*
780
781 QueueDeclarePassive is functionally and parametrically equivalent to
782 QueueDeclare, except that it sets the "passive" attribute to true. A passive
783 queue is assumed by RabbitMQ to already exist, and attempting to connect to a
784 non-existent queue will cause RabbitMQ to throw an exception. This function
785 can be used to test for the existence of a queue.
786
787 */
788 func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
789         if err := args.Validate(); err != nil {
790                 return Queue{}, err
791         }
792
793         req := &queueDeclare{
794                 Queue:      name,
795                 Passive:    true,
796                 Durable:    durable,
797                 AutoDelete: autoDelete,
798                 Exclusive:  exclusive,
799                 NoWait:     noWait,
800                 Arguments:  args,
801         }
802         res := &queueDeclareOk{}
803
804         if err := ch.call(req, res); err != nil {
805                 return Queue{}, err
806         }
807
808         if req.wait() {
809                 return Queue{
810                         Name:      res.Queue,
811                         Messages:  int(res.MessageCount),
812                         Consumers: int(res.ConsumerCount),
813                 }, nil
814         }
815
816         return Queue{Name: name}, nil
817 }
818
819 /*
820 QueueInspect passively declares a queue by name to inspect the current message
821 count and consumer count.
822
823 Use this method to check how many messages ready for delivery reside in the queue,
824 how many consumers are receiving deliveries, and whether a queue by this
825 name already exists.
826
827 If the queue by this name exists, use Channel.QueueDeclare check if it is
828 declared with specific parameters.
829
830 If a queue by this name does not exist, an error will be returned and the
831 channel will be closed.
832
833 */
834 func (ch *Channel) QueueInspect(name string) (Queue, error) {
835         req := &queueDeclare{
836                 Queue:   name,
837                 Passive: true,
838         }
839         res := &queueDeclareOk{}
840
841         err := ch.call(req, res)
842
843         state := Queue{
844                 Name:      name,
845                 Messages:  int(res.MessageCount),
846                 Consumers: int(res.ConsumerCount),
847         }
848
849         return state, err
850 }
851
852 /*
853 QueueBind binds an exchange to a queue so that publishings to the exchange will
854 be routed to the queue when the publishing routing key matches the binding
855 routing key.
856
857   QueueBind("pagers", "alert", "log", false, nil)
858   QueueBind("emails", "info", "log", false, nil)
859
860   Delivery       Exchange  Key       Queue
861   -----------------------------------------------
862   key: alert --> log ----> alert --> pagers
863   key: info ---> log ----> info ---> emails
864   key: debug --> log       (none)    (dropped)
865
866 If a binding with the same key and arguments already exists between the
867 exchange and queue, the attempt to rebind will be ignored and the existing
868 binding will be retained.
869
870 In the case that multiple bindings may cause the message to be routed to the
871 same queue, the server will only route the publishing once.  This is possible
872 with topic exchanges.
873
874   QueueBind("pagers", "alert", "amq.topic", false, nil)
875   QueueBind("emails", "info", "amq.topic", false, nil)
876   QueueBind("emails", "#", "amq.topic", false, nil) // match everything
877
878   Delivery       Exchange        Key       Queue
879   -----------------------------------------------
880   key: alert --> amq.topic ----> alert --> pagers
881   key: info ---> amq.topic ----> # ------> emails
882                            \---> info ---/
883   key: debug --> amq.topic ----> # ------> emails
884
885 It is only possible to bind a durable queue to a durable exchange regardless of
886 whether the queue or exchange is auto-deleted.  Bindings between durable queues
887 and exchanges will also be restored on server restart.
888
889 If the binding could not complete, an error will be returned and the channel
890 will be closed.
891
892 When noWait is false and the queue could not be bound, the channel will be
893 closed with an error.
894
895 */
896 func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
897         if err := args.Validate(); err != nil {
898                 return err
899         }
900
901         return ch.call(
902                 &queueBind{
903                         Queue:      name,
904                         Exchange:   exchange,
905                         RoutingKey: key,
906                         NoWait:     noWait,
907                         Arguments:  args,
908                 },
909                 &queueBindOk{},
910         )
911 }
912
913 /*
914 QueueUnbind removes a binding between an exchange and queue matching the key and
915 arguments.
916
917 It is possible to send and empty string for the exchange name which means to
918 unbind the queue from the default exchange.
919
920 */
921 func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
922         if err := args.Validate(); err != nil {
923                 return err
924         }
925
926         return ch.call(
927                 &queueUnbind{
928                         Queue:      name,
929                         Exchange:   exchange,
930                         RoutingKey: key,
931                         Arguments:  args,
932                 },
933                 &queueUnbindOk{},
934         )
935 }
936
937 /*
938 QueuePurge removes all messages from the named queue which are not waiting to
939 be acknowledged.  Messages that have been delivered but have not yet been
940 acknowledged will not be removed.
941
942 When successful, returns the number of messages purged.
943
944 If noWait is true, do not wait for the server response and the number of
945 messages purged will not be meaningful.
946 */
947 func (ch *Channel) QueuePurge(name string, noWait bool) (int, error) {
948         req := &queuePurge{
949                 Queue:  name,
950                 NoWait: noWait,
951         }
952         res := &queuePurgeOk{}
953
954         err := ch.call(req, res)
955
956         return int(res.MessageCount), err
957 }
958
959 /*
960 QueueDelete removes the queue from the server including all bindings then
961 purges the messages based on server configuration, returning the number of
962 messages purged.
963
964 When ifUnused is true, the queue will not be deleted if there are any
965 consumers on the queue.  If there are consumers, an error will be returned and
966 the channel will be closed.
967
968 When ifEmpty is true, the queue will not be deleted if there are any messages
969 remaining on the queue.  If there are messages, an error will be returned and
970 the channel will be closed.
971
972 When noWait is true, the queue will be deleted without waiting for a response
973 from the server.  The purged message count will not be meaningful. If the queue
974 could not be deleted, a channel exception will be raised and the channel will
975 be closed.
976
977 */
978 func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) {
979         req := &queueDelete{
980                 Queue:    name,
981                 IfUnused: ifUnused,
982                 IfEmpty:  ifEmpty,
983                 NoWait:   noWait,
984         }
985         res := &queueDeleteOk{}
986
987         err := ch.call(req, res)
988
989         return int(res.MessageCount), err
990 }
991
992 /*
993 Consume immediately starts delivering queued messages.
994
995 Begin receiving on the returned chan Delivery before any other operation on the
996 Connection or Channel.
997
998 Continues deliveries to the returned chan Delivery until Channel.Cancel,
999 Connection.Close, Channel.Close, or an AMQP exception occurs.  Consumers must
1000 range over the chan to ensure all deliveries are received.  Unreceived
1001 deliveries will block all methods on the same connection.
1002
1003 All deliveries in AMQP must be acknowledged.  It is expected of the consumer to
1004 call Delivery.Ack after it has successfully processed the delivery.  If the
1005 consumer is cancelled or the channel or connection is closed any unacknowledged
1006 deliveries will be requeued at the end of the same queue.
1007
1008 The consumer is identified by a string that is unique and scoped for all
1009 consumers on this channel.  If you wish to eventually cancel the consumer, use
1010 the same non-empty identifier in Channel.Cancel.  An empty string will cause
1011 the library to generate a unique identity.  The consumer identity will be
1012 included in every Delivery in the ConsumerTag field
1013
1014 When autoAck (also known as noAck) is true, the server will acknowledge
1015 deliveries to this consumer prior to writing the delivery to the network.  When
1016 autoAck is true, the consumer should not call Delivery.Ack. Automatically
1017 acknowledging deliveries means that some deliveries may get lost if the
1018 consumer is unable to process them after the server delivers them.
1019 See http://www.rabbitmq.com/confirms.html for more details.
1020
1021 When exclusive is true, the server will ensure that this is the sole consumer
1022 from this queue. When exclusive is false, the server will fairly distribute
1023 deliveries across multiple consumers.
1024
1025 The noLocal flag is not supported by RabbitMQ.
1026
1027 It's advisable to use separate connections for
1028 Channel.Publish and Channel.Consume so not to have TCP pushback on publishing
1029 affect the ability to consume messages, so this parameter is here mostly for
1030 completeness.
1031
1032 When noWait is true, do not wait for the server to confirm the request and
1033 immediately begin deliveries.  If it is not possible to consume, a channel
1034 exception will be raised and the channel will be closed.
1035
1036 Optional arguments can be provided that have specific semantics for the queue
1037 or server.
1038
1039 Inflight messages, limited by Channel.Qos will be buffered until received from
1040 the returned chan.
1041
1042 When the Channel or Connection is closed, all buffered and inflight messages will
1043 be dropped.
1044
1045 When the consumer tag is cancelled, all inflight messages will be delivered until
1046 the returned chan is closed.
1047
1048 */
1049 func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
1050         // When we return from ch.call, there may be a delivery already for the
1051         // consumer that hasn't been added to the consumer hash yet.  Because of
1052         // this, we never rely on the server picking a consumer tag for us.
1053
1054         if err := args.Validate(); err != nil {
1055                 return nil, err
1056         }
1057
1058         if consumer == "" {
1059                 consumer = uniqueConsumerTag()
1060         }
1061
1062         req := &basicConsume{
1063                 Queue:       queue,
1064                 ConsumerTag: consumer,
1065                 NoLocal:     noLocal,
1066                 NoAck:       autoAck,
1067                 Exclusive:   exclusive,
1068                 NoWait:      noWait,
1069                 Arguments:   args,
1070         }
1071         res := &basicConsumeOk{}
1072
1073         deliveries := make(chan Delivery)
1074
1075         ch.consumers.add(consumer, deliveries)
1076
1077         if err := ch.call(req, res); err != nil {
1078                 ch.consumers.cancel(consumer)
1079                 return nil, err
1080         }
1081
1082         return (<-chan Delivery)(deliveries), nil
1083 }
1084
1085 /*
1086 ExchangeDeclare declares an exchange on the server. If the exchange does not
1087 already exist, the server will create it.  If the exchange exists, the server
1088 verifies that it is of the provided type, durability and auto-delete flags.
1089
1090 Errors returned from this method will close the channel.
1091
1092 Exchange names starting with "amq." are reserved for pre-declared and
1093 standardized exchanges. The client MAY declare an exchange starting with
1094 "amq." if the passive option is set, or the exchange already exists.  Names can
1095 consist of a non-empty sequence of letters, digits, hyphen, underscore,
1096 period, or colon.
1097
1098 Each exchange belongs to one of a set of exchange kinds/types implemented by
1099 the server. The exchange types define the functionality of the exchange - i.e.
1100 how messages are routed through it. Once an exchange is declared, its type
1101 cannot be changed.  The common types are "direct", "fanout", "topic" and
1102 "headers".
1103
1104 Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
1105 declared when there are no remaining bindings.  This is the best lifetime for
1106 long-lived exchange configurations like stable routes and default exchanges.
1107
1108 Non-Durable and Auto-Deleted exchanges will be deleted when there are no
1109 remaining bindings and not restored on server restart.  This lifetime is
1110 useful for temporary topologies that should not pollute the virtual host on
1111 failure or after the consumers have completed.
1112
1113 Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
1114 running including when there are no remaining bindings.  This is useful for
1115 temporary topologies that may have long delays between bindings.
1116
1117 Durable and Auto-Deleted exchanges will survive server restarts and will be
1118 removed before and after server restarts when there are no remaining bindings.
1119 These exchanges are useful for robust temporary topologies or when you require
1120 binding durable queues to auto-deleted exchanges.
1121
1122 Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
1123 durable, so queues that bind to these pre-declared exchanges must also be
1124 durable.
1125
1126 Exchanges declared as `internal` do not accept accept publishings. Internal
1127 exchanges are useful when you wish to implement inter-exchange topologies
1128 that should not be exposed to users of the broker.
1129
1130 When noWait is true, declare without waiting for a confirmation from the server.
1131 The channel may be closed as a result of an error.  Add a NotifyClose listener
1132 to respond to any exceptions.
1133
1134 Optional amqp.Table of arguments that are specific to the server's implementation of
1135 the exchange can be sent for exchange types that require extra parameters.
1136 */
1137 func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
1138         if err := args.Validate(); err != nil {
1139                 return err
1140         }
1141
1142         return ch.call(
1143                 &exchangeDeclare{
1144                         Exchange:   name,
1145                         Type:       kind,
1146                         Passive:    false,
1147                         Durable:    durable,
1148                         AutoDelete: autoDelete,
1149                         Internal:   internal,
1150                         NoWait:     noWait,
1151                         Arguments:  args,
1152                 },
1153                 &exchangeDeclareOk{},
1154         )
1155 }
1156
1157 /*
1158
1159 ExchangeDeclarePassive is functionally and parametrically equivalent to
1160 ExchangeDeclare, except that it sets the "passive" attribute to true. A passive
1161 exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
1162 non-existent exchange will cause RabbitMQ to throw an exception. This function
1163 can be used to detect the existence of an exchange.
1164
1165 */
1166 func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
1167         if err := args.Validate(); err != nil {
1168                 return err
1169         }
1170
1171         return ch.call(
1172                 &exchangeDeclare{
1173                         Exchange:   name,
1174                         Type:       kind,
1175                         Passive:    true,
1176                         Durable:    durable,
1177                         AutoDelete: autoDelete,
1178                         Internal:   internal,
1179                         NoWait:     noWait,
1180                         Arguments:  args,
1181                 },
1182                 &exchangeDeclareOk{},
1183         )
1184 }
1185
1186 /*
1187 ExchangeDelete removes the named exchange from the server. When an exchange is
1188 deleted all queue bindings on the exchange are also deleted.  If this exchange
1189 does not exist, the channel will be closed with an error.
1190
1191 When ifUnused is true, the server will only delete the exchange if it has no queue
1192 bindings.  If the exchange has queue bindings the server does not delete it
1193 but close the channel with an exception instead.  Set this to true if you are
1194 not the sole owner of the exchange.
1195
1196 When noWait is true, do not wait for a server confirmation that the exchange has
1197 been deleted.  Failing to delete the channel could close the channel.  Add a
1198 NotifyClose listener to respond to these channel exceptions.
1199 */
1200 func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error {
1201         return ch.call(
1202                 &exchangeDelete{
1203                         Exchange: name,
1204                         IfUnused: ifUnused,
1205                         NoWait:   noWait,
1206                 },
1207                 &exchangeDeleteOk{},
1208         )
1209 }
1210
1211 /*
1212 ExchangeBind binds an exchange to another exchange to create inter-exchange
1213 routing topologies on the server.  This can decouple the private topology and
1214 routing exchanges from exchanges intended solely for publishing endpoints.
1215
1216 Binding two exchanges with identical arguments will not create duplicate
1217 bindings.
1218
1219 Binding one exchange to another with multiple bindings will only deliver a
1220 message once.  For example if you bind your exchange to `amq.fanout` with two
1221 different binding keys, only a single message will be delivered to your
1222 exchange even though multiple bindings will match.
1223
1224 Given a message delivered to the source exchange, the message will be forwarded
1225 to the destination exchange when the routing key is matched.
1226
1227   ExchangeBind("sell", "MSFT", "trade", false, nil)
1228   ExchangeBind("buy", "AAPL", "trade", false, nil)
1229
1230   Delivery       Source      Key      Destination
1231   example        exchange             exchange
1232   -----------------------------------------------
1233   key: AAPL  --> trade ----> MSFT     sell
1234                        \---> AAPL --> buy
1235
1236 When noWait is true, do not wait for the server to confirm the binding.  If any
1237 error occurs the channel will be closed.  Add a listener to NotifyClose to
1238 handle these errors.
1239
1240 Optional arguments specific to the exchanges bound can also be specified.
1241 */
1242 func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error {
1243         if err := args.Validate(); err != nil {
1244                 return err
1245         }
1246
1247         return ch.call(
1248                 &exchangeBind{
1249                         Destination: destination,
1250                         Source:      source,
1251                         RoutingKey:  key,
1252                         NoWait:      noWait,
1253                         Arguments:   args,
1254                 },
1255                 &exchangeBindOk{},
1256         )
1257 }
1258
1259 /*
1260 ExchangeUnbind unbinds the destination exchange from the source exchange on the
1261 server by removing the routing key between them.  This is the inverse of
1262 ExchangeBind.  If the binding does not currently exist, an error will be
1263 returned.
1264
1265 When noWait is true, do not wait for the server to confirm the deletion of the
1266 binding.  If any error occurs the channel will be closed.  Add a listener to
1267 NotifyClose to handle these errors.
1268
1269 Optional arguments that are specific to the type of exchanges bound can also be
1270 provided.  These must match the same arguments specified in ExchangeBind to
1271 identify the binding.
1272 */
1273 func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args Table) error {
1274         if err := args.Validate(); err != nil {
1275                 return err
1276         }
1277
1278         return ch.call(
1279                 &exchangeUnbind{
1280                         Destination: destination,
1281                         Source:      source,
1282                         RoutingKey:  key,
1283                         NoWait:      noWait,
1284                         Arguments:   args,
1285                 },
1286                 &exchangeUnbindOk{},
1287         )
1288 }
1289
1290 /*
1291 Publish sends a Publishing from the client to an exchange on the server.
1292
1293 When you want a single message to be delivered to a single queue, you can
1294 publish to the default exchange with the routingKey of the queue name.  This is
1295 because every declared queue gets an implicit route to the default exchange.
1296
1297 Since publishings are asynchronous, any undeliverable message will get returned
1298 by the server.  Add a listener with Channel.NotifyReturn to handle any
1299 undeliverable message when calling publish with either the mandatory or
1300 immediate parameters as true.
1301
1302 Publishings can be undeliverable when the mandatory flag is true and no queue is
1303 bound that matches the routing key, or when the immediate flag is true and no
1304 consumer on the matched queue is ready to accept the delivery.
1305
1306 This can return an error when the channel, connection or socket is closed.  The
1307 error or lack of an error does not indicate whether the server has received this
1308 publishing.
1309
1310 It is possible for publishing to not reach the broker if the underlying socket
1311 is shut down without pending publishing packets being flushed from the kernel
1312 buffers.  The easy way of making it probable that all publishings reach the
1313 server is to always call Connection.Close before terminating your publishing
1314 application.  The way to ensure that all publishings reach the server is to add
1315 a listener to Channel.NotifyPublish and put the channel in confirm mode with
1316 Channel.Confirm.  Publishing delivery tags and their corresponding
1317 confirmations start at 1.  Exit when all publishings are confirmed.
1318
1319 When Publish does not return an error and the channel is in confirm mode, the
1320 internal counter for DeliveryTags with the first confirmation starts at 1.
1321
1322 */
1323 func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
1324         if err := msg.Headers.Validate(); err != nil {
1325                 return err
1326         }
1327
1328         ch.m.Lock()
1329         defer ch.m.Unlock()
1330
1331         if err := ch.send(&basicPublish{
1332                 Exchange:   exchange,
1333                 RoutingKey: key,
1334                 Mandatory:  mandatory,
1335                 Immediate:  immediate,
1336                 Body:       msg.Body,
1337                 Properties: properties{
1338                         Headers:         msg.Headers,
1339                         ContentType:     msg.ContentType,
1340                         ContentEncoding: msg.ContentEncoding,
1341                         DeliveryMode:    msg.DeliveryMode,
1342                         Priority:        msg.Priority,
1343                         CorrelationId:   msg.CorrelationId,
1344                         ReplyTo:         msg.ReplyTo,
1345                         Expiration:      msg.Expiration,
1346                         MessageId:       msg.MessageId,
1347                         Timestamp:       msg.Timestamp,
1348                         Type:            msg.Type,
1349                         UserId:          msg.UserId,
1350                         AppId:           msg.AppId,
1351                 },
1352         }); err != nil {
1353                 return err
1354         }
1355
1356         if ch.confirming {
1357                 ch.confirms.Publish()
1358         }
1359
1360         return nil
1361 }
1362
1363 /*
1364 Get synchronously receives a single Delivery from the head of a queue from the
1365 server to the client.  In almost all cases, using Channel.Consume will be
1366 preferred.
1367
1368 If there was a delivery waiting on the queue and that delivery was received, the
1369 second return value will be true.  If there was no delivery waiting or an error
1370 occurred, the ok bool will be false.
1371
1372 All deliveries must be acknowledged including those from Channel.Get.  Call
1373 Delivery.Ack on the returned delivery when you have fully processed this
1374 delivery.
1375
1376 When autoAck is true, the server will automatically acknowledge this message so
1377 you don't have to.  But if you are unable to fully process this message before
1378 the channel or connection is closed, the message will not get requeued.
1379
1380 */
1381 func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error) {
1382         req := &basicGet{Queue: queue, NoAck: autoAck}
1383         res := &basicGetOk{}
1384         empty := &basicGetEmpty{}
1385
1386         if err := ch.call(req, res, empty); err != nil {
1387                 return Delivery{}, false, err
1388         }
1389
1390         if res.DeliveryTag > 0 {
1391                 return *(newDelivery(ch, res)), true, nil
1392         }
1393
1394         return Delivery{}, false, nil
1395 }
1396
1397 /*
1398 Tx puts the channel into transaction mode on the server.  All publishings and
1399 acknowledgments following this method will be atomically committed or rolled
1400 back for a single queue.  Call either Channel.TxCommit or Channel.TxRollback to
1401 leave a this transaction and immediately start a new transaction.
1402
1403 The atomicity across multiple queues is not defined as queue declarations and
1404 bindings are not included in the transaction.
1405
1406 The behavior of publishings that are delivered as mandatory or immediate while
1407 the channel is in a transaction is not defined.
1408
1409 Once a channel has been put into transaction mode, it cannot be taken out of
1410 transaction mode.  Use a different channel for non-transactional semantics.
1411
1412 */
1413 func (ch *Channel) Tx() error {
1414         return ch.call(
1415                 &txSelect{},
1416                 &txSelectOk{},
1417         )
1418 }
1419
1420 /*
1421 TxCommit atomically commits all publishings and acknowledgments for a single
1422 queue and immediately start a new transaction.
1423
1424 Calling this method without having called Channel.Tx is an error.
1425
1426 */
1427 func (ch *Channel) TxCommit() error {
1428         return ch.call(
1429                 &txCommit{},
1430                 &txCommitOk{},
1431         )
1432 }
1433
1434 /*
1435 TxRollback atomically rolls back all publishings and acknowledgments for a
1436 single queue and immediately start a new transaction.
1437
1438 Calling this method without having called Channel.Tx is an error.
1439
1440 */
1441 func (ch *Channel) TxRollback() error {
1442         return ch.call(
1443                 &txRollback{},
1444                 &txRollbackOk{},
1445         )
1446 }
1447
1448 /*
1449 Flow pauses the delivery of messages to consumers on this channel.  Channels
1450 are opened with flow control active, to open a channel with paused
1451 deliveries immediately call this method with `false` after calling
1452 Connection.Channel.
1453
1454 When active is `false`, this method asks the server to temporarily pause deliveries
1455 until called again with active as `true`.
1456
1457 Channel.Get methods will not be affected by flow control.
1458
1459 This method is not intended to act as window control.  Use Channel.Qos to limit
1460 the number of unacknowledged messages or bytes in flight instead.
1461
1462 The server may also send us flow methods to throttle our publishings.  A well
1463 behaving publishing client should add a listener with Channel.NotifyFlow and
1464 pause its publishings when `false` is sent on that channel.
1465
1466 Note: RabbitMQ prefers to use TCP push back to control flow for all channels on
1467 a connection, so under high volume scenarios, it's wise to open separate
1468 Connections for publishings and deliveries.
1469
1470 */
1471 func (ch *Channel) Flow(active bool) error {
1472         return ch.call(
1473                 &channelFlow{Active: active},
1474                 &channelFlowOk{},
1475         )
1476 }
1477
1478 /*
1479 Confirm puts this channel into confirm mode so that the client can ensure all
1480 publishings have successfully been received by the server.  After entering this
1481 mode, the server will send a basic.ack or basic.nack message with the deliver
1482 tag set to a 1 based incremental index corresponding to every publishing
1483 received after the this method returns.
1484
1485 Add a listener to Channel.NotifyPublish to respond to the Confirmations. If
1486 Channel.NotifyPublish is not called, the Confirmations will be silently
1487 ignored.
1488
1489 The order of acknowledgments is not bound to the order of deliveries.
1490
1491 Ack and Nack confirmations will arrive at some point in the future.
1492
1493 Unroutable mandatory or immediate messages are acknowledged immediately after
1494 any Channel.NotifyReturn listeners have been notified.  Other messages are
1495 acknowledged when all queues that should have the message routed to them have
1496 either received acknowledgment of delivery or have enqueued the message,
1497 persisting the message if necessary.
1498
1499 When noWait is true, the client will not wait for a response.  A channel
1500 exception could occur if the server does not support this method.
1501
1502 */
1503 func (ch *Channel) Confirm(noWait bool) error {
1504         if err := ch.call(
1505                 &confirmSelect{Nowait: noWait},
1506                 &confirmSelectOk{},
1507         ); err != nil {
1508                 return err
1509         }
1510
1511         ch.confirmM.Lock()
1512         ch.confirming = true
1513         ch.confirmM.Unlock()
1514
1515         return nil
1516 }
1517
1518 /*
1519 Recover redelivers all unacknowledged deliveries on this channel.
1520
1521 When requeue is false, messages will be redelivered to the original consumer.
1522
1523 When requeue is true, messages will be redelivered to any available consumer,
1524 potentially including the original.
1525
1526 If the deliveries cannot be recovered, an error will be returned and the channel
1527 will be closed.
1528
1529 Note: this method is not implemented on RabbitMQ, use Delivery.Nack instead
1530 */
1531 func (ch *Channel) Recover(requeue bool) error {
1532         return ch.call(
1533                 &basicRecover{Requeue: requeue},
1534                 &basicRecoverOk{},
1535         )
1536 }
1537
1538 /*
1539 Ack acknowledges a delivery by its delivery tag when having been consumed with
1540 Channel.Consume or Channel.Get.
1541
1542 Ack acknowledges all message received prior to the delivery tag when multiple
1543 is true.
1544
1545 See also Delivery.Ack
1546 */
1547 func (ch *Channel) Ack(tag uint64, multiple bool) error {
1548         ch.m.Lock()
1549         defer ch.m.Unlock()
1550
1551         return ch.send(&basicAck{
1552                 DeliveryTag: tag,
1553                 Multiple:    multiple,
1554         })
1555 }
1556
1557 /*
1558 Nack negatively acknowledges a delivery by its delivery tag.  Prefer this
1559 method to notify the server that you were not able to process this delivery and
1560 it must be redelivered or dropped.
1561
1562 See also Delivery.Nack
1563 */
1564 func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
1565         ch.m.Lock()
1566         defer ch.m.Unlock()
1567
1568         return ch.send(&basicNack{
1569                 DeliveryTag: tag,
1570                 Multiple:    multiple,
1571                 Requeue:     requeue,
1572         })
1573 }
1574
1575 /*
1576 Reject negatively acknowledges a delivery by its delivery tag.  Prefer Nack
1577 over Reject when communicating with a RabbitMQ server because you can Nack
1578 multiple messages, reducing the amount of protocol messages to exchange.
1579
1580 See also Delivery.Reject
1581 */
1582 func (ch *Channel) Reject(tag uint64, requeue bool) error {
1583         ch.m.Lock()
1584         defer ch.m.Unlock()
1585
1586         return ch.send(&basicReject{
1587                 DeliveryTag: tag,
1588                 Requeue:     requeue,
1589         })
1590 }