src: Add DMA localagent
[barometer.git] / src / dma / vendor / github.com / streadway / amqp / connection.go
1 // Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
2 // Use of this source code is governed by a BSD-style
3 // license that can be found in the LICENSE file.
4 // Source code and contact info at http://github.com/streadway/amqp
5
6 package amqp
7
8 import (
9         "bufio"
10         "crypto/tls"
11         "io"
12         "net"
13         "reflect"
14         "strconv"
15         "strings"
16         "sync"
17         "sync/atomic"
18         "time"
19 )
20
21 const (
22         maxChannelMax = (2 << 15) - 1
23
24         defaultHeartbeat         = 10 * time.Second
25         defaultConnectionTimeout = 30 * time.Second
26         defaultProduct           = "https://github.com/streadway/amqp"
27         defaultVersion           = "β"
28         // Safer default that makes channel leaks a lot easier to spot
29         // before they create operational headaches. See https://github.com/rabbitmq/rabbitmq-server/issues/1593.
30         defaultChannelMax = (2 << 10) - 1
31         defaultLocale     = "en_US"
32 )
33
34 // Config is used in DialConfig and Open to specify the desired tuning
35 // parameters used during a connection open handshake.  The negotiated tuning
36 // will be stored in the returned connection's Config field.
37 type Config struct {
38         // The SASL mechanisms to try in the client request, and the successful
39         // mechanism used on the Connection object.
40         // If SASL is nil, PlainAuth from the URL is used.
41         SASL []Authentication
42
43         // Vhost specifies the namespace of permissions, exchanges, queues and
44         // bindings on the server.  Dial sets this to the path parsed from the URL.
45         Vhost string
46
47         ChannelMax int           // 0 max channels means 2^16 - 1
48         FrameSize  int           // 0 max bytes means unlimited
49         Heartbeat  time.Duration // less than 1s uses the server's interval
50
51         // TLSClientConfig specifies the client configuration of the TLS connection
52         // when establishing a tls transport.
53         // If the URL uses an amqps scheme, then an empty tls.Config with the
54         // ServerName from the URL is used.
55         TLSClientConfig *tls.Config
56
57         // Properties is table of properties that the client advertises to the server.
58         // This is an optional setting - if the application does not set this,
59         // the underlying library will use a generic set of client properties.
60         Properties Table
61
62         // Connection locale that we expect to always be en_US
63         // Even though servers must return it as per the AMQP 0-9-1 spec,
64         // we are not aware of it being used other than to satisfy the spec requirements
65         Locale string
66
67         // Dial returns a net.Conn prepared for a TLS handshake with TSLClientConfig,
68         // then an AMQP connection handshake.
69         // If Dial is nil, net.DialTimeout with a 30s connection and 30s deadline is
70         // used during TLS and AMQP handshaking.
71         Dial func(network, addr string) (net.Conn, error)
72 }
73
74 // Connection manages the serialization and deserialization of frames from IO
75 // and dispatches the frames to the appropriate channel.  All RPC methods and
76 // asynchronous Publishing, Delivery, Ack, Nack and Return messages are
77 // multiplexed on this channel.  There must always be active receivers for
78 // every asynchronous message on this connection.
79 type Connection struct {
80         destructor sync.Once  // shutdown once
81         sendM      sync.Mutex // conn writer mutex
82         m          sync.Mutex // struct field mutex
83
84         conn io.ReadWriteCloser
85
86         rpc       chan message
87         writer    *writer
88         sends     chan time.Time     // timestamps of each frame sent
89         deadlines chan readDeadliner // heartbeater updates read deadlines
90
91         allocator *allocator // id generator valid after openTune
92         channels  map[uint16]*Channel
93
94         noNotify bool // true when we will never notify again
95         closes   []chan *Error
96         blocks   []chan Blocking
97
98         errors chan *Error
99
100         Config Config // The negotiated Config after connection.open
101
102         Major      int      // Server's major version
103         Minor      int      // Server's minor version
104         Properties Table    // Server properties
105         Locales    []string // Server locales
106
107         closed int32 // Will be 1 if the connection is closed, 0 otherwise. Should only be accessed as atomic
108 }
109
110 type readDeadliner interface {
111         SetReadDeadline(time.Time) error
112 }
113
114 // defaultDial establishes a connection when config.Dial is not provided
115 func defaultDial(network, addr string) (net.Conn, error) {
116         conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
117         if err != nil {
118                 return nil, err
119         }
120
121         // Heartbeating hasn't started yet, don't stall forever on a dead server.
122         // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
123         // the deadline is cleared in openComplete.
124         if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
125                 return nil, err
126         }
127
128         return conn, nil
129 }
130
131 // Dial accepts a string in the AMQP URI format and returns a new Connection
132 // over TCP using PlainAuth.  Defaults to a server heartbeat interval of 10
133 // seconds and sets the handshake deadline to 30 seconds. After handshake,
134 // deadlines are cleared.
135 //
136 // Dial uses the zero value of tls.Config when it encounters an amqps://
137 // scheme.  It is equivalent to calling DialTLS(amqp, nil).
138 func Dial(url string) (*Connection, error) {
139         return DialConfig(url, Config{
140                 Heartbeat: defaultHeartbeat,
141                 Locale:    defaultLocale,
142         })
143 }
144
145 // DialTLS accepts a string in the AMQP URI format and returns a new Connection
146 // over TCP using PlainAuth.  Defaults to a server heartbeat interval of 10
147 // seconds and sets the initial read deadline to 30 seconds.
148 //
149 // DialTLS uses the provided tls.Config when encountering an amqps:// scheme.
150 func DialTLS(url string, amqps *tls.Config) (*Connection, error) {
151         return DialConfig(url, Config{
152                 Heartbeat:       defaultHeartbeat,
153                 TLSClientConfig: amqps,
154                 Locale:          defaultLocale,
155         })
156 }
157
158 // DialConfig accepts a string in the AMQP URI format and a configuration for
159 // the transport and connection setup, returning a new Connection.  Defaults to
160 // a server heartbeat interval of 10 seconds and sets the initial read deadline
161 // to 30 seconds.
162 func DialConfig(url string, config Config) (*Connection, error) {
163         var err error
164         var conn net.Conn
165
166         uri, err := ParseURI(url)
167         if err != nil {
168                 return nil, err
169         }
170
171         if config.SASL == nil {
172                 config.SASL = []Authentication{uri.PlainAuth()}
173         }
174
175         if config.Vhost == "" {
176                 config.Vhost = uri.Vhost
177         }
178
179         addr := net.JoinHostPort(uri.Host, strconv.FormatInt(int64(uri.Port), 10))
180
181         dialer := config.Dial
182         if dialer == nil {
183                 dialer = defaultDial
184         }
185
186         conn, err = dialer("tcp", addr)
187         if err != nil {
188                 return nil, err
189         }
190
191         if uri.Scheme == "amqps" {
192                 if config.TLSClientConfig == nil {
193                         config.TLSClientConfig = new(tls.Config)
194                 }
195
196                 // If ServerName has not been specified in TLSClientConfig,
197                 // set it to the URI host used for this connection.
198                 if config.TLSClientConfig.ServerName == "" {
199                         config.TLSClientConfig.ServerName = uri.Host
200                 }
201
202                 client := tls.Client(conn, config.TLSClientConfig)
203                 if err := client.Handshake(); err != nil {
204                         conn.Close()
205                         return nil, err
206                 }
207
208                 conn = client
209         }
210
211         return Open(conn, config)
212 }
213
214 /*
215 Open accepts an already established connection, or other io.ReadWriteCloser as
216 a transport.  Use this method if you have established a TLS connection or wish
217 to use your own custom transport.
218
219 */
220 func Open(conn io.ReadWriteCloser, config Config) (*Connection, error) {
221         c := &Connection{
222                 conn:      conn,
223                 writer:    &writer{bufio.NewWriter(conn)},
224                 channels:  make(map[uint16]*Channel),
225                 rpc:       make(chan message),
226                 sends:     make(chan time.Time),
227                 errors:    make(chan *Error, 1),
228                 deadlines: make(chan readDeadliner, 1),
229         }
230         go c.reader(conn)
231         return c, c.open(config)
232 }
233
234 /*
235 LocalAddr returns the local TCP peer address, or ":0" (the zero value of net.TCPAddr)
236 as a fallback default value if the underlying transport does not support LocalAddr().
237 */
238 func (c *Connection) LocalAddr() net.Addr {
239         if conn, ok := c.conn.(interface {
240                 LocalAddr() net.Addr
241         }); ok {
242                 return conn.LocalAddr()
243         }
244         return &net.TCPAddr{}
245 }
246
247 // ConnectionState returns basic TLS details of the underlying transport.
248 // Returns a zero value when the underlying connection does not implement
249 // ConnectionState() tls.ConnectionState.
250 func (c *Connection) ConnectionState() tls.ConnectionState {
251         if conn, ok := c.conn.(interface {
252                 ConnectionState() tls.ConnectionState
253         }); ok {
254                 return conn.ConnectionState()
255         }
256         return tls.ConnectionState{}
257 }
258
259 /*
260 NotifyClose registers a listener for close events either initiated by an error
261 accompanying a connection.close method or by a normal shutdown.
262
263 On normal shutdowns, the chan will be closed.
264
265 To reconnect after a transport or protocol error, register a listener here and
266 re-run your setup process.
267
268 */
269 func (c *Connection) NotifyClose(receiver chan *Error) chan *Error {
270         c.m.Lock()
271         defer c.m.Unlock()
272
273         if c.noNotify {
274                 close(receiver)
275         } else {
276                 c.closes = append(c.closes, receiver)
277         }
278
279         return receiver
280 }
281
282 /*
283 NotifyBlocked registers a listener for RabbitMQ specific TCP flow control
284 method extensions connection.blocked and connection.unblocked.  Flow control is
285 active with a reason when Blocking.Blocked is true.  When a Connection is
286 blocked, all methods will block across all connections until server resources
287 become free again.
288
289 This optional extension is supported by the server when the
290 "connection.blocked" server capability key is true.
291
292 */
293 func (c *Connection) NotifyBlocked(receiver chan Blocking) chan Blocking {
294         c.m.Lock()
295         defer c.m.Unlock()
296
297         if c.noNotify {
298                 close(receiver)
299         } else {
300                 c.blocks = append(c.blocks, receiver)
301         }
302
303         return receiver
304 }
305
306 /*
307 Close requests and waits for the response to close the AMQP connection.
308
309 It's advisable to use this message when publishing to ensure all kernel buffers
310 have been flushed on the server and client before exiting.
311
312 An error indicates that server may not have received this request to close but
313 the connection should be treated as closed regardless.
314
315 After returning from this call, all resources associated with this connection,
316 including the underlying io, Channels, Notify listeners and Channel consumers
317 will also be closed.
318 */
319 func (c *Connection) Close() error {
320         if c.isClosed() {
321                 return ErrClosed
322         }
323
324         defer c.shutdown(nil)
325         return c.call(
326                 &connectionClose{
327                         ReplyCode: replySuccess,
328                         ReplyText: "kthxbai",
329                 },
330                 &connectionCloseOk{},
331         )
332 }
333
334 func (c *Connection) closeWith(err *Error) error {
335         if c.isClosed() {
336                 return ErrClosed
337         }
338
339         defer c.shutdown(err)
340         return c.call(
341                 &connectionClose{
342                         ReplyCode: uint16(err.Code),
343                         ReplyText: err.Reason,
344                 },
345                 &connectionCloseOk{},
346         )
347 }
348
349 func (c *Connection) isClosed() bool {
350         return (atomic.LoadInt32(&c.closed) == 1)
351 }
352
353 func (c *Connection) send(f frame) error {
354         if c.isClosed() {
355                 return ErrClosed
356         }
357
358         c.sendM.Lock()
359         err := c.writer.WriteFrame(f)
360         c.sendM.Unlock()
361
362         if err != nil {
363                 // shutdown could be re-entrant from signaling notify chans
364                 go c.shutdown(&Error{
365                         Code:   FrameError,
366                         Reason: err.Error(),
367                 })
368         } else {
369                 // Broadcast we sent a frame, reducing heartbeats, only
370                 // if there is something that can receive - like a non-reentrant
371                 // call or if the heartbeater isn't running
372                 select {
373                 case c.sends <- time.Now():
374                 default:
375                 }
376         }
377
378         return err
379 }
380
381 func (c *Connection) shutdown(err *Error) {
382         atomic.StoreInt32(&c.closed, 1)
383
384         c.destructor.Do(func() {
385                 c.m.Lock()
386                 defer c.m.Unlock()
387
388                 if err != nil {
389                         for _, c := range c.closes {
390                                 c <- err
391                         }
392                 }
393
394                 if err != nil {
395                         c.errors <- err
396                 }
397                 // Shutdown handler goroutine can still receive the result.
398                 close(c.errors)
399
400                 for _, c := range c.closes {
401                         close(c)
402                 }
403
404                 for _, c := range c.blocks {
405                         close(c)
406                 }
407
408                 // Shutdown the channel, but do not use closeChannel() as it calls
409                 // releaseChannel() which requires the connection lock.
410                 //
411                 // Ranging over c.channels and calling releaseChannel() that mutates
412                 // c.channels is racy - see commit 6063341 for an example.
413                 for _, ch := range c.channels {
414                         ch.shutdown(err)
415                 }
416
417                 c.conn.Close()
418
419                 c.channels = map[uint16]*Channel{}
420                 c.allocator = newAllocator(1, c.Config.ChannelMax)
421                 c.noNotify = true
422         })
423 }
424
425 // All methods sent to the connection channel should be synchronous so we
426 // can handle them directly without a framing component
427 func (c *Connection) demux(f frame) {
428         if f.channel() == 0 {
429                 c.dispatch0(f)
430         } else {
431                 c.dispatchN(f)
432         }
433 }
434
435 func (c *Connection) dispatch0(f frame) {
436         switch mf := f.(type) {
437         case *methodFrame:
438                 switch m := mf.Method.(type) {
439                 case *connectionClose:
440                         // Send immediately as shutdown will close our side of the writer.
441                         c.send(&methodFrame{
442                                 ChannelId: 0,
443                                 Method:    &connectionCloseOk{},
444                         })
445
446                         c.shutdown(newError(m.ReplyCode, m.ReplyText))
447                 case *connectionBlocked:
448                         for _, c := range c.blocks {
449                                 c <- Blocking{Active: true, Reason: m.Reason}
450                         }
451                 case *connectionUnblocked:
452                         for _, c := range c.blocks {
453                                 c <- Blocking{Active: false}
454                         }
455                 default:
456                         c.rpc <- m
457                 }
458         case *heartbeatFrame:
459                 // kthx - all reads reset our deadline.  so we can drop this
460         default:
461                 // lolwat - channel0 only responds to methods and heartbeats
462                 c.closeWith(ErrUnexpectedFrame)
463         }
464 }
465
466 func (c *Connection) dispatchN(f frame) {
467         c.m.Lock()
468         channel := c.channels[f.channel()]
469         c.m.Unlock()
470
471         if channel != nil {
472                 channel.recv(channel, f)
473         } else {
474                 c.dispatchClosed(f)
475         }
476 }
477
478 // section 2.3.7: "When a peer decides to close a channel or connection, it
479 // sends a Close method.  The receiving peer MUST respond to a Close with a
480 // Close-Ok, and then both parties can close their channel or connection.  Note
481 // that if peers ignore Close, deadlock can happen when both peers send Close
482 // at the same time."
483 //
484 // When we don't have a channel, so we must respond with close-ok on a close
485 // method.  This can happen between a channel exception on an asynchronous
486 // method like basic.publish and a synchronous close with channel.close.
487 // In that case, we'll get both a channel.close and channel.close-ok in any
488 // order.
489 func (c *Connection) dispatchClosed(f frame) {
490         // Only consider method frames, drop content/header frames
491         if mf, ok := f.(*methodFrame); ok {
492                 switch mf.Method.(type) {
493                 case *channelClose:
494                         c.send(&methodFrame{
495                                 ChannelId: f.channel(),
496                                 Method:    &channelCloseOk{},
497                         })
498                 case *channelCloseOk:
499                         // we are already closed, so do nothing
500                 default:
501                         // unexpected method on closed channel
502                         c.closeWith(ErrClosed)
503                 }
504         }
505 }
506
507 // Reads each frame off the IO and hand off to the connection object that
508 // will demux the streams and dispatch to one of the opened channels or
509 // handle on channel 0 (the connection channel).
510 func (c *Connection) reader(r io.Reader) {
511         buf := bufio.NewReader(r)
512         frames := &reader{buf}
513         conn, haveDeadliner := r.(readDeadliner)
514
515         for {
516                 frame, err := frames.ReadFrame()
517
518                 if err != nil {
519                         c.shutdown(&Error{Code: FrameError, Reason: err.Error()})
520                         return
521                 }
522
523                 c.demux(frame)
524
525                 if haveDeadliner {
526                         c.deadlines <- conn
527                 }
528         }
529 }
530
531 // Ensures that at least one frame is being sent at the tuned interval with a
532 // jitter tolerance of 1s
533 func (c *Connection) heartbeater(interval time.Duration, done chan *Error) {
534         const maxServerHeartbeatsInFlight = 3
535
536         var sendTicks <-chan time.Time
537         if interval > 0 {
538                 ticker := time.NewTicker(interval)
539                 defer ticker.Stop()
540                 sendTicks = ticker.C
541         }
542
543         lastSent := time.Now()
544
545         for {
546                 select {
547                 case at, stillSending := <-c.sends:
548                         // When actively sending, depend on sent frames to reset server timer
549                         if stillSending {
550                                 lastSent = at
551                         } else {
552                                 return
553                         }
554
555                 case at := <-sendTicks:
556                         // When idle, fill the space with a heartbeat frame
557                         if at.Sub(lastSent) > interval-time.Second {
558                                 if err := c.send(&heartbeatFrame{}); err != nil {
559                                         // send heartbeats even after close/closeOk so we
560                                         // tick until the connection starts erroring
561                                         return
562                                 }
563                         }
564
565                 case conn := <-c.deadlines:
566                         // When reading, reset our side of the deadline, if we've negotiated one with
567                         // a deadline that covers at least 2 server heartbeats
568                         if interval > 0 {
569                                 conn.SetReadDeadline(time.Now().Add(maxServerHeartbeatsInFlight * interval))
570                         }
571
572                 case <-done:
573                         return
574                 }
575         }
576 }
577
578 // Convenience method to inspect the Connection.Properties["capabilities"]
579 // Table for server identified capabilities like "basic.ack" or
580 // "confirm.select".
581 func (c *Connection) isCapable(featureName string) bool {
582         capabilities, _ := c.Properties["capabilities"].(Table)
583         hasFeature, _ := capabilities[featureName].(bool)
584         return hasFeature
585 }
586
587 // allocateChannel records but does not open a new channel with a unique id.
588 // This method is the initial part of the channel lifecycle and paired with
589 // releaseChannel
590 func (c *Connection) allocateChannel() (*Channel, error) {
591         c.m.Lock()
592         defer c.m.Unlock()
593
594         if c.isClosed() {
595                 return nil, ErrClosed
596         }
597
598         id, ok := c.allocator.next()
599         if !ok {
600                 return nil, ErrChannelMax
601         }
602
603         ch := newChannel(c, uint16(id))
604         c.channels[uint16(id)] = ch
605
606         return ch, nil
607 }
608
609 // releaseChannel removes a channel from the registry as the final part of the
610 // channel lifecycle
611 func (c *Connection) releaseChannel(id uint16) {
612         c.m.Lock()
613         defer c.m.Unlock()
614
615         delete(c.channels, id)
616         c.allocator.release(int(id))
617 }
618
619 // openChannel allocates and opens a channel, must be paired with closeChannel
620 func (c *Connection) openChannel() (*Channel, error) {
621         ch, err := c.allocateChannel()
622         if err != nil {
623                 return nil, err
624         }
625
626         if err := ch.open(); err != nil {
627                 c.releaseChannel(ch.id)
628                 return nil, err
629         }
630         return ch, nil
631 }
632
633 // closeChannel releases and initiates a shutdown of the channel.  All channel
634 // closures should be initiated here for proper channel lifecycle management on
635 // this connection.
636 func (c *Connection) closeChannel(ch *Channel, e *Error) {
637         ch.shutdown(e)
638         c.releaseChannel(ch.id)
639 }
640
641 /*
642 Channel opens a unique, concurrent server channel to process the bulk of AMQP
643 messages.  Any error from methods on this receiver will render the receiver
644 invalid and a new Channel should be opened.
645
646 */
647 func (c *Connection) Channel() (*Channel, error) {
648         return c.openChannel()
649 }
650
651 func (c *Connection) call(req message, res ...message) error {
652         // Special case for when the protocol header frame is sent insted of a
653         // request method
654         if req != nil {
655                 if err := c.send(&methodFrame{ChannelId: 0, Method: req}); err != nil {
656                         return err
657                 }
658         }
659
660         select {
661         case err, ok := <-c.errors:
662                 if !ok {
663                         return ErrClosed
664                 }
665                 return err
666
667         case msg := <-c.rpc:
668                 // Try to match one of the result types
669                 for _, try := range res {
670                         if reflect.TypeOf(msg) == reflect.TypeOf(try) {
671                                 // *res = *msg
672                                 vres := reflect.ValueOf(try).Elem()
673                                 vmsg := reflect.ValueOf(msg).Elem()
674                                 vres.Set(vmsg)
675                                 return nil
676                         }
677                 }
678                 return ErrCommandInvalid
679         }
680         // unreachable
681 }
682
683 //    Connection          = open-Connection *use-Connection close-Connection
684 //    open-Connection     = C:protocol-header
685 //                          S:START C:START-OK
686 //                          *challenge
687 //                          S:TUNE C:TUNE-OK
688 //                          C:OPEN S:OPEN-OK
689 //    challenge           = S:SECURE C:SECURE-OK
690 //    use-Connection      = *channel
691 //    close-Connection    = C:CLOSE S:CLOSE-OK
692 //                        / S:CLOSE C:CLOSE-OK
693 func (c *Connection) open(config Config) error {
694         if err := c.send(&protocolHeader{}); err != nil {
695                 return err
696         }
697
698         return c.openStart(config)
699 }
700
701 func (c *Connection) openStart(config Config) error {
702         start := &connectionStart{}
703
704         if err := c.call(nil, start); err != nil {
705                 return err
706         }
707
708         c.Major = int(start.VersionMajor)
709         c.Minor = int(start.VersionMinor)
710         c.Properties = Table(start.ServerProperties)
711         c.Locales = strings.Split(start.Locales, " ")
712
713         // eventually support challenge/response here by also responding to
714         // connectionSecure.
715         auth, ok := pickSASLMechanism(config.SASL, strings.Split(start.Mechanisms, " "))
716         if !ok {
717                 return ErrSASL
718         }
719
720         // Save this mechanism off as the one we chose
721         c.Config.SASL = []Authentication{auth}
722
723         // Set the connection locale to client locale
724         c.Config.Locale = config.Locale
725
726         return c.openTune(config, auth)
727 }
728
729 func (c *Connection) openTune(config Config, auth Authentication) error {
730         if len(config.Properties) == 0 {
731                 config.Properties = Table{
732                         "product": defaultProduct,
733                         "version": defaultVersion,
734                 }
735         }
736
737         config.Properties["capabilities"] = Table{
738                 "connection.blocked":     true,
739                 "consumer_cancel_notify": true,
740         }
741
742         ok := &connectionStartOk{
743                 ClientProperties: config.Properties,
744                 Mechanism:        auth.Mechanism(),
745                 Response:         auth.Response(),
746                 Locale:           config.Locale,
747         }
748         tune := &connectionTune{}
749
750         if err := c.call(ok, tune); err != nil {
751                 // per spec, a connection can only be closed when it has been opened
752                 // so at this point, we know it's an auth error, but the socket
753                 // was closed instead.  Return a meaningful error.
754                 return ErrCredentials
755         }
756
757         // When the server and client both use default 0, then the max channel is
758         // only limited by uint16.
759         c.Config.ChannelMax = pick(config.ChannelMax, int(tune.ChannelMax))
760         if c.Config.ChannelMax == 0 {
761                 c.Config.ChannelMax = defaultChannelMax
762         }
763         c.Config.ChannelMax = min(c.Config.ChannelMax, maxChannelMax)
764
765         // Frame size includes headers and end byte (len(payload)+8), even if
766         // this is less than FrameMinSize, use what the server sends because the
767         // alternative is to stop the handshake here.
768         c.Config.FrameSize = pick(config.FrameSize, int(tune.FrameMax))
769
770         // Save this off for resetDeadline()
771         c.Config.Heartbeat = time.Second * time.Duration(pick(
772                 int(config.Heartbeat/time.Second),
773                 int(tune.Heartbeat)))
774
775         // "The client should start sending heartbeats after receiving a
776         // Connection.Tune method"
777         go c.heartbeater(c.Config.Heartbeat, c.NotifyClose(make(chan *Error, 1)))
778
779         if err := c.send(&methodFrame{
780                 ChannelId: 0,
781                 Method: &connectionTuneOk{
782                         ChannelMax: uint16(c.Config.ChannelMax),
783                         FrameMax:   uint32(c.Config.FrameSize),
784                         Heartbeat:  uint16(c.Config.Heartbeat / time.Second),
785                 },
786         }); err != nil {
787                 return err
788         }
789
790         return c.openVhost(config)
791 }
792
793 func (c *Connection) openVhost(config Config) error {
794         req := &connectionOpen{VirtualHost: config.Vhost}
795         res := &connectionOpenOk{}
796
797         if err := c.call(req, res); err != nil {
798                 // Cannot be closed yet, but we know it's a vhost problem
799                 return ErrVhost
800         }
801
802         c.Config.Vhost = config.Vhost
803
804         return c.openComplete()
805 }
806
807 // openComplete performs any final Connection initialization dependent on the
808 // connection handshake and clears any state needed for TLS and AMQP handshaking.
809 func (c *Connection) openComplete() error {
810         // We clear the deadlines and let the heartbeater reset the read deadline if requested.
811         // RabbitMQ uses TCP flow control at this point for pushback so Writes can
812         // intentionally block.
813         if deadliner, ok := c.conn.(interface {
814                 SetDeadline(time.Time) error
815         }); ok {
816                 _ = deadliner.SetDeadline(time.Time{})
817         }
818
819         c.allocator = newAllocator(1, c.Config.ChannelMax)
820         return nil
821 }
822
823 func max(a, b int) int {
824         if a > b {
825                 return a
826         }
827         return b
828 }
829
830 func min(a, b int) int {
831         if a < b {
832                 return a
833         }
834         return b
835 }
836
837 func pick(client, server int) int {
838         if client == 0 || server == 0 {
839                 return max(client, server)
840         }
841         return min(client, server)
842 }