1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
const (
	// maxChannelMax is the largest channel count ever negotiated: 2^16 - 1.
	maxChannelMax = (2 << 15) - 1

	// Defaults applied by Dial, DialTLS and defaultDial, and the client
	// properties advertised when Config.Properties is empty.
	defaultHeartbeat         = 10 * time.Second
	defaultConnectionTimeout = 30 * time.Second
	defaultProduct           = "https://github.com/streadway/amqp"
	defaultVersion           = "β"
	// Safer default (2^11 - 1) that makes channel leaks a lot easier to spot
	// before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
	defaultChannelMax = (2 << 10) - 1
	defaultLocale     = "en_US"
)
// Config is used in DialConfig and Open to specify the desired tuning
// parameters used during a connection open handshake. The negotiated tuning
// will be stored in the returned connection's Config field.
type Config struct {
	// The SASL mechanisms to try in the client request, and the successful
	// mechanism used on the Connection object.
	// If SASL is nil, PlainAuth from the URL is used.
	SASL []Authentication

	// Vhost specifies the namespace of permissions, exchanges, queues and
	// bindings on the server. Dial sets this to the path parsed from the URL.
	Vhost string

	// ChannelMax, FrameSize and Heartbeat are negotiated against the values
	// the server sends in connection.tune: when either side is 0 the other
	// side's value is used, otherwise the smaller of the two wins.
	ChannelMax int           // 0 uses the server's limit, or defaultChannelMax if that is 0 too; never above 2^16 - 1
	FrameSize  int           // 0 max bytes means unlimited
	Heartbeat  time.Duration // less than 1s uses the server's interval

	// TLSClientConfig specifies the client configuration of the TLS connection
	// when establishing a tls transport.
	// If the URL uses an amqps scheme, then an empty tls.Config with the
	// ServerName from the URL is used.
	TLSClientConfig *tls.Config

	// Properties is the table of properties that the client advertises to the
	// server. This is an optional setting - if the application does not set
	// this, the underlying library will use a generic set of client properties.
	Properties Table

	// Connection locale that we expect to always be en_US.
	// Even though servers must return it as per the AMQP 0-9-1 spec,
	// we are not aware of it being used other than to satisfy the spec requirements.
	Locale string

	// Dial returns a net.Conn prepared for a TLS handshake with TLSClientConfig,
	// then an AMQP connection handshake.
	// If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
	// used during TLS and AMQP handshaking.
	Dial func(network, addr string) (net.Conn, error)
}
// Connection manages the serialization and deserialization of frames from IO
// and dispatches the frames to the appropriate channel. All RPC methods and
// asynchronous Publishing, Delivery, Ack, Nack and Return messages are
// multiplexed on this channel. There must always be active receivers for
// every asynchronous message on this connection.
type Connection struct {
	destructor sync.Once  // shutdown once
	sendM      sync.Mutex // conn writer mutex
	m          sync.Mutex // struct field mutex

	// conn is the underlying transport; shutdown closes it.
	conn io.ReadWriteCloser

	rpc       chan message       // channel 0 methods handed from dispatch0 to call
	writer    *writer            // frame writer wrapping conn, serialized by sendM
	sends     chan time.Time     // timestamps of each frame sent
	deadlines chan readDeadliner // heartbeater updates read deadlines

	allocator *allocator          // channel id generator, assigned in openComplete
	channels  map[uint16]*Channel // allocated channels keyed by channel id

	noNotify bool            // true when we will never notify again
	closes   []chan *Error   // listeners registered with NotifyClose
	blocks   []chan Blocking // listeners registered with NotifyBlocked

	// errors carries the shutdown error to a pending call and is closed by
	// shutdown.
	errors chan *Error

	Config Config // The negotiated Config after connection.open

	Major      int      // Server's major version
	Minor      int      // Server's minor version
	Properties Table    // Server properties
	Locales    []string // Server locales

	closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
}
// readDeadliner is the subset of a transport that the heartbeater needs in
// order to push the read deadline forward after frames are received.
type readDeadliner interface {
	SetReadDeadline(time.Time) error
}
114 // defaultDial establishes a connection when config.Dial is not provided
115 func defaultDial(network, addr string) (net.Conn, error) {
116 conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
121 // Heartbeating hasn't started yet, don't stall forever on a dead server.
122 // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
123 // the deadline is cleared in openComplete.
124 if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
131 // Dial accepts a string in the AMQP URI format and returns a new Connection
132 // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
133 // seconds and sets the handshake deadline to 30 seconds. After handshake,
134 // deadlines are cleared.
136 // Dial uses the zero value of tls.Config when it encounters an amqps://
137 // scheme. It is equivalent to calling DialTLS(amqp, nil).
138 func Dial(url string) (*Connection, error) {
139 return DialConfig(url, Config{
140 Heartbeat: defaultHeartbeat,
141 Locale: defaultLocale,
145 // DialTLS accepts a string in the AMQP URI format and returns a new Connection
146 // over TCP using PlainAuth. Defaults to a server heartbeat interval of 10
147 // seconds and sets the initial read deadline to 30 seconds.
149 // DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
150 func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
151 return DialConfig(url, Config{
152 Heartbeat: defaultHeartbeat,
153 TLSClientConfig: amqps,
154 Locale: defaultLocale,
158 // DialConfig accepts a string in the AMQP URI format and a configuration for
159 // the transport and connection setup, returning a new Connection. Defaults to
160 // a server heartbeat interval of 10 seconds and sets the initial read deadline
162 func DialConfig(url string, config Config) (*Connection, error) {
166 uri, err := ParseURI(url)
171 if config.SASL == nil {
172 config.SASL = []Authentication{uri.PlainAuth()}
175 if config.Vhost == "" {
176 config.Vhost = uri.Vhost
179 addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
181 dialer := config.Dial
186 conn, err = dialer("tcp", addr)
191 if uri.Scheme == "amqps" {
192 if config.TLSClientConfig == nil {
193 config.TLSClientConfig = new(tls.Config)
196 // If ServerName has not been specified in TLSClientConfig,
197 // set it to the URI host used for this connection.
198 if config.TLSClientConfig.ServerName == "" {
199 config.TLSClientConfig.ServerName = uri.Host
202 client := tls.Client(conn, config.TLSClientConfig)
203 if err := client.Handshake(); err != nil {
211 return Open(conn, config)
215 Open accepts an already established connection, or other io.ReadWriteCloser as
216 a transport. Use this method if you have established a TLS connection or wish
217 to use your own custom transport.
220 func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
223 writer: &writer{bufio.NewWriter(conn)},
224 channels: make(map[uint16]*Channel),
225 rpc: make(chan message),
226 sends: make(chan time.Time),
227 errors: make(chan *Error, 1),
228 deadlines: make(chan readDeadliner, 1),
231 return c, c.open(config)
235 LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
236 as a fallback default value if the underlying transport does not support LocalAddr().
238 func (c *Connection) LocalAddr() net.Addr {
239 if conn, ok := c.conn.(interface {
242 return conn.LocalAddr()
244 return &net.TCPAddr{}
247 // ConnectionState returns basic TLS details of the underlying transport.
248 // Returns a zero value when the underlying connection does not implement
249 // ConnectionState() tls.ConnectionState.
250 func (c *Connection) ConnectionState() tls.ConnectionState {
251 if conn, ok := c.conn.(interface {
252 ConnectionState() tls.ConnectionState
254 return conn.ConnectionState()
256 return tls.ConnectionState{}
260 NotifyClose registers a listener for close events either initiated by an error
261 accompanying a connection.close method or by a normal shutdown.
263 On normal shutdowns, the chan will be closed.
265 To reconnect after a transport or protocol error, register a listener here and
266 re-run your setup process.
269 func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
276 c.closes = append(c.closes, receiver)
283 NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
284 method extensions connection.blocked and connection.unblocked. Flow control is
285 active with a reason when Blocking.Blocked is true. When a Connection is
286 blocked, all methods will block across all connections until server resources
289 This optional extension is supported by the server when the
290 "connection.blocked" server capability key is true.
293 func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
300 c.blocks = append(c.blocks, receiver)
307 Close requests and waits for the response to close the AMQP connection.
309 It's advisable to use this message when publishing to ensure all kernel buffers
310 have been flushed on the server and client before exiting.
312 An error indicates that server may not have received this request to close but
313 the connection should be treated as closed regardless.
315 After returning from this call, all resources associated with this connection,
316 including the underlying io, Channels, Notify listeners and Channel consumers
319 func (c *Connection) Close() error {
324 defer c.shutdown(nil)
327 ReplyCode: replySuccess,
328 ReplyText: "kthxbai",
330 &connectionCloseOk{},
334 func (c *Connection) closeWith(err *Error) error {
339 defer c.shutdown(err)
342 ReplyCode: uint16(err.Code),
343 ReplyText: err.Reason,
345 &connectionCloseOk{},
349 func (c *Connection) isClosed() bool {
350 return (atomic.LoadInt32(&c.closed) == 1)
353 func (c *Connection) send(f frame) error {
359 err := c.writer.WriteFrame(f)
363 // shutdown could be re-entrant from signaling notify chans
364 go c.shutdown(&Error{
369 // Broadcast we sent a frame, reducing heartbeats, only
370 // if there is something that can receive - like a non-reentrant
371 // call or if the heartbeater isn't running
373 case c.sends <- time.Now():
381 func (c *Connection) shutdown(err *Error) {
382 atomic.StoreInt32(&c.closed, 1)
384 c.destructor.Do(func() {
389 for _, c := range c.closes {
397 // Shutdown handler goroutine can still receive the result.
400 for _, c := range c.closes {
404 for _, c := range c.blocks {
408 // Shutdown the channel, but do not use closeChannel() as it calls
409 // releaseChannel() which requires the connection lock.
411 // Ranging over c.channels and calling releaseChannel() that mutates
412 // c.channels is racy - see commit 6063341 for an example.
413 for _, ch := range c.channels {
419 c.channels = map[uint16]*Channel{}
420 c.allocator = newAllocator(1, c.Config.ChannelMax)
425 // All methods sent to the connection channel should be synchronous so we
426 // can handle them directly without a framing component
427 func (c *Connection) demux(f frame) {
428 if f.channel() == 0 {
435 func (c *Connection) dispatch0(f frame) {
436 switch mf := f.(type) {
438 switch m := mf.Method.(type) {
439 case *connectionClose:
440 // Send immediately as shutdown will close our side of the writer.
443 Method: &connectionCloseOk{},
446 c.shutdown(newError(m.ReplyCode, m.ReplyText))
447 case *connectionBlocked:
448 for _, c := range c.blocks {
449 c <- Blocking{Active: true, Reason: m.Reason}
451 case *connectionUnblocked:
452 for _, c := range c.blocks {
453 c <- Blocking{Active: false}
458 case *heartbeatFrame:
459 // kthx - all reads reset our deadline. so we can drop this
461 // lolwat - channel0 only responds to methods and heartbeats
462 c.closeWith(ErrUnexpectedFrame)
466 func (c *Connection) dispatchN(f frame) {
468 channel := c.channels[f.channel()]
472 channel.recv(channel, f)
478 // section 2.3.7: "When a peer decides to close a channel or connection, it
479 // sends a Close method. The receiving peer MUST respond to a Close with a
480 // Close-Ok, and then both parties can close their channel or connection. Note
481 // that if peers ignore Close, deadlock can happen when both peers send Close
482 // at the same time."
484 // When we don't have a channel, so we must respond with close-ok on a close
485 // method. This can happen between a channel exception on an asynchronous
486 // method like basic.publish and a synchronous close with channel.close.
487 // In that case, we'll get both a channel.close and channel.close-ok in any
489 func (c *Connection) dispatchClosed(f frame) {
490 // Only consider method frames, drop content/header frames
491 if mf, ok := f.(*methodFrame); ok {
492 switch mf.Method.(type) {
495 ChannelId: f.channel(),
496 Method: &channelCloseOk{},
498 case *channelCloseOk:
499 // we are already closed, so do nothing
501 // unexpected method on closed channel
502 c.closeWith(ErrClosed)
507 // Reads each frame off the IO and hand off to the connection object that
508 // will demux the streams and dispatch to one of the opened channels or
509 // handle on channel 0 (the connection channel).
510 func (c *Connection) reader(r io.Reader) {
511 buf := bufio.NewReader(r)
512 frames := &reader{buf}
513 conn, haveDeadliner := r.(readDeadliner)
516 frame, err := frames.ReadFrame()
519 c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
531 // Ensures that at least one frame is being sent at the tuned interval with a
532 // jitter tolerance of 1s
533 func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
534 const maxServerHeartbeatsInFlight = 3
536 var sendTicks <-chan time.Time
538 ticker := time.NewTicker(interval)
543 lastSent := time.Now()
547 case at, stillSending := <-c.sends:
548 // When actively sending, depend on sent frames to reset server timer
555 case at := <-sendTicks:
556 // When idle, fill the space with a heartbeat frame
557 if at.Sub(lastSent) > interval-time.Second {
558 if err := c.send(&heartbeatFrame{}); err != nil {
559 // send heartbeats even after close/closeOk so we
560 // tick until the connection starts erroring
565 case conn := <-c.deadlines:
566 // When reading, reset our side of the deadline, if we've negotiated one with
567 // a deadline that covers at least 2 server heartbeats
569 conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
578 // Convenience method to inspect the Connection.Properties["capabilities"]
579 // Table for server identified capabilities like "basic.ack" or
581 func (c *Connection) isCapable(featureName string) bool {
582 capabilities, _ := c.Properties["capabilities"].(Table)
583 hasFeature, _ := capabilities[featureName].(bool)
587 // allocateChannel records but does not open a new channel with a unique id.
588 // This method is the initial part of the channel lifecycle and paired with
590 func (c *Connection) allocateChannel() (*Channel, error) {
595 return nil, ErrClosed
598 id, ok := c.allocator.next()
600 return nil, ErrChannelMax
603 ch := newChannel(c, uint16(id))
604 c.channels[uint16(id)] = ch
609 // releaseChannel removes a channel from the registry as the final part of the
611 func (c *Connection) releaseChannel(id uint16) {
615 delete(c.channels, id)
616 c.allocator.release(int(id))
619 // openChannel allocates and opens a channel, must be paired with closeChannel
620 func (c *Connection) openChannel() (*Channel, error) {
621 ch, err := c.allocateChannel()
626 if err := ch.open(); err != nil {
627 c.releaseChannel(ch.id)
633 // closeChannel releases and initiates a shutdown of the channel. All channel
634 // closures should be initiated here for proper channel lifecycle management on
636 func (c *Connection) closeChannel(ch *Channel, e *Error) {
638 c.releaseChannel(ch.id)
642 Channel opens a unique, concurrent server channel to process the bulk of AMQP
643 messages. Any error from methods on this receiver will render the receiver
644 invalid and a new Channel should be opened.
647 func (c *Connection) Channel() (*Channel, error) {
648 return c.openChannel()
651 func (c *Connection) call(req message, res ...message) error {
652 // Special case for when the protocol header frame is sent insted of a
655 if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
661 case err, ok := <-c.errors:
668 // Try to match one of the result types
669 for _, try := range res {
670 if reflect.TypeOf(msg) == reflect.TypeOf(try) {
672 vres := reflect.ValueOf(try).Elem()
673 vmsg := reflect.ValueOf(msg).Elem()
678 return ErrCommandInvalid
683 // Connection = open-Connection *use-Connection close-Connection
684 // open-Connection = C:protocol-header
685 // S:START C:START-OK
689 // challenge = S:SECURE C:SECURE-OK
690 // use-Connection = *channel
691 // close-Connection = C:CLOSE S:CLOSE-OK
692 // / S:CLOSE C:CLOSE-OK
693 func (c *Connection) open(config Config) error {
694 if err := c.send(&protocolHeader{}); err != nil {
698 return c.openStart(config)
701 func (c *Connection) openStart(config Config) error {
702 start := &connectionStart{}
704 if err := c.call(nil, start); err != nil {
708 c.Major = int(start.VersionMajor)
709 c.Minor = int(start.VersionMinor)
710 c.Properties = Table(start.ServerProperties)
711 c.Locales = strings.Split(start.Locales, " ")
713 // eventually support challenge/response here by also responding to
715 auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
720 // Save this mechanism off as the one we chose
721 c.Config.SASL = []Authentication{auth}
723 // Set the connection locale to client locale
724 c.Config.Locale = config.Locale
726 return c.openTune(config, auth)
729 func (c *Connection) openTune(config Config, auth Authentication) error {
730 if len(config.Properties) == 0 {
731 config.Properties = Table{
732 "product": defaultProduct,
733 "version": defaultVersion,
737 config.Properties["capabilities"] = Table{
738 "connection.blocked": true,
739 "consumer_cancel_notify": true,
742 ok := &connectionStartOk{
743 ClientProperties: config.Properties,
744 Mechanism: auth.Mechanism(),
745 Response: auth.Response(),
746 Locale: config.Locale,
748 tune := &connectionTune{}
750 if err := c.call(ok, tune); err != nil {
751 // per spec, a connection can only be closed when it has been opened
752 // so at this point, we know it's an auth error, but the socket
753 // was closed instead. Return a meaningful error.
754 return ErrCredentials
757 // When the server and client both use default 0, then the max channel is
758 // only limited by uint16.
759 c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
760 if c.Config.ChannelMax == 0 {
761 c.Config.ChannelMax = defaultChannelMax
763 c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
765 // Frame size includes headers and end byte (len(payload)+8), even if
766 // this is less than FrameMinSize, use what the server sends because the
767 // alternative is to stop the handshake here.
768 c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
770 // Save this off for resetDeadline()
771 c.Config.Heartbeat = time.Second * time.Duration(pick(
772 int(config.Heartbeat/time.Second),
773 int(tune.Heartbeat)))
775 // "The client should start sending heartbeats after receiving a
776 // Connection.Tune method"
777 go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1)))
779 if err := c.send(&methodFrame{
781 Method: &connectionTuneOk{
782 ChannelMax: uint16(c.Config.ChannelMax),
783 FrameMax: uint32(c.Config.FrameSize),
784 Heartbeat: uint16(c.Config.Heartbeat / time.Second),
790 return c.openVhost(config)
793 func (c *Connection) openVhost(config Config) error {
794 req := &connectionOpen{VirtualHost: config.Vhost}
795 res := &connectionOpenOk{}
797 if err := c.call(req, res); err != nil {
798 // Cannot be closed yet, but we know it's a vhost problem
802 c.Config.Vhost = config.Vhost
804 return c.openComplete()
807 // openComplete performs any final Connection initialization dependent on the
808 // connection handshake and clears any state needed for TLS and AMQP handshaking.
809 func (c *Connection) openComplete() error {
810 // We clear the deadlines and let the heartbeater reset the read deadline if requested.
811 // RabbitMQ uses TCP flow control at this point for pushback so Writes can
812 // intentionally block.
813 if deadliner, ok := c.conn.(interface {
814 SetDeadline(time.Time) error
816 _ = deadliner.SetDeadline(time.Time{})
819 c.allocator = newAllocator(1, c.Config.ChannelMax)
// max returns the larger of a and b.
func max(a, b int) int {
	if b >= a {
		return b
	}
	return a
}
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
// pick negotiates a tuning value between the client and server settings.
// When either side is 0 the larger of the two is returned, so a 0 defers to
// the other side; otherwise the smaller value wins.
func pick(client, server int) int {
	lower, higher := client, server
	if lower > higher {
		lower, higher = higher, lower
	}

	if client == 0 || server == 0 {
		return higher
	}
	return lower
}