1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
22 maxChannelMax = (2 << 15) - 1
24 defaultHeartbeat = 10 * time.Second
25 defaultConnectionTimeout = 30 * time.Second
26 defaultProduct = "https://github.com/streadway/amqp"
28 // Safer default that makes channel leaks a lot easier to spot
29 // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
30 defaultChannelMax = (2 << 10) - 1
31 defaultLocale = "en_US"
34 // Config is used in DialConfig and Open to specify the desired tuning
35 // parameters used during a connection open handshake. The negotiated tuning
36 // will be stored in the returned connection's Config field.
38 // The SASL mechanisms to try in the client request, and the successful
39 // mechanism used on the Connection object.
40 // If SASL is nil, PlainAuth from the URL is used.
43 // Vhost specifies the namespace of permissions, exchanges, queues and
44 // bindings on the server. Dial sets this to the path parsed from the URL.
47 ChannelMax int // 0 means use the server's limit; if that is also 0, defaultChannelMax applies
48 FrameSize int // 0 max bytes means unlimited
49 Heartbeat time.Duration // less than 1s uses the server's interval
51 // TLSClientConfig specifies the client configuration of the TLS connection
52 // when establishing a tls transport.
53 // If the URL uses an amqps scheme, then an empty tls.Config with the
54 // ServerName from the URL is used.
55 TLSClientConfig *tls.Config
57 // Properties is a table of properties that the client advertises to the server.
58 // This is an optional setting - if the application does not set this,
59 // the underlying library will use a generic set of client properties.
62 // Connection locale that we expect to always be en_US
63 // Even though servers must return it as per the AMQP 0-9-1 spec,
64 // we are not aware of it being used other than to satisfy the spec requirements
67 // Dial returns a net.Conn prepared for a TLS handshake with TLSClientConfig,
68 // then an AMQP connection handshake.
69 // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
70 // used during TLS and AMQP handshaking.
71 Dial func(network, addr string) (net.Conn, error)
74 // Connection manages the serialization and deserialization of frames from IO
75 // and dispatches the frames to the appropriate channel. All RPC methods and
76 // asynchronous Publishing, Delivery, Ack, Nack and Return messages are
77 // multiplexed on this channel. There must always be active receivers for
78 // every asynchronous message on this connection.
79 type Connection struct {
80 destructor sync.Once // shutdown once
81 sendM sync.Mutex // conn writer mutex
82 m sync.Mutex // struct field mutex
84 conn io.ReadWriteCloser
88 sends chan time.Time // timestamps of each frame sent
89 deadlines chan readDeadliner // heartbeater updates read deadlines
91 allocator *allocator // id generator valid after openTune
92 channels map[uint16]*Channel
94 noNotify bool // true when we will never notify again
96 blocks []chan Blocking
100 Config Config // The negotiated Config after connection.open
102 Major int // Server's major version
103 Minor int // Server's minor version
104 Properties Table // Server properties
105 Locales []string // Server locales
107 closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
110 type readDeadliner interface {
111 SetReadDeadline(time.Time) error
114 // DefaultDial establishes a connection when config.Dial is not provided
115 func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
116 return func(network, addr string) (net.Conn, error) {
117 conn, err := net.DialTimeout(network, addr, connectionTimeout)
122 // Heartbeating hasn't started yet, don't stall forever on a dead server.
123 // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
124 // the deadline is cleared in openComplete.
125 if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
133 // Dial accepts a string in the AMQP URI format and returns a new Connection
134 // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
135 // seconds and sets the handshake deadline to 30 seconds. After handshake,
136 // deadlines are cleared.
138 // Dial uses the zero value of tls.Config when it encounters an amqps://
139 // scheme. It is equivalent to calling DialTLS(url, nil).
140 func Dial(url string) (*Connection, error) {
141 return DialConfig(url, Config{
142 Heartbeat: defaultHeartbeat,
143 Locale: defaultLocale,
147 // DialTLS accepts a string in the AMQP URI format and returns a new Connection
148 // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
149 // seconds and sets the initial read deadline to 30 seconds.
151 // DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
152 func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
153 return DialConfig(url, Config{
154 Heartbeat: defaultHeartbeat,
155 TLSClientConfig: amqps,
156 Locale: defaultLocale,
160 // DialConfig accepts a string in the AMQP URI format and a configuration for
161 // the transport and connection setup, returning a new Connection. Defaults to
162 // a server heartbeat interval of 10 seconds and sets the initial read deadline
164 func DialConfig(url string, config Config) (*Connection, error) {
168 uri, err := ParseURI(url)
173 if config.SASL == nil {
174 config.SASL = []Authentication{uri.PlainAuth()}
177 if config.Vhost == "" {
178 config.Vhost = uri.Vhost
181 addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
183 dialer := config.Dial
185 dialer = DefaultDial(defaultConnectionTimeout)
188 conn, err = dialer("tcp", addr)
193 if uri.Scheme == "amqps" {
194 if config.TLSClientConfig == nil {
195 config.TLSClientConfig = new(tls.Config)
198 // If ServerName has not been specified in TLSClientConfig,
199 // set it to the URI host used for this connection.
200 if config.TLSClientConfig.ServerName == "" {
201 config.TLSClientConfig.ServerName = uri.Host
204 client := tls.Client(conn, config.TLSClientConfig)
205 if err := client.Handshake(); err != nil {
214 return Open(conn, config)
218 Open accepts an already established connection, or other io.ReadWriteCloser as
219 a transport. Use this method if you have established a TLS connection or wish
220 to use your own custom transport.
223 func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
226 writer: &writer{bufio.NewWriter(conn)},
227 channels: make(map[uint16]*Channel),
228 rpc: make(chan message),
229 sends: make(chan time.Time),
230 errors: make(chan *Error, 1),
231 deadlines: make(chan readDeadliner, 1),
234 return c, c.open(config)
238 LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
239 as a fallback default value if the underlying transport does not support LocalAddr().
241 func (c *Connection) LocalAddr() net.Addr {
242 if conn, ok := c.conn.(interface {
245 return conn.LocalAddr()
247 return &net.TCPAddr{}
250 // ConnectionState returns basic TLS details of the underlying transport.
251 // Returns a zero value when the underlying connection does not implement
252 // ConnectionState() tls.ConnectionState.
253 func (c *Connection) ConnectionState() tls.ConnectionState {
254 if conn, ok := c.conn.(interface {
255 ConnectionState() tls.ConnectionState
257 return conn.ConnectionState()
259 return tls.ConnectionState{}
263 NotifyClose registers a listener for close events either initiated by an error
264 accompanying a connection.close method or by a normal shutdown.
266 On normal shutdowns, the chan will be closed.
268 To reconnect after a transport or protocol error, register a listener here and
269 re-run your setup process.
272 func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
279 c.closes = append(c.closes, receiver)
286 NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
287 method extensions connection.blocked and connection.unblocked. Flow control is
288 active with a reason when Blocking.Blocked is true. When a Connection is
289 blocked, all methods will block across all connections until server resources
292 This optional extension is supported by the server when the
293 "connection.blocked" server capability key is true.
296 func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
303 c.blocks = append(c.blocks, receiver)
310 Close requests and waits for the response to close the AMQP connection.
312 It's advisable to use this method when publishing to ensure all kernel buffers
313 have been flushed on the server and client before exiting.
315 An error indicates that server may not have received this request to close but
316 the connection should be treated as closed regardless.
318 After returning from this call, all resources associated with this connection,
319 including the underlying io, Channels, Notify listeners and Channel consumers
322 func (c *Connection) Close() error {
327 defer c.shutdown(nil)
330 ReplyCode: replySuccess,
331 ReplyText: "kthxbai",
333 &connectionCloseOk{},
337 func (c *Connection) closeWith(err *Error) error {
342 defer c.shutdown(err)
345 ReplyCode: uint16(err.Code),
346 ReplyText: err.Reason,
348 &connectionCloseOk{},
352 // IsClosed returns true if the connection is marked as closed, otherwise false
354 func (c *Connection) IsClosed() bool {
355 return (atomic.LoadInt32(&c.closed) == 1)
358 func (c *Connection) send(f frame) error {
364 err := c.writer.WriteFrame(f)
368 // shutdown could be re-entrant from signaling notify chans
369 go c.shutdown(&Error{
374 // Broadcast we sent a frame, reducing heartbeats, only
375 // if there is something that can receive - like a non-reentrant
376 // call or if the heartbeater isn't running
378 case c.sends <- time.Now():
386 func (c *Connection) shutdown(err *Error) {
387 atomic.StoreInt32(&c.closed, 1)
389 c.destructor.Do(func() {
394 for _, c := range c.closes {
402 // Shutdown handler goroutine can still receive the result.
405 for _, c := range c.closes {
409 for _, c := range c.blocks {
413 // Shutdown the channel, but do not use closeChannel() as it calls
414 // releaseChannel() which requires the connection lock.
416 // Ranging over c.channels and calling releaseChannel() that mutates
417 // c.channels is racy - see commit 6063341 for an example.
418 for _, ch := range c.channels {
424 c.channels = map[uint16]*Channel{}
425 c.allocator = newAllocator(1, c.Config.ChannelMax)
430 // All methods sent to the connection channel should be synchronous so we
431 // can handle them directly without a framing component
432 func (c *Connection) demux(f frame) {
433 if f.channel() == 0 {
440 func (c *Connection) dispatch0(f frame) {
441 switch mf := f.(type) {
443 switch m := mf.Method.(type) {
444 case *connectionClose:
445 // Send immediately as shutdown will close our side of the writer.
448 Method: &connectionCloseOk{},
451 c.shutdown(newError(m.ReplyCode, m.ReplyText))
452 case *connectionBlocked:
453 for _, c := range c.blocks {
454 c <- Blocking{Active: true, Reason: m.Reason}
456 case *connectionUnblocked:
457 for _, c := range c.blocks {
458 c <- Blocking{Active: false}
463 case *heartbeatFrame:
464 // kthx - all reads reset our deadline. so we can drop this
466 // lolwat - channel0 only responds to methods and heartbeats
467 c.closeWith(ErrUnexpectedFrame)
471 func (c *Connection) dispatchN(f frame) {
473 channel := c.channels[f.channel()]
477 channel.recv(channel, f)
483 // section 2.3.7: "When a peer decides to close a channel or connection, it
484 // sends a Close method. The receiving peer MUST respond to a Close with a
485 // Close-Ok, and then both parties can close their channel or connection. Note
486 // that if peers ignore Close, deadlock can happen when both peers send Close
487 // at the same time."
489 // When we don't have a channel, we must respond with close-ok on a close
490 // method. This can happen between a channel exception on an asynchronous
491 // method like basic.publish and a synchronous close with channel.close.
492 // In that case, we'll get both a channel.close and channel.close-ok in any
494 func (c *Connection) dispatchClosed(f frame) {
495 // Only consider method frames, drop content/header frames
496 if mf, ok := f.(*methodFrame); ok {
497 switch mf.Method.(type) {
500 ChannelId: f.channel(),
501 Method: &channelCloseOk{},
503 case *channelCloseOk:
504 // we are already closed, so do nothing
506 // unexpected method on closed channel
507 c.closeWith(ErrClosed)
512 // Reads each frame off the IO and hands off to the connection object that
513 // will demux the streams and dispatch to one of the opened channels or
514 // handle on channel 0 (the connection channel).
515 func (c *Connection) reader(r io.Reader) {
516 buf := bufio.NewReader(r)
517 frames := &reader{buf}
518 conn, haveDeadliner := r.(readDeadliner)
521 frame, err := frames.ReadFrame()
524 c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
536 // Ensures that at least one frame is being sent at the tuned interval with a
537 // jitter tolerance of 1s
538 func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
539 const maxServerHeartbeatsInFlight = 3
541 var sendTicks <-chan time.Time
543 ticker := time.NewTicker(interval)
548 lastSent := time.Now()
552 case at, stillSending := <-c.sends:
553 // When actively sending, depend on sent frames to reset server timer
560 case at := <-sendTicks:
561 // When idle, fill the space with a heartbeat frame
562 if at.Sub(lastSent) > interval-time.Second {
563 if err := c.send(&heartbeatFrame{}); err != nil {
564 // send heartbeats even after close/closeOk so we
565 // tick until the connection starts erroring
570 case conn := <-c.deadlines:
571 // When reading, reset our side of the deadline, if we've negotiated one with
572 // a deadline that covers at least 2 server heartbeats
574 conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
583 // Convenience method to inspect the Connection.Properties["capabilities"]
584 // Table for server identified capabilities like "basic.ack" or
586 func (c *Connection) isCapable(featureName string) bool {
587 capabilities, _ := c.Properties["capabilities"].(Table)
588 hasFeature, _ := capabilities[featureName].(bool)
592 // allocateChannel records but does not open a new channel with a unique id.
593 // This method is the initial part of the channel lifecycle and paired with
595 func (c *Connection) allocateChannel() (*Channel, error) {
600 return nil, ErrClosed
603 id, ok := c.allocator.next()
605 return nil, ErrChannelMax
608 ch := newChannel(c, uint16(id))
609 c.channels[uint16(id)] = ch
614 // releaseChannel removes a channel from the registry as the final part of the
616 func (c *Connection) releaseChannel(id uint16) {
620 delete(c.channels, id)
621 c.allocator.release(int(id))
624 // openChannel allocates and opens a channel, must be paired with closeChannel
625 func (c *Connection) openChannel() (*Channel, error) {
626 ch, err := c.allocateChannel()
631 if err := ch.open(); err != nil {
632 c.releaseChannel(ch.id)
638 // closeChannel releases and initiates a shutdown of the channel. All channel
639 // closures should be initiated here for proper channel lifecycle management on
641 func (c *Connection) closeChannel(ch *Channel, e *Error) {
643 c.releaseChannel(ch.id)
647 Channel opens a unique, concurrent server channel to process the bulk of AMQP
648 messages. Any error from methods on this receiver will render the receiver
649 invalid and a new Channel should be opened.
652 func (c *Connection) Channel() (*Channel, error) {
653 return c.openChannel()
656 func (c *Connection) call(req message, res ...message) error {
657 // Special case for when the protocol header frame is sent instead of a
660 if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
666 case err, ok := <-c.errors:
673 // Try to match one of the result types
674 for _, try := range res {
675 if reflect.TypeOf(msg) == reflect.TypeOf(try) {
677 vres := reflect.ValueOf(try).Elem()
678 vmsg := reflect.ValueOf(msg).Elem()
683 return ErrCommandInvalid
688 // Connection = open-Connection *use-Connection close-Connection
689 // open-Connection = C:protocol-header
690 // S:START C:START-OK
694 // challenge = S:SECURE C:SECURE-OK
695 // use-Connection = *channel
696 // close-Connection = C:CLOSE S:CLOSE-OK
697 // / S:CLOSE C:CLOSE-OK
698 func (c *Connection) open(config Config) error {
699 if err := c.send(&protocolHeader{}); err != nil {
703 return c.openStart(config)
706 func (c *Connection) openStart(config Config) error {
707 start := &connectionStart{}
709 if err := c.call(nil, start); err != nil {
713 c.Major = int(start.VersionMajor)
714 c.Minor = int(start.VersionMinor)
715 c.Properties = Table(start.ServerProperties)
716 c.Locales = strings.Split(start.Locales, " ")
718 // eventually support challenge/response here by also responding to
720 auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
725 // Save this mechanism off as the one we chose
726 c.Config.SASL = []Authentication{auth}
728 // Set the connection locale to client locale
729 c.Config.Locale = config.Locale
731 return c.openTune(config, auth)
734 func (c *Connection) openTune(config Config, auth Authentication) error {
735 if len(config.Properties) == 0 {
736 config.Properties = Table{
737 "product": defaultProduct,
738 "version": defaultVersion,
742 config.Properties["capabilities"] = Table{
743 "connection.blocked": true,
744 "consumer_cancel_notify": true,
747 ok := &connectionStartOk{
748 ClientProperties: config.Properties,
749 Mechanism: auth.Mechanism(),
750 Response: auth.Response(),
751 Locale: config.Locale,
753 tune := &connectionTune{}
755 if err := c.call(ok, tune); err != nil {
756 // per spec, a connection can only be closed when it has been opened
757 // so at this point, we know it's an auth error, but the socket
758 // was closed instead. Return a meaningful error.
759 return ErrCredentials
762 // When the server and client both use default 0, fall back to
763 // defaultChannelMax; the result is always capped at maxChannelMax.
764 c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
765 if c.Config.ChannelMax == 0 {
766 c.Config.ChannelMax = defaultChannelMax
768 c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
770 // Frame size includes headers and end byte (len(payload)+8), even if
771 // this is less than FrameMinSize, use what the server sends because the
772 // alternative is to stop the handshake here.
773 c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
775 // Save this off for resetDeadline()
776 c.Config.Heartbeat = time.Second * time.Duration(pick(
777 int(config.Heartbeat/time.Second),
778 int(tune.Heartbeat)))
780 // "The client should start sending heartbeats after receiving a
781 // Connection.Tune method"
782 go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1)))
784 if err := c.send(&methodFrame{
786 Method: &connectionTuneOk{
787 ChannelMax: uint16(c.Config.ChannelMax),
788 FrameMax: uint32(c.Config.FrameSize),
789 Heartbeat: uint16(c.Config.Heartbeat / time.Second),
795 return c.openVhost(config)
798 func (c *Connection) openVhost(config Config) error {
799 req := &connectionOpen{VirtualHost: config.Vhost}
800 res := &connectionOpenOk{}
802 if err := c.call(req, res); err != nil {
803 // Cannot be closed yet, but we know it's a vhost problem
807 c.Config.Vhost = config.Vhost
809 return c.openComplete()
812 // openComplete performs any final Connection initialization dependent on the
813 // connection handshake and clears any state needed for TLS and AMQP handshaking.
814 func (c *Connection) openComplete() error {
815 // We clear the deadlines and let the heartbeater reset the read deadline if requested.
816 // RabbitMQ uses TCP flow control at this point for pushback so Writes can
817 // intentionally block.
818 if deadliner, ok := c.conn.(interface {
819 SetDeadline(time.Time) error
821 _ = deadliner.SetDeadline(time.Time{})
824 c.allocator = newAllocator(1, c.Config.ChannelMax)
828 func max(a, b int) int {
835 func min(a, b int) int {
842 func pick(client, server int) int {
843 if client == 0 || server == 0 {
844 return max(client, server)
846 return min(client, server)