SetReadDeadline(time.Time) error
}
-// defaultDial establishes a connection when config.Dial is not provided
-func defaultDial(network, addr string) (net.Conn, error) {
- conn, err := net.DialTimeout(network, addr, defaultConnectionTimeout)
- if err != nil {
- return nil, err
- }
+// DefaultDial establishes a connection when config.Dial is not provided
+func DefaultDial(connectionTimeout time.Duration) func(network, addr string) (net.Conn, error) {
+ return func(network, addr string) (net.Conn, error) {
+ conn, err := net.DialTimeout(network, addr, connectionTimeout)
+ if err != nil {
+ return nil, err
+ }
- // Heartbeating hasn't started yet, don't stall forever on a dead server.
- // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
- // the deadline is cleared in openComplete.
- if err := conn.SetDeadline(time.Now().Add(defaultConnectionTimeout)); err != nil {
- return nil, err
- }
+ // Heartbeating hasn't started yet, don't stall forever on a dead server.
+ // A deadline is set for TLS and AMQP handshaking. After AMQP is established,
+ // the deadline is cleared in openComplete.
+ if err := conn.SetDeadline(time.Now().Add(connectionTimeout)); err != nil {
+ return nil, err
+ }
- return conn, nil
+ return conn, nil
+ }
}
// Dial accepts a string in the AMQP URI format and returns a new Connection
dialer := config.Dial
if dialer == nil {
- dialer = defaultDial
+ dialer = DefaultDial(defaultConnectionTimeout)
}
conn, err = dialer("tcp", addr)
client := tls.Client(conn, config.TLSClientConfig)
if err := client.Handshake(); err != nil {
+
conn.Close()
return nil, err
}
will also be closed.
*/
func (c *Connection) Close() error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
}
func (c *Connection) closeWith(err *Error) error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
)
}
-func (c *Connection) isClosed() bool {
+// IsClosed returns true if the connection is marked as closed, otherwise false
+// is returned.
+func (c *Connection) IsClosed() bool {
return (atomic.LoadInt32(&c.closed) == 1)
}
func (c *Connection) send(f frame) error {
- if c.isClosed() {
+ if c.IsClosed() {
return ErrClosed
}
c.m.Lock()
defer c.m.Unlock()
- if c.isClosed() {
+ if c.IsClosed() {
return nil, ErrClosed
}