1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
14 // 0 1 3 7 size+7 size+8
15 // +------+---------+-------------+ +------------+ +-----------+
16 // | type | channel | size | | payload | | frame-end |
17 // +------+---------+-------------+ +------------+ +-----------+
18 // octet short long size octets octet
19 const frameHeaderSize = 1 + 2 + 4 + 1
22 Channel represents an AMQP channel. Used as a context for valid message
23 exchange. Errors on methods with this Channel as a receiver means this channel
24 should be discarded and a new channel established.
29 m sync.Mutex // struct field mutex
30 confirmM sync.Mutex // publisher confirms state mutex
33 connection *Connection
40 // closed is set to 1 when the channel has been closed - see Channel.send()
43 // true when we will never notify again
46 // Channel and Connection exceptions will be broadcast on these listeners.
49 // Listeners for active=true flow control. When true is sent to a listener,
50 // publishing should pause until false is sent to listeners.
53 // Listeners for returned publishings for unroutable messages on mandatory
54 // publishings or undeliverable messages on immediate publishings.
57 // Listeners for when the server notifies the client that
58 // a consumer has been cancelled.
61 // Allocated when in confirm mode in order to track publish counter and order confirms
65 // Selects on any errors from shutdown during RPC
68 // State machine that manages frame order, must only be mutated by the connection
69 recv func(*Channel, frame) error
71 // Current state for frame re-assembly, only mutated from recv
72 message messageWithContent
77 // Constructs a new channel with the given framing rules
78 func newChannel(c *Connection, id uint16) *Channel {
82 rpc: make(chan message),
83 consumers: makeConsumers(),
84 confirms: newConfirms(),
85 recv: (*Channel).recvMethod,
86 errors: make(chan *Error, 1),
90 // shutdown is called by Connection after the channel has been removed from the
91 // connection registry.
92 func (ch *Channel) shutdown(e *Error) {
93 ch.destructor.Do(func() {
97 // Grab an exclusive lock for the notify channels
99 defer ch.notifyM.Unlock()
101 // Broadcast abnormal shutdown
103 for _, c := range ch.closes {
108 // Signal that from now on, Channel.send() should call
109 // Channel.sendClosed()
110 atomic.StoreInt32(&ch.closed, 1)
112 // Notify RPC if we're selecting
119 for _, c := range ch.closes {
123 for _, c := range ch.flows {
127 for _, c := range ch.returns {
131 for _, c := range ch.cancels {
135 // Set the slices to nil to prevent the dispatch() range from sending on
136 // the now closed channels after we release the notifyM mutex
142 if ch.confirms != nil {
151 // send calls Channel.sendOpen() during normal operation.
153 // After the channel has been closed, send calls Channel.sendClosed(), ensuring
154 // only 'channel.close' is sent to the server.
155 func (ch *Channel) send(msg message) (err error) {
156 // If the channel is closed, use Channel.sendClosed()
157 if atomic.LoadInt32(&ch.closed) == 1 {
158 return ch.sendClosed(msg)
161 return ch.sendOpen(msg)
164 func (ch *Channel) open() error {
165 return ch.call(&channelOpen{}, &channelOpenOk{})
168 // Performs a request/response call for when the message is not NoWait and is
169 // specified as Synchronous.
170 func (ch *Channel) call(req message, res ...message) error {
171 if err := ch.send(req); err != nil {
177 case e, ok := <-ch.errors:
183 case msg := <-ch.rpc:
185 for _, try := range res {
186 if reflect.TypeOf(msg) == reflect.TypeOf(try) {
188 vres := reflect.ValueOf(try).Elem()
189 vmsg := reflect.ValueOf(msg).Elem()
194 return ErrCommandInvalid
196 // RPC channel has been closed without an error, likely due to a hard
197 // error on the Connection. This indicates we have already been
198 // shutdown and if were waiting, will have returned from the errors chan.
206 func (ch *Channel) sendClosed(msg message) (err error) {
207 // After a 'channel.close' is sent or received the only valid response is
209 if _, ok := msg.(*channelCloseOk); ok {
210 return ch.connection.send(&methodFrame{
219 func (ch *Channel) sendOpen(msg message) (err error) {
220 if content, ok := msg.(messageWithContent); ok {
221 props, body := content.getContent()
222 class, _ := content.id()
224 // catch client max frame size==0 and server max frame size==0
225 // set size to length of what we're trying to publish
227 if ch.connection.Config.FrameSize > 0 {
228 size = ch.connection.Config.FrameSize - frameHeaderSize
233 if err = ch.connection.send(&methodFrame{
240 if err = ch.connection.send(&headerFrame{
243 Size: uint64(len(body)),
249 // chunk body into size (max frame size - frame header size)
250 for i, j := 0, size; i < len(body); i, j = j, j+size {
255 if err = ch.connection.send(&bodyFrame{
263 err = ch.connection.send(&methodFrame{
272 // Eventually called via the state machine from the connection's reader
273 // goroutine, so assumes serialized access.
274 func (ch *Channel) dispatch(msg message) {
275 switch m := msg.(type) {
277 // lock before sending connection.close-ok
278 // to avoid unexpected interleaving with basic.publish frames if
279 // publishing is happening concurrently
281 ch.send(&channelCloseOk{})
283 ch.connection.closeChannel(ch, newError(m.ReplyCode, m.ReplyText))
287 for _, c := range ch.flows {
291 ch.send(&channelFlowOk{Active: m.Active})
295 for _, c := range ch.cancels {
299 ch.consumers.cancel(m.ConsumerTag)
304 for _, c := range ch.returns {
312 ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
314 ch.confirms.One(Confirmation{m.DeliveryTag, true})
321 ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
323 ch.confirms.One(Confirmation{m.DeliveryTag, false})
328 ch.consumers.send(m.ConsumerTag, newDelivery(ch, m))
329 // TODO log failed consumer and close channel, this can happen when
330 // deliveries are in flight and a no-wait cancel has happened
337 func (ch *Channel) transition(f func(*Channel, frame) error) error {
342 func (ch *Channel) recvMethod(f frame) error {
343 switch frame := f.(type) {
345 if msg, ok := frame.Method.(messageWithContent); ok {
346 ch.body = make([]byte, 0)
348 return ch.transition((*Channel).recvHeader)
351 ch.dispatch(frame.Method) // termination state
352 return ch.transition((*Channel).recvMethod)
356 return ch.transition((*Channel).recvMethod)
360 return ch.transition((*Channel).recvMethod)
363 panic("unexpected frame type")
366 func (ch *Channel) recvHeader(f frame) error {
367 switch frame := f.(type) {
369 // interrupt content and handle method
370 return ch.recvMethod(f)
373 // start collecting if we expect body frames
377 ch.message.setContent(ch.header.Properties, ch.body)
378 ch.dispatch(ch.message) // termination state
379 return ch.transition((*Channel).recvMethod)
381 return ch.transition((*Channel).recvContent)
385 return ch.transition((*Channel).recvMethod)
388 panic("unexpected frame type")
391 // state after method + header and before the length
392 // defined by the header has been reached
393 func (ch *Channel) recvContent(f frame) error {
394 switch frame := f.(type) {
396 // interrupt content and handle method
397 return ch.recvMethod(f)
401 return ch.transition((*Channel).recvMethod)
404 ch.body = append(ch.body, frame.Body...)
406 if uint64(len(ch.body)) >= ch.header.Size {
407 ch.message.setContent(ch.header.Properties, ch.body)
408 ch.dispatch(ch.message) // termination state
409 return ch.transition((*Channel).recvMethod)
412 return ch.transition((*Channel).recvContent)
415 panic("unexpected frame type")
419 Close initiate a clean channel closure by sending a close message with the error
422 It is safe to call this method multiple times.
425 func (ch *Channel) Close() error {
426 defer ch.connection.closeChannel(ch, nil)
428 &channelClose{ReplyCode: replySuccess},
434 NotifyClose registers a listener for when the server sends a channel or
435 connection exception in the form of a Connection.Close or Channel.Close method.
436 Connection exceptions will be broadcast to all open channels and all channels
437 will be closed, where channel exceptions will only be broadcast to listeners to
440 The chan provided will be closed when the Channel is closed and on a
441 graceful close, no error will be sent.
444 func (ch *Channel) NotifyClose(c chan *Error) chan *Error {
446 defer ch.notifyM.Unlock()
451 ch.closes = append(ch.closes, c)
458 NotifyFlow registers a listener for basic.flow methods sent by the server.
459 When `false` is sent on one of the listener channels, all publishers should
460 pause until a `true` is sent.
462 The server may ask the producer to pause or restart the flow of Publishings
463 sent by on a channel. This is a simple flow-control mechanism that a server can
464 use to avoid overflowing its queues or otherwise finding itself receiving more
465 messages than it can process. Note that this method is not intended for window
466 control. It does not affect contents returned by basic.get-ok methods.
468 When a new channel is opened, it is active (flow is active). Some
469 applications assume that channels are inactive until started. To emulate
470 this behavior a client MAY open the channel, then pause it.
472 Publishers should respond to a flow messages as rapidly as possible and the
473 server may disconnect over producing channels that do not respect these
476 basic.flow-ok methods will always be returned to the server regardless of
477 the number of listeners there are.
479 To control the flow of deliveries from the server, use the Channel.Flow()
482 Note: RabbitMQ will rather use TCP pushback on the network connection instead
483 of sending basic.flow. This means that if a single channel is producing too
484 much on the same connection, all channels using that connection will suffer,
485 including acknowledgments from deliveries. Use different Connections if you
486 desire to interleave consumers and producers in the same process to avoid your
487 basic.ack messages from getting rate limited with your basic.publish messages.
490 func (ch *Channel) NotifyFlow(c chan bool) chan bool {
492 defer ch.notifyM.Unlock()
497 ch.flows = append(ch.flows, c)
504 NotifyReturn registers a listener for basic.return methods. These can be sent
505 from the server when a publish is undeliverable either from the mandatory or
508 A return struct has a copy of the Publishing along with some error
509 information about why the publishing failed.
512 func (ch *Channel) NotifyReturn(c chan Return) chan Return {
514 defer ch.notifyM.Unlock()
519 ch.returns = append(ch.returns, c)
526 NotifyCancel registers a listener for basic.cancel methods. These can be sent
527 from the server when a queue is deleted or when consuming from a mirrored queue
528 where the master has just failed (and was moved to another node).
530 The subscription tag is returned to the listener.
533 func (ch *Channel) NotifyCancel(c chan string) chan string {
535 defer ch.notifyM.Unlock()
540 ch.cancels = append(ch.cancels, c)
547 NotifyConfirm calls NotifyPublish and starts a goroutine sending
548 ordered Ack and Nack DeliveryTag to the respective channels.
550 For strict ordering, use NotifyPublish instead.
552 func (ch *Channel) NotifyConfirm(ack, nack chan uint64) (chan uint64, chan uint64) {
553 confirms := ch.NotifyPublish(make(chan Confirmation, len(ack)+len(nack)))
556 for c := range confirms {
560 nack <- c.DeliveryTag
573 NotifyPublish registers a listener for reliable publishing. Receives from this
574 chan for every publish after Channel.Confirm will be in order starting with
577 There will be one and only one Confirmation Publishing starting with the
578 delivery tag of 1 and progressing sequentially until the total number of
579 Publishings have been seen by the server.
581 Acknowledgments will be received in the order of delivery from the
582 NotifyPublish channels even if the server acknowledges them out of order.
584 The listener chan will be closed when the Channel is closed.
586 The capacity of the chan Confirmation must be at least as large as the
587 number of outstanding publishings. Not having enough buffered chans will
588 create a deadlock if you attempt to perform other operations on the Connection
589 or Channel while confirms are in-flight.
591 It's advisable to wait for all Confirmations to arrive before calling
592 Channel.Close() or Connection.Close().
595 func (ch *Channel) NotifyPublish(confirm chan Confirmation) chan Confirmation {
597 defer ch.notifyM.Unlock()
602 ch.confirms.Listen(confirm)
610 Qos controls how many messages or how many bytes the server will try to keep on
611 the network for consumers before receiving delivery acks. The intent of Qos is
612 to make sure the network buffers stay full between the server and client.
614 With a prefetch count greater than zero, the server will deliver that many
615 messages to consumers before acknowledgments are received. The server ignores
616 this option when consumers are started with noAck because no acknowledgments
617 are expected or sent.
619 With a prefetch size greater than zero, the server will try to keep at least
620 that many bytes of deliveries flushed to the network before receiving
621 acknowledgments from the consumers. This option is ignored when consumers are
624 When global is true, these Qos settings apply to all existing and future
625 consumers on all channels on the same connection. When false, the Channel.Qos
626 settings will apply to all existing and future consumers on this channel.
628 Please see the RabbitMQ Consumer Prefetch documentation for an explanation of
629 how the global flag is implemented in RabbitMQ, as it differs from the
630 AMQP 0.9.1 specification in that global Qos settings are limited in scope to
631 channels, not connections (https://www.rabbitmq.com/consumer-prefetch.html).
633 To get round-robin behavior between consumers consuming from the same queue on
634 different connections, set the prefetch count to 1, and the next available
635 message on the server will be delivered to the next available consumer.
637 If your consumer work time is reasonably consistent and not much greater
638 than two times your network round trip time, you will see significant
639 throughput improvements starting with a prefetch count of 2 or slightly
640 greater as described by benchmarks on RabbitMQ.
642 http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/
644 func (ch *Channel) Qos(prefetchCount, prefetchSize int, global bool) error {
647 PrefetchCount: uint16(prefetchCount),
648 PrefetchSize: uint32(prefetchSize),
656 Cancel stops deliveries to the consumer chan established in Channel.Consume and
657 identified by consumer.
659 Only use this method to cleanly stop receiving deliveries from the server and
660 cleanly shut down the consumer chan identified by this tag. Using this method
661 and waiting for remaining messages to flush from the consumer chan will ensure
662 all messages received on the network will be delivered to the receiver of your
665 Continue consuming from the chan Delivery provided by Channel.Consume until the
668 When noWait is true, do not wait for the server to acknowledge the cancel.
669 Only use this when you are certain there are no deliveries in flight that
670 require an acknowledgment, otherwise they will arrive and be dropped in the
671 client without an ack, and will not be redelivered to other consumers.
674 func (ch *Channel) Cancel(consumer string, noWait bool) error {
676 ConsumerTag: consumer,
679 res := &basicCancelOk{}
681 if err := ch.call(req, res); err != nil {
686 ch.consumers.cancel(res.ConsumerTag)
688 // Potentially could drop deliveries in flight
689 ch.consumers.cancel(consumer)
696 QueueDeclare declares a queue to hold messages and deliver to consumers.
697 Declaring creates a queue if it doesn't already exist, or ensures that an
698 existing queue matches the same parameters.
700 Every queue declared gets a default binding to the empty exchange "" which has
701 the type "direct" with the routing key matching the queue's name. With this
702 default binding, it is possible to publish messages that route directly to
703 this queue by publishing to "" with the routing key of the queue name.
705 QueueDeclare("alerts", true, false, false, false, nil)
706 Publish("", "alerts", false, false, Publishing{Body: []byte("...")})
708 Delivery Exchange Key Queue
709 -----------------------------------------------
710 key: alerts -> "" -> alerts -> alerts
712 The queue name may be empty, in which case the server will generate a unique name
713 which will be returned in the Name field of Queue struct.
715 Durable and Non-Auto-Deleted queues will survive server restarts and remain
716 when there are no remaining consumers or bindings. Persistent publishings will
717 be restored in this queue on server restart. These queues are only able to be
718 bound to durable exchanges.
720 Non-Durable and Auto-Deleted queues will not be redeclared on server restart
721 and will be deleted by the server after a short time when the last consumer is
722 canceled or the last consumer's channel is closed. Queues with this lifetime
723 can also be deleted normally with QueueDelete. These durable queues can only
724 be bound to non-durable exchanges.
726 Non-Durable and Non-Auto-Deleted queues will remain declared as long as the
727 server is running regardless of how many consumers. This lifetime is useful
728 for temporary topologies that may have long delays between consumer activity.
729 These queues can only be bound to non-durable exchanges.
731 Durable and Auto-Deleted queues will be restored on server restart, but without
732 active consumers will not survive and be removed. This Lifetime is unlikely
735 Exclusive queues are only accessible by the connection that declares them and
736 will be deleted when the connection closes. Channels on other connections
737 will receive an error when attempting to declare, bind, consume, purge or
738 delete a queue with the same name.
740 When noWait is true, the queue will assume to be declared on the server. A
741 channel exception will arrive if the conditions are met for existing queues
742 or attempting to modify an existing queue from a different connection.
744 When the error return value is not nil, you can assume the queue could not be
745 declared with these parameters, and the channel will be closed.
748 func (ch *Channel) QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
749 if err := args.Validate(); err != nil {
753 req := &queueDeclare{
757 AutoDelete: autoDelete,
758 Exclusive: exclusive,
762 res := &queueDeclareOk{}
764 if err := ch.call(req, res); err != nil {
771 Messages: int(res.MessageCount),
772 Consumers: int(res.ConsumerCount),
776 return Queue{Name: name}, nil
781 QueueDeclarePassive is functionally and parametrically equivalent to
782 QueueDeclare, except that it sets the "passive" attribute to true. A passive
783 queue is assumed by RabbitMQ to already exist, and attempting to connect to a
784 non-existent queue will cause RabbitMQ to throw an exception. This function
785 can be used to test for the existence of a queue.
788 func (ch *Channel) QueueDeclarePassive(name string, durable, autoDelete, exclusive, noWait bool, args Table) (Queue, error) {
789 if err := args.Validate(); err != nil {
793 req := &queueDeclare{
797 AutoDelete: autoDelete,
798 Exclusive: exclusive,
802 res := &queueDeclareOk{}
804 if err := ch.call(req, res); err != nil {
811 Messages: int(res.MessageCount),
812 Consumers: int(res.ConsumerCount),
816 return Queue{Name: name}, nil
820 QueueInspect passively declares a queue by name to inspect the current message
821 count and consumer count.
823 Use this method to check how many messages ready for delivery reside in the queue,
824 how many consumers are receiving deliveries, and whether a queue by this
827 If the queue by this name exists, use Channel.QueueDeclare check if it is
828 declared with specific parameters.
830 If a queue by this name does not exist, an error will be returned and the
831 channel will be closed.
834 func (ch *Channel) QueueInspect(name string) (Queue, error) {
835 req := &queueDeclare{
839 res := &queueDeclareOk{}
841 err := ch.call(req, res)
845 Messages: int(res.MessageCount),
846 Consumers: int(res.ConsumerCount),
853 QueueBind binds an exchange to a queue so that publishings to the exchange will
854 be routed to the queue when the publishing routing key matches the binding
857 QueueBind("pagers", "alert", "log", false, nil)
858 QueueBind("emails", "info", "log", false, nil)
860 Delivery Exchange Key Queue
861 -----------------------------------------------
862 key: alert --> log ----> alert --> pagers
863 key: info ---> log ----> info ---> emails
864 key: debug --> log (none) (dropped)
866 If a binding with the same key and arguments already exists between the
867 exchange and queue, the attempt to rebind will be ignored and the existing
868 binding will be retained.
870 In the case that multiple bindings may cause the message to be routed to the
871 same queue, the server will only route the publishing once. This is possible
872 with topic exchanges.
874 QueueBind("pagers", "alert", "amq.topic", false, nil)
875 QueueBind("emails", "info", "amq.topic", false, nil)
876 QueueBind("emails", "#", "amq.topic", false, nil) // match everything
878 Delivery Exchange Key Queue
879 -----------------------------------------------
880 key: alert --> amq.topic ----> alert --> pagers
881 key: info ---> amq.topic ----> # ------> emails
883 key: debug --> amq.topic ----> # ------> emails
885 It is only possible to bind a durable queue to a durable exchange regardless of
886 whether the queue or exchange is auto-deleted. Bindings between durable queues
887 and exchanges will also be restored on server restart.
889 If the binding could not complete, an error will be returned and the channel
892 When noWait is false and the queue could not be bound, the channel will be
893 closed with an error.
896 func (ch *Channel) QueueBind(name, key, exchange string, noWait bool, args Table) error {
897 if err := args.Validate(); err != nil {
914 QueueUnbind removes a binding between an exchange and queue matching the key and
917 It is possible to send and empty string for the exchange name which means to
918 unbind the queue from the default exchange.
921 func (ch *Channel) QueueUnbind(name, key, exchange string, args Table) error {
922 if err := args.Validate(); err != nil {
938 QueuePurge removes all messages from the named queue which are not waiting to
939 be acknowledged. Messages that have been delivered but have not yet been
940 acknowledged will not be removed.
942 When successful, returns the number of messages purged.
944 If noWait is true, do not wait for the server response and the number of
945 messages purged will not be meaningful.
947 func (ch *Channel) QueuePurge(name string, noWait bool) (int, error) {
952 res := &queuePurgeOk{}
954 err := ch.call(req, res)
956 return int(res.MessageCount), err
960 QueueDelete removes the queue from the server including all bindings then
961 purges the messages based on server configuration, returning the number of
964 When ifUnused is true, the queue will not be deleted if there are any
965 consumers on the queue. If there are consumers, an error will be returned and
966 the channel will be closed.
968 When ifEmpty is true, the queue will not be deleted if there are any messages
969 remaining on the queue. If there are messages, an error will be returned and
970 the channel will be closed.
972 When noWait is true, the queue will be deleted without waiting for a response
973 from the server. The purged message count will not be meaningful. If the queue
974 could not be deleted, a channel exception will be raised and the channel will
978 func (ch *Channel) QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error) {
985 res := &queueDeleteOk{}
987 err := ch.call(req, res)
989 return int(res.MessageCount), err
993 Consume immediately starts delivering queued messages.
995 Begin receiving on the returned chan Delivery before any other operation on the
996 Connection or Channel.
998 Continues deliveries to the returned chan Delivery until Channel.Cancel,
999 Connection.Close, Channel.Close, or an AMQP exception occurs. Consumers must
1000 range over the chan to ensure all deliveries are received. Unreceived
1001 deliveries will block all methods on the same connection.
1003 All deliveries in AMQP must be acknowledged. It is expected of the consumer to
1004 call Delivery.Ack after it has successfully processed the delivery. If the
1005 consumer is cancelled or the channel or connection is closed any unacknowledged
1006 deliveries will be requeued at the end of the same queue.
1008 The consumer is identified by a string that is unique and scoped for all
1009 consumers on this channel. If you wish to eventually cancel the consumer, use
1010 the same non-empty identifier in Channel.Cancel. An empty string will cause
1011 the library to generate a unique identity. The consumer identity will be
1012 included in every Delivery in the ConsumerTag field
1014 When autoAck (also known as noAck) is true, the server will acknowledge
1015 deliveries to this consumer prior to writing the delivery to the network. When
1016 autoAck is true, the consumer should not call Delivery.Ack. Automatically
1017 acknowledging deliveries means that some deliveries may get lost if the
1018 consumer is unable to process them after the server delivers them.
1019 See http://www.rabbitmq.com/confirms.html for more details.
1021 When exclusive is true, the server will ensure that this is the sole consumer
1022 from this queue. When exclusive is false, the server will fairly distribute
1023 deliveries across multiple consumers.
1025 The noLocal flag is not supported by RabbitMQ.
1027 It's advisable to use separate connections for
1028 Channel.Publish and Channel.Consume so not to have TCP pushback on publishing
1029 affect the ability to consume messages, so this parameter is here mostly for
1032 When noWait is true, do not wait for the server to confirm the request and
1033 immediately begin deliveries. If it is not possible to consume, a channel
1034 exception will be raised and the channel will be closed.
1036 Optional arguments can be provided that have specific semantics for the queue
1039 Inflight messages, limited by Channel.Qos will be buffered until received from
1042 When the Channel or Connection is closed, all buffered and inflight messages will
1045 When the consumer tag is cancelled, all inflight messages will be delivered until
1046 the returned chan is closed.
1049 func (ch *Channel) Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args Table) (<-chan Delivery, error) {
1050 // When we return from ch.call, there may be a delivery already for the
1051 // consumer that hasn't been added to the consumer hash yet. Because of
1052 // this, we never rely on the server picking a consumer tag for us.
1054 if err := args.Validate(); err != nil {
1059 consumer = uniqueConsumerTag()
1062 req := &basicConsume{
1064 ConsumerTag: consumer,
1067 Exclusive: exclusive,
1071 res := &basicConsumeOk{}
1073 deliveries := make(chan Delivery)
1075 ch.consumers.add(consumer, deliveries)
1077 if err := ch.call(req, res); err != nil {
1078 ch.consumers.cancel(consumer)
1082 return (<-chan Delivery)(deliveries), nil
1086 ExchangeDeclare declares an exchange on the server. If the exchange does not
1087 already exist, the server will create it. If the exchange exists, the server
1088 verifies that it is of the provided type, durability and auto-delete flags.
1090 Errors returned from this method will close the channel.
1092 Exchange names starting with "amq." are reserved for pre-declared and
1093 standardized exchanges. The client MAY declare an exchange starting with
1094 "amq." if the passive option is set, or the exchange already exists. Names can
1095 consist of a non-empty sequence of letters, digits, hyphen, underscore,
1098 Each exchange belongs to one of a set of exchange kinds/types implemented by
1099 the server. The exchange types define the functionality of the exchange - i.e.
1100 how messages are routed through it. Once an exchange is declared, its type
1101 cannot be changed. The common types are "direct", "fanout", "topic" and
1104 Durable and Non-Auto-Deleted exchanges will survive server restarts and remain
1105 declared when there are no remaining bindings. This is the best lifetime for
1106 long-lived exchange configurations like stable routes and default exchanges.
1108 Non-Durable and Auto-Deleted exchanges will be deleted when there are no
1109 remaining bindings and not restored on server restart. This lifetime is
1110 useful for temporary topologies that should not pollute the virtual host on
1111 failure or after the consumers have completed.
1113 Non-Durable and Non-Auto-deleted exchanges will remain as long as the server is
1114 running including when there are no remaining bindings. This is useful for
1115 temporary topologies that may have long delays between bindings.
1117 Durable and Auto-Deleted exchanges will survive server restarts and will be
1118 removed before and after server restarts when there are no remaining bindings.
1119 These exchanges are useful for robust temporary topologies or when you require
1120 binding durable queues to auto-deleted exchanges.
1122 Note: RabbitMQ declares the default exchange types like 'amq.fanout' as
1123 durable, so queues that bind to these pre-declared exchanges must also be
1126 Exchanges declared as `internal` do not accept accept publishings. Internal
1127 exchanges are useful when you wish to implement inter-exchange topologies
1128 that should not be exposed to users of the broker.
1130 When noWait is true, declare without waiting for a confirmation from the server.
1131 The channel may be closed as a result of an error. Add a NotifyClose listener
1132 to respond to any exceptions.
1134 Optional amqp.Table of arguments that are specific to the server's implementation of
1135 the exchange can be sent for exchange types that require extra parameters.
1137 func (ch *Channel) ExchangeDeclare(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
1138 if err := args.Validate(); err != nil {
1148 AutoDelete: autoDelete,
1153 &exchangeDeclareOk{},
1159 ExchangeDeclarePassive is functionally and parametrically equivalent to
1160 ExchangeDeclare, except that it sets the "passive" attribute to true. A passive
1161 exchange is assumed by RabbitMQ to already exist, and attempting to connect to a
1162 non-existent exchange will cause RabbitMQ to throw an exception. This function
1163 can be used to detect the existence of an exchange.
1166 func (ch *Channel) ExchangeDeclarePassive(name, kind string, durable, autoDelete, internal, noWait bool, args Table) error {
1167 if err := args.Validate(); err != nil {
1177 AutoDelete: autoDelete,
1182 &exchangeDeclareOk{},
1187 ExchangeDelete removes the named exchange from the server. When an exchange is
1188 deleted all queue bindings on the exchange are also deleted. If this exchange
1189 does not exist, the channel will be closed with an error.
1191 When ifUnused is true, the server will only delete the exchange if it has no queue
1192 bindings. If the exchange has queue bindings the server does not delete it
1193 but close the channel with an exception instead. Set this to true if you are
1194 not the sole owner of the exchange.
1196 When noWait is true, do not wait for a server confirmation that the exchange has
1197 been deleted. Failing to delete the channel could close the channel. Add a
1198 NotifyClose listener to respond to these channel exceptions.
1200 func (ch *Channel) ExchangeDelete(name string, ifUnused, noWait bool) error {
1207 &exchangeDeleteOk{},
1212 ExchangeBind binds an exchange to another exchange to create inter-exchange
1213 routing topologies on the server. This can decouple the private topology and
1214 routing exchanges from exchanges intended solely for publishing endpoints.
1216 Binding two exchanges with identical arguments will not create duplicate
1219 Binding one exchange to another with multiple bindings will only deliver a
1220 message once. For example if you bind your exchange to `amq.fanout` with two
1221 different binding keys, only a single message will be delivered to your
1222 exchange even though multiple bindings will match.
1224 Given a message delivered to the source exchange, the message will be forwarded
1225 to the destination exchange when the routing key is matched.
1227 ExchangeBind("sell", "MSFT", "trade", false, nil)
1228 ExchangeBind("buy", "AAPL", "trade", false, nil)
1230 Delivery Source Key Destination
1231 example exchange exchange
1232 -----------------------------------------------
1233 key: AAPL --> trade ----> MSFT sell
1236 When noWait is true, do not wait for the server to confirm the binding. If any
1237 error occurs the channel will be closed. Add a listener to NotifyClose to
1238 handle these errors.
1240 Optional arguments specific to the exchanges bound can also be specified.
1242 func (ch *Channel) ExchangeBind(destination, key, source string, noWait bool, args Table) error {
1243 if err := args.Validate(); err != nil {
1249 Destination: destination,
1260 ExchangeUnbind unbinds the destination exchange from the source exchange on the
1261 server by removing the routing key between them. This is the inverse of
1262 ExchangeBind. If the binding does not currently exist, an error will be
1265 When noWait is true, do not wait for the server to confirm the deletion of the
1266 binding. If any error occurs the channel will be closed. Add a listener to
1267 NotifyClose to handle these errors.
1269 Optional arguments that are specific to the type of exchanges bound can also be
1270 provided. These must match the same arguments specified in ExchangeBind to
1271 identify the binding.
1273 func (ch *Channel) ExchangeUnbind(destination, key, source string, noWait bool, args Table) error {
1274 if err := args.Validate(); err != nil {
1280 Destination: destination,
1286 &exchangeUnbindOk{},
1291 Publish sends a Publishing from the client to an exchange on the server.
1293 When you want a single message to be delivered to a single queue, you can
1294 publish to the default exchange with the routingKey of the queue name. This is
1295 because every declared queue gets an implicit route to the default exchange.
1297 Since publishings are asynchronous, any undeliverable message will get returned
1298 by the server. Add a listener with Channel.NotifyReturn to handle any
1299 undeliverable message when calling publish with either the mandatory or
1300 immediate parameters as true.
1302 Publishings can be undeliverable when the mandatory flag is true and no queue is
1303 bound that matches the routing key, or when the immediate flag is true and no
1304 consumer on the matched queue is ready to accept the delivery.
1306 This can return an error when the channel, connection or socket is closed. The
1307 error or lack of an error does not indicate whether the server has received this
1310 It is possible for publishing to not reach the broker if the underlying socket
1311 is shut down without pending publishing packets being flushed from the kernel
1312 buffers. The easy way of making it probable that all publishings reach the
1313 server is to always call Connection.Close before terminating your publishing
1314 application. The way to ensure that all publishings reach the server is to add
1315 a listener to Channel.NotifyPublish and put the channel in confirm mode with
1316 Channel.Confirm. Publishing delivery tags and their corresponding
1317 confirmations start at 1. Exit when all publishings are confirmed.
1319 When Publish does not return an error and the channel is in confirm mode, the
1320 internal counter for DeliveryTags with the first confirmation starts at 1.
1323 func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg Publishing) error {
1324 if err := msg.Headers.Validate(); err != nil {
1331 if err := ch.send(&basicPublish{
1334 Mandatory: mandatory,
1335 Immediate: immediate,
1337 Properties: properties{
1338 Headers: msg.Headers,
1339 ContentType: msg.ContentType,
1340 ContentEncoding: msg.ContentEncoding,
1341 DeliveryMode: msg.DeliveryMode,
1342 Priority: msg.Priority,
1343 CorrelationId: msg.CorrelationId,
1344 ReplyTo: msg.ReplyTo,
1345 Expiration: msg.Expiration,
1346 MessageId: msg.MessageId,
1347 Timestamp: msg.Timestamp,
1357 ch.confirms.Publish()
1364 Get synchronously receives a single Delivery from the head of a queue from the
1365 server to the client. In almost all cases, using Channel.Consume will be
1368 If there was a delivery waiting on the queue and that delivery was received, the
1369 second return value will be true. If there was no delivery waiting or an error
1370 occurred, the ok bool will be false.
1372 All deliveries must be acknowledged including those from Channel.Get. Call
1373 Delivery.Ack on the returned delivery when you have fully processed this
1376 When autoAck is true, the server will automatically acknowledge this message so
1377 you don't have to. But if you are unable to fully process this message before
1378 the channel or connection is closed, the message will not get requeued.
1381 func (ch *Channel) Get(queue string, autoAck bool) (msg Delivery, ok bool, err error) {
1382 req := &basicGet{Queue: queue, NoAck: autoAck}
1383 res := &basicGetOk{}
1384 empty := &basicGetEmpty{}
1386 if err := ch.call(req, res, empty); err != nil {
1387 return Delivery{}, false, err
1390 if res.DeliveryTag > 0 {
1391 return *(newDelivery(ch, res)), true, nil
1394 return Delivery{}, false, nil
1398 Tx puts the channel into transaction mode on the server. All publishings and
1399 acknowledgments following this method will be atomically committed or rolled
1400 back for a single queue. Call either Channel.TxCommit or Channel.TxRollback to
1401 leave a this transaction and immediately start a new transaction.
1403 The atomicity across multiple queues is not defined as queue declarations and
1404 bindings are not included in the transaction.
1406 The behavior of publishings that are delivered as mandatory or immediate while
1407 the channel is in a transaction is not defined.
1409 Once a channel has been put into transaction mode, it cannot be taken out of
1410 transaction mode. Use a different channel for non-transactional semantics.
1413 func (ch *Channel) Tx() error {
1421 TxCommit atomically commits all publishings and acknowledgments for a single
1422 queue and immediately start a new transaction.
1424 Calling this method without having called Channel.Tx is an error.
1427 func (ch *Channel) TxCommit() error {
1435 TxRollback atomically rolls back all publishings and acknowledgments for a
1436 single queue and immediately start a new transaction.
1438 Calling this method without having called Channel.Tx is an error.
1441 func (ch *Channel) TxRollback() error {
1449 Flow pauses the delivery of messages to consumers on this channel. Channels
1450 are opened with flow control active, to open a channel with paused
1451 deliveries immediately call this method with `false` after calling
1454 When active is `false`, this method asks the server to temporarily pause deliveries
1455 until called again with active as `true`.
1457 Channel.Get methods will not be affected by flow control.
1459 This method is not intended to act as window control. Use Channel.Qos to limit
1460 the number of unacknowledged messages or bytes in flight instead.
1462 The server may also send us flow methods to throttle our publishings. A well
1463 behaving publishing client should add a listener with Channel.NotifyFlow and
1464 pause its publishings when `false` is sent on that channel.
1466 Note: RabbitMQ prefers to use TCP push back to control flow for all channels on
1467 a connection, so under high volume scenarios, it's wise to open separate
1468 Connections for publishings and deliveries.
1471 func (ch *Channel) Flow(active bool) error {
1473 &channelFlow{Active: active},
1479 Confirm puts this channel into confirm mode so that the client can ensure all
1480 publishings have successfully been received by the server. After entering this
1481 mode, the server will send a basic.ack or basic.nack message with the deliver
1482 tag set to a 1 based incremental index corresponding to every publishing
1483 received after the this method returns.
1485 Add a listener to Channel.NotifyPublish to respond to the Confirmations. If
1486 Channel.NotifyPublish is not called, the Confirmations will be silently
1489 The order of acknowledgments is not bound to the order of deliveries.
1491 Ack and Nack confirmations will arrive at some point in the future.
1493 Unroutable mandatory or immediate messages are acknowledged immediately after
1494 any Channel.NotifyReturn listeners have been notified. Other messages are
1495 acknowledged when all queues that should have the message routed to them have
1496 either received acknowledgment of delivery or have enqueued the message,
1497 persisting the message if necessary.
1499 When noWait is true, the client will not wait for a response. A channel
1500 exception could occur if the server does not support this method.
1503 func (ch *Channel) Confirm(noWait bool) error {
1505 &confirmSelect{Nowait: noWait},
1512 ch.confirming = true
1513 ch.confirmM.Unlock()
1519 Recover redelivers all unacknowledged deliveries on this channel.
1521 When requeue is false, messages will be redelivered to the original consumer.
1523 When requeue is true, messages will be redelivered to any available consumer,
1524 potentially including the original.
1526 If the deliveries cannot be recovered, an error will be returned and the channel
1529 Note: this method is not implemented on RabbitMQ, use Delivery.Nack instead
1531 func (ch *Channel) Recover(requeue bool) error {
1533 &basicRecover{Requeue: requeue},
1539 Ack acknowledges a delivery by its delivery tag when having been consumed with
1540 Channel.Consume or Channel.Get.
1542 Ack acknowledges all message received prior to the delivery tag when multiple
1545 See also Delivery.Ack
1547 func (ch *Channel) Ack(tag uint64, multiple bool) error {
1551 return ch.send(&basicAck{
1558 Nack negatively acknowledges a delivery by its delivery tag. Prefer this
1559 method to notify the server that you were not able to process this delivery and
1560 it must be redelivered or dropped.
1562 See also Delivery.Nack
1564 func (ch *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
1568 return ch.send(&basicNack{
1576 Reject negatively acknowledges a delivery by its delivery tag. Prefer Nack
1577 over Reject when communicating with a RabbitMQ server because you can Nack
1578 multiple messages, reducing the amount of protocol messages to exchange.
1580 See also Delivery.Reject
1582 func (ch *Channel) Reject(tag uint64, requeue bool) error {
1586 return ch.send(&basicReject{