barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / cluster.go
index 7a1af14..0cecc62 100644 (file)
@@ -3,11 +3,11 @@ package redis
 import (
        "context"
        "crypto/tls"
-       "errors"
        "fmt"
        "math"
        "math/rand"
        "net"
+       "runtime"
        "sort"
        "sync"
        "sync/atomic"
@@ -17,7 +17,6 @@ import (
        "github.com/go-redis/redis/internal/hashtag"
        "github.com/go-redis/redis/internal/pool"
        "github.com/go-redis/redis/internal/proto"
-       "github.com/go-redis/redis/internal/singleflight"
 )
 
 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
@@ -49,14 +48,18 @@ type ClusterOptions struct {
        // and Cluster.ReloadState to manually trigger state reloading.
        ClusterSlots func() ([]ClusterSlot, error)
 
+       // Optional hook that is called when a new node is created.
+       OnNewNode func(*Client)
+
        // Following options are copied from Options struct.
 
        OnConnect func(*Conn) error
 
+       Password string
+
        MaxRetries      int
        MinRetryBackoff time.Duration
        MaxRetryBackoff time.Duration
-       Password        string
 
        DialTimeout  time.Duration
        ReadTimeout  time.Duration
@@ -64,6 +67,8 @@ type ClusterOptions struct {
 
        // PoolSize applies per cluster node and not for the whole cluster.
        PoolSize           int
+       MinIdleConns       int
+       MaxConnAge         time.Duration
        PoolTimeout        time.Duration
        IdleTimeout        time.Duration
        IdleCheckFrequency time.Duration
@@ -78,10 +83,14 @@ func (opt *ClusterOptions) init() {
                opt.MaxRedirects = 8
        }
 
-       if opt.RouteByLatency || opt.RouteRandomly {
+       if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
                opt.ReadOnly = true
        }
 
+       if opt.PoolSize == 0 {
+               opt.PoolSize = 5 * runtime.NumCPU()
+       }
+
        switch opt.ReadTimeout {
        case -1:
                opt.ReadTimeout = 0
@@ -125,10 +134,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
                ReadTimeout:  opt.ReadTimeout,
                WriteTimeout: opt.WriteTimeout,
 
-               PoolSize:    opt.PoolSize,
-               PoolTimeout: opt.PoolTimeout,
-               IdleTimeout: opt.IdleTimeout,
-
+               PoolSize:           opt.PoolSize,
+               MinIdleConns:       opt.MinIdleConns,
+               MaxConnAge:         opt.MaxConnAge,
+               PoolTimeout:        opt.PoolTimeout,
+               IdleTimeout:        opt.IdleTimeout,
                IdleCheckFrequency: disableIdleCheck,
 
                TLSConfig: opt.TLSConfig,
@@ -157,6 +167,10 @@ func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
                go node.updateLatency()
        }
 
+       if clOpt.OnNewNode != nil {
+               clOpt.OnNewNode(node.Client)
+       }
+
        return &node
 }
 
@@ -228,8 +242,6 @@ type clusterNodes struct {
        clusterAddrs []string
        closed       bool
 
-       nodeCreateGroup singleflight.Group
-
        _generation uint32 // atomic
 }
 
@@ -332,11 +344,6 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
                return node, nil
        }
 
-       v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
-               node := newClusterNode(c.opt, addr)
-               return node, nil
-       })
-
        c.mu.Lock()
        defer c.mu.Unlock()
 
@@ -346,15 +353,13 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
 
        node, ok := c.allNodes[addr]
        if ok {
-               _ = v.(*clusterNode).Close()
                return node, err
        }
-       node = v.(*clusterNode)
+
+       node = newClusterNode(c.opt, addr)
 
        c.allAddrs = appendIfNotExists(c.allAddrs, addr)
-       if err == nil {
-               c.clusterAddrs = append(c.clusterAddrs, addr)
-       }
+       c.clusterAddrs = append(c.clusterAddrs, addr)
        c.allNodes[addr] = node
 
        return node, err
@@ -429,13 +434,15 @@ func newClusterState(
                createdAt:  time.Now(),
        }
 
-       isLoopbackOrigin := isLoopbackAddr(origin)
+       originHost, _, _ := net.SplitHostPort(origin)
+       isLoopbackOrigin := isLoopback(originHost)
+
        for _, slot := range slots {
                var nodes []*clusterNode
                for i, slotNode := range slot.Nodes {
                        addr := slotNode.Addr
-                       if !isLoopbackOrigin && useOriginAddr(origin, addr) {
-                               addr = origin
+                       if !isLoopbackOrigin {
+                               addr = replaceLoopbackHost(addr, originHost)
                        }
 
                        node, err := c.nodes.GetOrCreate(addr)
@@ -469,6 +476,33 @@ func newClusterState(
        return &c, nil
 }
 
+func replaceLoopbackHost(nodeAddr, originHost string) string {
+       nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
+       if err != nil {
+               return nodeAddr
+       }
+
+       nodeIP := net.ParseIP(nodeHost)
+       if nodeIP == nil {
+               return nodeAddr
+       }
+
+       if !nodeIP.IsLoopback() {
+               return nodeAddr
+       }
+
+       // Use origin host which is not loopback and node port.
+       return net.JoinHostPort(originHost, nodePort)
+}
+
+func isLoopback(host string) bool {
+       ip := net.ParseIP(host)
+       if ip == nil {
+               return true
+       }
+       return ip.IsLoopback()
+}
+
 func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
        nodes := c.slotNodes(slot)
        if len(nodes) > 0 {
@@ -495,10 +529,12 @@ func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
                        n := rand.Intn(len(nodes)-1) + 1
                        slave = nodes[n]
                        if !slave.Loading() {
-                               break
+                               return slave, nil
                        }
                }
-               return slave, nil
+
+               // All slaves are loading - use master.
+               return nodes[0], nil
        }
 }
 
@@ -542,23 +578,12 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
        return nil
 }
 
-func (c *clusterState) IsConsistent() bool {
-       if c.nodes.opt.ClusterSlots != nil {
-               return true
-       }
-       return len(c.Masters) <= len(c.Slaves)
-}
-
 //------------------------------------------------------------------------------
 
 type clusterStateHolder struct {
        load func() (*clusterState, error)
 
-       state atomic.Value
-
-       firstErrMu sync.RWMutex
-       firstErr   error
-
+       state     atomic.Value
        reloading uint32 // atomic
 }
 
@@ -569,24 +594,8 @@ func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder
 }
 
 func (c *clusterStateHolder) Reload() (*clusterState, error) {
-       state, err := c.reload()
-       if err != nil {
-               return nil, err
-       }
-       if !state.IsConsistent() {
-               time.AfterFunc(time.Second, c.LazyReload)
-       }
-       return state, nil
-}
-
-func (c *clusterStateHolder) reload() (*clusterState, error) {
        state, err := c.load()
        if err != nil {
-               c.firstErrMu.Lock()
-               if c.firstErr == nil {
-                       c.firstErr = err
-               }
-               c.firstErrMu.Unlock()
                return nil, err
        }
        c.state.Store(state)
@@ -600,16 +609,11 @@ func (c *clusterStateHolder) LazyReload() {
        go func() {
                defer atomic.StoreUint32(&c.reloading, 0)
 
-               for {
-                       state, err := c.reload()
-                       if err != nil {
-                               return
-                       }
-                       time.Sleep(100 * time.Millisecond)
-                       if state.IsConsistent() {
-                               return
-                       }
+               _, err := c.Reload()
+               if err != nil {
+                       return
                }
+               time.Sleep(100 * time.Millisecond)
        }()
 }
 
@@ -622,15 +626,7 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
                }
                return state, nil
        }
-
-       c.firstErrMu.RLock()
-       err := c.firstErr
-       c.firstErrMu.RUnlock()
-       if err != nil {
-               return nil, err
-       }
-
-       return nil, errors.New("redis: cluster has no state")
+       return c.Reload()
 }
 
 func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
@@ -678,10 +674,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
        c.processTxPipeline = c.defaultProcessTxPipeline
 
        c.init()
-
-       _, _ = c.state.Reload()
-       _, _ = c.cmdsInfoCache.Get()
-
        if opt.IdleCheckFrequency > 0 {
                go c.reaper(opt.IdleCheckFrequency)
        }
@@ -689,17 +681,17 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
        return c
 }
 
-// ReloadState reloads cluster state. It calls ClusterSlots func
+func (c *ClusterClient) init() {
+       c.cmdable.setProcessor(c.Process)
+}
+
+// ReloadState reloads cluster state. If available it calls ClusterSlots func
 // to get cluster slots information.
 func (c *ClusterClient) ReloadState() error {
        _, err := c.state.Reload()
        return err
 }
 
-func (c *ClusterClient) init() {
-       c.cmdable.setProcessor(c.Process)
-}
-
 func (c *ClusterClient) Context() context.Context {
        if c.ctx != nil {
                return c.ctx
@@ -780,6 +772,11 @@ func cmdSlot(cmd Cmder, pos int) int {
 }
 
 func (c *ClusterClient) cmdSlot(cmd Cmder) int {
+       args := cmd.Args()
+       if args[0] == "cluster" && args[1] == "getkeysinslot" {
+               return args[2].(int)
+       }
+
        cmdInfo := c.cmdInfo(cmd.Name())
        return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
 }
@@ -791,9 +788,9 @@ func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
        }
 
        cmdInfo := c.cmdInfo(cmd.Name())
-       slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
+       slot := c.cmdSlot(cmd)
 
-       if cmdInfo != nil && cmdInfo.ReadOnly && c.opt.ReadOnly {
+       if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
                if c.opt.RouteByLatency {
                        node, err := state.slotClosestNode(slot)
                        return slot, node, err
@@ -852,15 +849,12 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
                if err == nil {
                        break
                }
-
-               if internal.IsRetryableError(err, true) {
+               if err != Nil {
                        c.state.LazyReload()
-                       continue
                }
 
                moved, ask, addr := internal.IsMovedError(err)
                if moved || ask {
-                       c.state.LazyReload()
                        node, err = c.nodes.GetOrCreate(addr)
                        if err != nil {
                                return err
@@ -868,7 +862,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
                        continue
                }
 
-               if err == pool.ErrClosed {
+               if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
                        node, err = c.slotMasterNode(slot)
                        if err != nil {
                                return err
@@ -876,6 +870,10 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
                        continue
                }
 
+               if internal.IsRetryableError(err, true) {
+                       continue
+               }
+
                return err
        }
 
@@ -890,6 +888,13 @@ func (c *ClusterClient) Close() error {
        return c.nodes.Close()
 }
 
+// Do creates a Cmd from the args and processes the cmd.
+func (c *ClusterClient) Do(args ...interface{}) *Cmd {
+       cmd := NewCmd(args...)
+       c.Process(cmd)
+       return cmd
+}
+
 func (c *ClusterClient) WrapProcess(
        fn func(oldProcess func(Cmder) error) func(Cmder) error,
 ) {
@@ -933,26 +938,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
                if err == nil {
                        break
                }
+               if err != Nil {
+                       c.state.LazyReload()
+               }
 
-               // If slave is loading - read from master.
+               // If slave is loading - pick another node.
                if c.opt.ReadOnly && internal.IsLoadingError(err) {
                        node.MarkAsLoading()
-                       continue
-               }
-
-               if internal.IsRetryableError(err, true) {
-                       c.state.LazyReload()
-
-                       // First retry the same node.
-                       if attempt == 0 {
-                               continue
-                       }
-
-                       // Second try random node.
-                       node, err = c.nodes.Random()
-                       if err != nil {
-                               break
-                       }
+                       node = nil
                        continue
                }
 
@@ -960,8 +953,6 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
                var addr string
                moved, ask, addr = internal.IsMovedError(err)
                if moved || ask {
-                       c.state.LazyReload()
-
                        node, err = c.nodes.GetOrCreate(addr)
                        if err != nil {
                                break
@@ -969,11 +960,25 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
                        continue
                }
 
-               if err == pool.ErrClosed {
+               if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
                        node = nil
                        continue
                }
 
+               if internal.IsRetryableError(err, true) {
+                       // First retry the same node.
+                       if attempt == 0 {
+                               continue
+                       }
+
+                       // Second try random node.
+                       node, err = c.nodes.Random()
+                       if err != nil {
+                               break
+                       }
+                       continue
+               }
+
                break
        }
 
@@ -1101,7 +1106,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
                acc.Timeouts += s.Timeouts
 
                acc.TotalConns += s.TotalConns
-               acc.FreeConns += s.FreeConns
+               acc.IdleConns += s.IdleConns
                acc.StaleConns += s.StaleConns
        }
 
@@ -1112,7 +1117,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
                acc.Timeouts += s.Timeouts
 
                acc.TotalConns += s.TotalConns
-               acc.FreeConns += s.FreeConns
+               acc.IdleConns += s.IdleConns
                acc.StaleConns += s.StaleConns
        }
 
@@ -1196,7 +1201,8 @@ func (c *ClusterClient) WrapProcessPipeline(
 }
 
 func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
-       cmdsMap, err := c.mapCmdsByNode(cmds)
+       cmdsMap := newCmdsMap()
+       err := c.mapCmdsByNode(cmds, cmdsMap)
        if err != nil {
                setCmdsErr(cmds, err)
                return err
@@ -1207,44 +1213,57 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
                        time.Sleep(c.retryBackoff(attempt))
                }
 
-               failedCmds := make(map[*clusterNode][]Cmder)
+               failedCmds := newCmdsMap()
+               var wg sync.WaitGroup
 
-               for node, cmds := range cmdsMap {
-                       cn, err := node.Client.getConn()
-                       if err != nil {
-                               if err == pool.ErrClosed {
-                                       c.remapCmds(cmds, failedCmds)
-                               } else {
-                                       setCmdsErr(cmds, err)
+               for node, cmds := range cmdsMap.m {
+                       wg.Add(1)
+                       go func(node *clusterNode, cmds []Cmder) {
+                               defer wg.Done()
+
+                               cn, err := node.Client.getConn()
+                               if err != nil {
+                                       if err == pool.ErrClosed {
+                                               c.mapCmdsByNode(cmds, failedCmds)
+                                       } else {
+                                               setCmdsErr(cmds, err)
+                                       }
+                                       return
                                }
-                               continue
-                       }
 
-                       err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
-                       if err == nil || internal.IsRedisError(err) {
-                               node.Client.connPool.Put(cn)
-                       } else {
-                               node.Client.connPool.Remove(cn)
-                       }
+                               err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
+                               node.Client.releaseConnStrict(cn, err)
+                       }(node, cmds)
                }
 
-               if len(failedCmds) == 0 {
+               wg.Wait()
+               if len(failedCmds.m) == 0 {
                        break
                }
                cmdsMap = failedCmds
        }
 
-       return firstCmdsErr(cmds)
+       return cmdsFirstErr(cmds)
 }
 
-func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, error) {
+type cmdsMap struct {
+       mu sync.Mutex
+       m  map[*clusterNode][]Cmder
+}
+
+func newCmdsMap() *cmdsMap {
+       return &cmdsMap{
+               m: make(map[*clusterNode][]Cmder),
+       }
+}
+
+func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
        state, err := c.state.Get()
        if err != nil {
                setCmdsErr(cmds, err)
-               return nil, err
+               return err
        }
 
-       cmdsMap := make(map[*clusterNode][]Cmder)
        cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
        for _, cmd := range cmds {
                var node *clusterNode
@@ -1256,11 +1275,13 @@ func (c *ClusterClient) mapCmdsByNode(cmds []Cmder) (map[*clusterNode][]Cmder, e
                        node, err = state.slotMasterNode(slot)
                }
                if err != nil {
-                       return nil, err
+                       return err
                }
-               cmdsMap[node] = append(cmdsMap[node], cmd)
+               cmdsMap.mu.Lock()
+               cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
+               cmdsMap.mu.Unlock()
        }
-       return cmdsMap, nil
+       return nil
 }
 
 func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
@@ -1273,41 +1294,32 @@ func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
        return true
 }
 
-func (c *ClusterClient) remapCmds(cmds []Cmder, failedCmds map[*clusterNode][]Cmder) {
-       remappedCmds, err := c.mapCmdsByNode(cmds)
-       if err != nil {
-               setCmdsErr(cmds, err)
-               return
-       }
-
-       for node, cmds := range remappedCmds {
-               failedCmds[node] = cmds
-       }
-}
-
 func (c *ClusterClient) pipelineProcessCmds(
-       node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+       node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
-       cn.SetWriteTimeout(c.opt.WriteTimeout)
-
-       err := writeCmd(cn, cmds...)
+       err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+               return writeCmd(wr, cmds...)
+       })
        if err != nil {
                setCmdsErr(cmds, err)
-               failedCmds[node] = cmds
+               failedCmds.mu.Lock()
+               failedCmds.m[node] = cmds
+               failedCmds.mu.Unlock()
                return err
        }
 
-       // Set read timeout for all commands.
-       cn.SetReadTimeout(c.opt.ReadTimeout)
-
-       return c.pipelineReadCmds(cn, cmds, failedCmds)
+       err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+               return c.pipelineReadCmds(node, rd, cmds, failedCmds)
+       })
+       return err
 }
 
 func (c *ClusterClient) pipelineReadCmds(
-       cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+       node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
+       var firstErr error
        for _, cmd := range cmds {
-               err := cmd.readReply(cn)
+               err := cmd.readReply(rd)
                if err == nil {
                        continue
                }
@@ -1320,13 +1332,18 @@ func (c *ClusterClient) pipelineReadCmds(
                        continue
                }
 
-               return err
+               failedCmds.mu.Lock()
+               failedCmds.m[node] = append(failedCmds.m[node], cmd)
+               failedCmds.mu.Unlock()
+               if firstErr == nil {
+                       firstErr = err
+               }
        }
-       return nil
+       return firstErr
 }
 
 func (c *ClusterClient) checkMovedErr(
-       cmd Cmder, err error, failedCmds map[*clusterNode][]Cmder,
+       cmd Cmder, err error, failedCmds *cmdsMap,
 ) bool {
        moved, ask, addr := internal.IsMovedError(err)
 
@@ -1338,7 +1355,9 @@ func (c *ClusterClient) checkMovedErr(
                        return false
                }
 
-               failedCmds[node] = append(failedCmds[node], cmd)
+               failedCmds.mu.Lock()
+               failedCmds.m[node] = append(failedCmds.m[node], cmd)
+               failedCmds.mu.Unlock()
                return true
        }
 
@@ -1348,7 +1367,9 @@ func (c *ClusterClient) checkMovedErr(
                        return false
                }
 
-               failedCmds[node] = append(failedCmds[node], NewCmd("ASKING"), cmd)
+               failedCmds.mu.Lock()
+               failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
+               failedCmds.mu.Unlock()
                return true
        }
 
@@ -1388,35 +1409,38 @@ func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
                                time.Sleep(c.retryBackoff(attempt))
                        }
 
-                       failedCmds := make(map[*clusterNode][]Cmder)
+                       failedCmds := newCmdsMap()
+                       var wg sync.WaitGroup
 
                        for node, cmds := range cmdsMap {
-                               cn, err := node.Client.getConn()
-                               if err != nil {
-                                       if err == pool.ErrClosed {
-                                               c.remapCmds(cmds, failedCmds)
-                                       } else {
-                                               setCmdsErr(cmds, err)
+                               wg.Add(1)
+                               go func(node *clusterNode, cmds []Cmder) {
+                                       defer wg.Done()
+
+                                       cn, err := node.Client.getConn()
+                                       if err != nil {
+                                               if err == pool.ErrClosed {
+                                                       c.mapCmdsByNode(cmds, failedCmds)
+                                               } else {
+                                                       setCmdsErr(cmds, err)
+                                               }
+                                               return
                                        }
-                                       continue
-                               }
 
-                               err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
-                               if err == nil || internal.IsRedisError(err) {
-                                       node.Client.connPool.Put(cn)
-                               } else {
-                                       node.Client.connPool.Remove(cn)
-                               }
+                                       err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
+                                       node.Client.releaseConnStrict(cn, err)
+                               }(node, cmds)
                        }
 
-                       if len(failedCmds) == 0 {
+                       wg.Wait()
+                       if len(failedCmds.m) == 0 {
                                break
                        }
-                       cmdsMap = failedCmds
+                       cmdsMap = failedCmds.m
                }
        }
 
-       return firstCmdsErr(cmds)
+       return cmdsFirstErr(cmds)
 }
 
 func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
@@ -1429,37 +1453,41 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
 }
 
 func (c *ClusterClient) txPipelineProcessCmds(
-       node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+       node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
-       cn.SetWriteTimeout(c.opt.WriteTimeout)
-       if err := txPipelineWriteMulti(cn, cmds); err != nil {
-               setCmdsErr(cmds, err)
-               failedCmds[node] = cmds
-               return err
-       }
-
-       // Set read timeout for all commands.
-       cn.SetReadTimeout(c.opt.ReadTimeout)
-
-       if err := c.txPipelineReadQueued(cn, cmds, failedCmds); err != nil {
+       err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
+               return txPipelineWriteMulti(wr, cmds)
+       })
+       if err != nil {
                setCmdsErr(cmds, err)
+               failedCmds.mu.Lock()
+               failedCmds.m[node] = cmds
+               failedCmds.mu.Unlock()
                return err
        }
 
-       return pipelineReadCmds(cn, cmds)
+       err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
+               err := c.txPipelineReadQueued(rd, cmds, failedCmds)
+               if err != nil {
+                       setCmdsErr(cmds, err)
+                       return err
+               }
+               return pipelineReadCmds(rd, cmds)
+       })
+       return err
 }
 
 func (c *ClusterClient) txPipelineReadQueued(
-       cn *pool.Conn, cmds []Cmder, failedCmds map[*clusterNode][]Cmder,
+       rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
 ) error {
        // Parse queued replies.
        var statusCmd StatusCmd
-       if err := statusCmd.readReply(cn); err != nil {
+       if err := statusCmd.readReply(rd); err != nil {
                return err
        }
 
        for _, cmd := range cmds {
-               err := statusCmd.readReply(cn)
+               err := statusCmd.readReply(rd)
                if err == nil {
                        continue
                }
@@ -1472,7 +1500,7 @@ func (c *ClusterClient) txPipelineReadQueued(
        }
 
        // Parse number of replies.
-       line, err := cn.Rd.ReadLine()
+       line, err := rd.ReadLine()
        if err != nil {
                if err == Nil {
                        err = TxFailedErr
@@ -1499,40 +1527,46 @@ func (c *ClusterClient) txPipelineReadQueued(
        return nil
 }
 
-func (c *ClusterClient) pubSub(channels []string) *PubSub {
+func (c *ClusterClient) pubSub() *PubSub {
        var node *clusterNode
        pubsub := &PubSub{
                opt: c.opt.clientOptions(),
 
                newConn: func(channels []string) (*pool.Conn, error) {
-                       if node == nil {
-                               var slot int
-                               if len(channels) > 0 {
-                                       slot = hashtag.Slot(channels[0])
-                               } else {
-                                       slot = -1
-                               }
+                       if node != nil {
+                               panic("node != nil")
+                       }
 
-                               masterNode, err := c.slotMasterNode(slot)
-                               if err != nil {
-                                       return nil, err
-                               }
-                               node = masterNode
+                       slot := hashtag.Slot(channels[0])
+
+                       var err error
+                       node, err = c.slotMasterNode(slot)
+                       if err != nil {
+                               return nil, err
                        }
-                       return node.Client.newConn()
+
+                       cn, err := node.Client.newConn()
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       return cn, nil
                },
                closeConn: func(cn *pool.Conn) error {
-                       return node.Client.connPool.CloseConn(cn)
+                       err := node.Client.connPool.CloseConn(cn)
+                       node = nil
+                       return err
                },
        }
        pubsub.init()
+
        return pubsub
 }
 
 // Subscribe subscribes the client to the specified channels.
 // Channels can be omitted to create empty subscription.
 func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
-       pubsub := c.pubSub(channels)
+       pubsub := c.pubSub()
        if len(channels) > 0 {
                _ = pubsub.Subscribe(channels...)
        }
@@ -1542,50 +1576,13 @@ func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
 // PSubscribe subscribes the client to the given patterns.
 // Patterns can be omitted to create empty subscription.
 func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
-       pubsub := c.pubSub(channels)
+       pubsub := c.pubSub()
        if len(channels) > 0 {
                _ = pubsub.PSubscribe(channels...)
        }
        return pubsub
 }
 
-func useOriginAddr(originAddr, nodeAddr string) bool {
-       nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
-       if err != nil {
-               return false
-       }
-
-       nodeIP := net.ParseIP(nodeHost)
-       if nodeIP == nil {
-               return false
-       }
-
-       if !nodeIP.IsLoopback() {
-               return false
-       }
-
-       _, originPort, err := net.SplitHostPort(originAddr)
-       if err != nil {
-               return false
-       }
-
-       return nodePort == originPort
-}
-
-func isLoopbackAddr(addr string) bool {
-       host, _, err := net.SplitHostPort(addr)
-       if err != nil {
-               return false
-       }
-
-       ip := net.ParseIP(host)
-       if ip == nil {
-               return false
-       }
-
-       return ip.IsLoopback()
-}
-
 func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
        for _, n := range nodes {
                if n == node {