barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / streadway / amqp / connection.go
index ca1372d..b9d8e8e 100644 (file)
@@ -111,21 +111,23 @@ type readDeadliner interface {
        SetReadDeadline(time.Time) error
 }
 
-// defaultDial establishes a connection when config.Dial is not provided
-func defaultDial(network, addr string) (net.Conn, error) {
-       conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
-       if err != nil {
-               return nil, err
-       }
+// DefaultDial establishes a connection when config.Dial is not provided
+func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
+       return func(network, addr string) (net.Conn, error) {
+               conn, err := net.DialTimeout(network, addr, connectionTimeout)
+               if err != nil {
+                       return nil, err
+               }
 
-       // Heartbeating hasn't started yet, don't stall forever on a dead server.
-       // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
-       // the deadline is cleared in openComplete.
-       if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
-               return nil, err
-       }
+               // Heartbeating hasn't started yet, don't stall forever on a dead server.
+               // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
+               // the deadline is cleared in openComplete.
+               if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
+                       return nil, err
+               }
 
-       return conn, nil
+               return conn, nil
+       }
 }
 
 // Dial accepts a string in the AMQP URI format and returns a new Connection
@@ -180,7 +182,7 @@ func DialConfig(url string, config Config) (*Connection, error) {
 
        dialer := config.Dial
        if dialer == nil {
-               dialer = defaultDial
+               dialer = DefaultDial(defaultConnectionTimeout)
        }
 
        conn, err = dialer("tcp", addr)
@@ -201,6 +203,7 @@ func DialConfig(url string, config Config) (*Connection, error) {
 
                client := tls.Client(conn, config.TLSClientConfig)
                if err := client.Handshake(); err != nil {
+
                        conn.Close()
                        return nil, err
                }
@@ -317,7 +320,7 @@ including the underlying io, Channels, Notify listeners and Channel consumers
 will also be closed.
 */
 func (c *Connection) Close() error {
-       if c.isClosed() {
+       if c.IsClosed() {
                return ErrClosed
        }
 
@@ -332,7 +335,7 @@ func (c *Connection) Close() error {
 }
 
 func (c *Connection) closeWith(err *Error) error {
-       if c.isClosed() {
+       if c.IsClosed() {
                return ErrClosed
        }
 
@@ -346,12 +349,14 @@ func (c *Connection) closeWith(err *Error) error {
        )
 }
 
-func (c *Connection) isClosed() bool {
+// IsClosed returns true if the connection is marked as closed, otherwise false
+// is returned.
+func (c *Connection) IsClosed() bool {
        return (atomic.LoadInt32(&c.closed) == 1)
 }
 
 func (c *Connection) send(f frame) error {
-       if c.isClosed() {
+       if c.IsClosed() {
                return ErrClosed
        }
 
@@ -591,7 +596,7 @@ func (c *Connection) allocateChannel() (*Channel, error) {
        c.m.Lock()
        defer c.m.Unlock()
 
-       if c.isClosed() {
+       if c.IsClosed() {
                return nil, ErrClosed
        }