16 "github.com/go-redis/redis/internal"
17 "github.com/go-redis/redis/internal/hashtag"
18 "github.com/go-redis/redis/internal/pool"
19 "github.com/go-redis/redis/internal/proto"
22 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
// ClusterOptions are used to configure a cluster client and should be
// passed to NewClusterClient.
type ClusterOptions struct {
	// A seed list of host:port addresses of cluster nodes.
	Addrs []string

	// The maximum number of retries before giving up. Commands are retried
	// on network errors and MOVED/ASK redirects.
	// Default is 8 retries.
	MaxRedirects int

	// Enables read-only commands on slave nodes.
	ReadOnly bool

	// Allows routing read-only commands to the closest master or slave node.
	// It automatically enables ReadOnly.
	RouteByLatency bool

	// Allows routing read-only commands to a random master or slave node.
	// It automatically enables ReadOnly.
	RouteRandomly bool
	// Optional function that returns cluster slots information.
	// It is useful for manually creating a cluster of standalone Redis servers
	// and load-balancing read/write operations between masters and slaves.
	// It can use a service like ZooKeeper to maintain configuration information
	// and ClusterClient.ReloadState to manually trigger state reloading.
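	//
	// A minimal sketch of such a function; the addresses are placeholders and
	// would normally come from an external source such as ZooKeeper:
	//
	//	ClusterSlots: func() ([]redis.ClusterSlot, error) {
	//		slots := []redis.ClusterSlot{
	//			// First shard: one master and one slave.
	//			{
	//				Start: 0,
	//				End:   8191,
	//				Nodes: []redis.ClusterNode{
	//					{Addr: ":7000"}, // master
	//					{Addr: ":8000"}, // slave
	//				},
	//			},
	//			// Second shard: one master and one slave.
	//			{
	//				Start: 8192,
	//				End:   16383,
	//				Nodes: []redis.ClusterNode{
	//					{Addr: ":7001"}, // master
	//					{Addr: ":8001"}, // slave
	//				},
	//			},
	//		}
	//		return slots, nil
	//	},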
	ClusterSlots func() ([]ClusterSlot, error)

	// Optional hook that is called when a new node is created.
	OnNewNode func(*Client)
	// The following options are copied from the Options struct.

	OnConnect func(*Conn) error

	MinRetryBackoff time.Duration
	MaxRetryBackoff time.Duration

	DialTimeout  time.Duration
	ReadTimeout  time.Duration
	WriteTimeout time.Duration
	// PoolSize applies per cluster node and not for the whole cluster.
	PoolSize           int
	MinIdleConns       int
	MaxConnAge         time.Duration
	PoolTimeout        time.Duration
	IdleTimeout        time.Duration
	IdleCheckFrequency time.Duration
func (opt *ClusterOptions) init() {
	if opt.MaxRedirects == -1 {
	} else if opt.MaxRedirects == 0 {

	if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {

	if opt.PoolSize == 0 {
		opt.PoolSize = 5 * runtime.NumCPU()

	switch opt.ReadTimeout {
		opt.ReadTimeout = 3 * time.Second

	switch opt.WriteTimeout {
		opt.WriteTimeout = opt.ReadTimeout

	switch opt.MinRetryBackoff {
		opt.MinRetryBackoff = 0
		opt.MinRetryBackoff = 8 * time.Millisecond

	switch opt.MaxRetryBackoff {
		opt.MaxRetryBackoff = 0
		opt.MaxRetryBackoff = 512 * time.Millisecond
func (opt *ClusterOptions) clientOptions() *Options {
	const disableIdleCheck = -1

		OnConnect: opt.OnConnect,

		MaxRetries:      opt.MaxRetries,
		MinRetryBackoff: opt.MinRetryBackoff,
		MaxRetryBackoff: opt.MaxRetryBackoff,
		Password:        opt.Password,
		readOnly:        opt.ReadOnly,

		DialTimeout:  opt.DialTimeout,
		ReadTimeout:  opt.ReadTimeout,
		WriteTimeout: opt.WriteTimeout,

		PoolSize:           opt.PoolSize,
		MinIdleConns:       opt.MinIdleConns,
		MaxConnAge:         opt.MaxConnAge,
		PoolTimeout:        opt.PoolTimeout,
		IdleTimeout:        opt.IdleTimeout,
		IdleCheckFrequency: disableIdleCheck,

		TLSConfig: opt.TLSConfig,
//------------------------------------------------------------------------------

type clusterNode struct {

	latency    uint32 // atomic
	generation uint32 // atomic
	loading    uint32 // atomic
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
	opt := clOpt.clientOptions()

		Client: NewClient(opt),

	node.latency = math.MaxUint32
	if clOpt.RouteByLatency {
		go node.updateLatency()

	if clOpt.OnNewNode != nil {
		clOpt.OnNewNode(node.Client)

func (n *clusterNode) String() string {
	return n.Client.String()

func (n *clusterNode) Close() error {
	return n.Client.Close()
func (n *clusterNode) updateLatency() {

	for i := 0; i < probes; i++ {
		probe := uint32(time.Since(start) / time.Microsecond)
		latency = (latency + probe) / 2

	atomic.StoreUint32(&n.latency, latency)

func (n *clusterNode) Latency() time.Duration {
	latency := atomic.LoadUint32(&n.latency)
	return time.Duration(latency) * time.Microsecond

func (n *clusterNode) MarkAsLoading() {
	atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
func (n *clusterNode) Loading() bool {
	const minute = int64(time.Minute / time.Second)

	loading := atomic.LoadUint32(&n.loading)

	if time.Now().Unix()-int64(loading) < minute {

	atomic.StoreUint32(&n.loading, 0)

func (n *clusterNode) Generation() uint32 {
	return atomic.LoadUint32(&n.generation)

func (n *clusterNode) SetGeneration(gen uint32) {
		v := atomic.LoadUint32(&n.generation)
		if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
//------------------------------------------------------------------------------

type clusterNodes struct {

	allNodes     map[string]*clusterNode
	clusterAddrs []string

	_generation uint32 // atomic

func newClusterNodes(opt *ClusterOptions) *clusterNodes {
	return &clusterNodes{

		allNodes: make(map[string]*clusterNode),
func (c *clusterNodes) Close() error {

	for _, node := range c.allNodes {
		if err := node.Client.Close(); err != nil && firstErr == nil {

func (c *clusterNodes) Addrs() ([]string, error) {

	if len(c.clusterAddrs) > 0 {
		addrs = c.clusterAddrs

		return nil, pool.ErrClosed

		return nil, errClusterNoNodes

func (c *clusterNodes) NextGeneration() uint32 {
	return atomic.AddUint32(&c._generation, 1)
// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) {
	var collected []*clusterNode

	for addr, node := range c.allNodes {
		if node.Generation() >= generation {

		c.clusterAddrs = remove(c.clusterAddrs, addr)
		delete(c.allNodes, addr)
		collected = append(collected, node)

	for _, node := range collected {
		_ = node.Client.Close()
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
	var node *clusterNode

	node = c.allNodes[addr]

func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
	node, err := c.Get(addr)

		return nil, pool.ErrClosed

	node, ok := c.allNodes[addr]

	node = newClusterNode(c.opt, addr)

	c.allAddrs = appendIfNotExists(c.allAddrs, addr)
	c.clusterAddrs = append(c.clusterAddrs, addr)
	c.allNodes[addr] = node
func (c *clusterNodes) All() ([]*clusterNode, error) {

		return nil, pool.ErrClosed

	cp := make([]*clusterNode, 0, len(c.allNodes))
	for _, node := range c.allNodes {
		cp = append(cp, node)

func (c *clusterNodes) Random() (*clusterNode, error) {
	addrs, err := c.Addrs()

	n := rand.Intn(len(addrs))
	return c.GetOrCreate(addrs[n])
//------------------------------------------------------------------------------

type clusterSlot struct {

type clusterSlotSlice []*clusterSlot

func (p clusterSlotSlice) Len() int {

func (p clusterSlotSlice) Less(i, j int) bool {
	return p[i].start < p[j].start

func (p clusterSlotSlice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
type clusterState struct {

	Masters []*clusterNode
	Slaves  []*clusterNode

func newClusterState(
	nodes *clusterNodes, slots []ClusterSlot, origin string,
) (*clusterState, error) {

		slots: make([]*clusterSlot, 0, len(slots)),

		generation: nodes.NextGeneration(),
		createdAt:  time.Now(),

	originHost, _, _ := net.SplitHostPort(origin)
	isLoopbackOrigin := isLoopback(originHost)

	for _, slot := range slots {
		var nodes []*clusterNode
		for i, slotNode := range slot.Nodes {
			addr := slotNode.Addr
			if !isLoopbackOrigin {
				addr = replaceLoopbackHost(addr, originHost)

			node, err := c.nodes.GetOrCreate(addr)

			node.SetGeneration(c.generation)
			nodes = append(nodes, node)

				c.Masters = appendUniqueNode(c.Masters, node)

				c.Slaves = appendUniqueNode(c.Slaves, node)

		c.slots = append(c.slots, &clusterSlot{

	sort.Sort(clusterSlotSlice(c.slots))

	time.AfterFunc(time.Minute, func() {
		nodes.GC(c.generation)
func replaceLoopbackHost(nodeAddr, originHost string) string {
	nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)

	nodeIP := net.ParseIP(nodeHost)

	if !nodeIP.IsLoopback() {

	// Use the origin host, which is not loopback, and the node's port.
	return net.JoinHostPort(originHost, nodePort)
func isLoopback(host string) bool {
	ip := net.ParseIP(host)

	return ip.IsLoopback()

func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)

		return c.nodes.Random()
func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
	nodes := c.slotNodes(slot)

		return c.nodes.Random()

		if slave := nodes[1]; !slave.Loading() {

		var slave *clusterNode
		for i := 0; i < 10; i++ {
			n := rand.Intn(len(nodes)-1) + 1

			if !slave.Loading() {

		// All slaves are loading; use the master.
func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
	const threshold = time.Millisecond

	nodes := c.slotNodes(slot)

		return c.nodes.Random()

	var node *clusterNode
	for _, n := range nodes {

		if node == nil || node.Latency()-n.Latency() > threshold {

func (c *clusterState) slotRandomNode(slot int) *clusterNode {
	nodes := c.slotNodes(slot)
	n := rand.Intn(len(nodes))

func (c *clusterState) slotNodes(slot int) []*clusterNode {
	i := sort.Search(len(c.slots), func(i int) bool {
		return c.slots[i].end >= slot

	if i >= len(c.slots) {

	if slot >= x.start && slot <= x.end {
//------------------------------------------------------------------------------

type clusterStateHolder struct {
	load func() (*clusterState, error)

	reloading uint32 // atomic

func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
	return &clusterStateHolder{

func (c *clusterStateHolder) Reload() (*clusterState, error) {
	state, err := c.load()

func (c *clusterStateHolder) LazyReload() {
	if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {

		defer atomic.StoreUint32(&c.reloading, 0)

		time.Sleep(100 * time.Millisecond)

func (c *clusterStateHolder) Get() (*clusterState, error) {
		state := v.(*clusterState)
		if time.Since(state.createdAt) > time.Minute {

func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
	state, err := c.Reload()
//------------------------------------------------------------------------------

// ClusterClient is a Redis Cluster client representing a pool of zero
// or more underlying connections. It's safe for concurrent use by
// multiple goroutines.
type ClusterClient struct {

	state         *clusterStateHolder
	cmdsInfoCache *cmdsInfoCache

	process           func(Cmder) error
	processPipeline   func([]Cmder) error
	processTxPipeline func([]Cmder) error
// NewClusterClient returns a Redis Cluster client as described in
// http://redis.io/topics/cluster-spec.
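//
// A minimal usage sketch; the addresses are placeholders for real cluster
// nodes:
//
//	clusterClient := redis.NewClusterClient(&redis.ClusterOptions{
//		Addrs: []string{":7000", ":7001", ":7002", ":7003", ":7004", ":7005"},
//	})
//	if err := clusterClient.Ping().Err(); err != nil {
//		// handle error
//	}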
func NewClusterClient(opt *ClusterOptions) *ClusterClient {

		nodes: newClusterNodes(opt),

	c.state = newClusterStateHolder(c.loadState)
	c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)

	c.process = c.defaultProcess
	c.processPipeline = c.defaultProcessPipeline
	c.processTxPipeline = c.defaultProcessTxPipeline

	if opt.IdleCheckFrequency > 0 {
		go c.reaper(opt.IdleCheckFrequency)

func (c *ClusterClient) init() {
	c.cmdable.setProcessor(c.Process)
// ReloadState reloads the cluster state. If available, it calls the
// ClusterSlots func to get cluster slots information.
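//
// For example, when ClusterSlots is backed by an external configuration
// store, the state can be refreshed after that store changes (illustrative
// only):
//
//	if err := clusterClient.ReloadState(); err != nil {
//		// handle error
//	}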
func (c *ClusterClient) ReloadState() error {
	_, err := c.state.Reload()

func (c *ClusterClient) Context() context.Context {
	return context.Background()

func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {

func (c *ClusterClient) copy() *ClusterClient {

// Options returns read-only Options that were used to create the client.
func (c *ClusterClient) Options() *ClusterOptions {

func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
	addrs, err := c.nodes.Addrs()

	for _, addr := range addrs {
		node, err := c.nodes.Get(addr)

		info, err := node.Client.Command().Result()

func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
	cmdsInfo, err := c.cmdsInfoCache.Get()

	info := cmdsInfo[name]
		internal.Logf("info for cmd=%s not found", name)

func cmdSlot(cmd Cmder, pos int) int {
		return hashtag.RandomSlot()

	firstKey := cmd.stringArg(pos)
	return hashtag.Slot(firstKey)

func (c *ClusterClient) cmdSlot(cmd Cmder) int {
	if args[0] == "cluster" && args[1] == "getkeysinslot" {

	cmdInfo := c.cmdInfo(cmd.Name())
	return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
	state, err := c.state.Get()

	cmdInfo := c.cmdInfo(cmd.Name())
	slot := c.cmdSlot(cmd)

	if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
		if c.opt.RouteByLatency {
			node, err := state.slotClosestNode(slot)
			return slot, node, err

		if c.opt.RouteRandomly {
			node := state.slotRandomNode(slot)
			return slot, node, nil

		node, err := state.slotSlaveNode(slot)
		return slot, node, err

	node, err := state.slotMasterNode(slot)
	return slot, node, err

func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
	state, err := c.state.Get()

	nodes := state.slotNodes(slot)

	return c.nodes.Random()
func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
		return fmt.Errorf("redis: Watch requires at least one key")

	slot := hashtag.Slot(keys[0])
	for _, key := range keys[1:] {
		if hashtag.Slot(key) != slot {
			err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")

	node, err := c.slotMasterNode(slot)

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			time.Sleep(c.retryBackoff(attempt))

		err = node.Client.Watch(fn, keys...)

		moved, ask, addr := internal.IsMovedError(err)
			node, err = c.nodes.GetOrCreate(addr)

		if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
			node, err = c.slotMasterNode(slot)

		if internal.IsRetryableError(err, true) {
// Close closes the cluster client, releasing any open resources.
//
// It is rare to Close a ClusterClient, as the ClusterClient is meant
// to be long-lived and shared between many goroutines.
func (c *ClusterClient) Close() error {
	return c.nodes.Close()
// Do creates a Cmd from the args and processes the cmd.
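//
// An illustrative sketch; the command and key name are arbitrary, and
// redis.Nil is the error returned when the key does not exist:
//
//	v, err := clusterClient.Do("GET", "some_key").Result()
//	if err != nil && err != redis.Nil {
//		// handle error
//	}
//	_ = v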
func (c *ClusterClient) Do(args ...interface{}) *Cmd {
	cmd := NewCmd(args...)

func (c *ClusterClient) WrapProcess(
	fn func(oldProcess func(Cmder) error) func(Cmder) error,
	c.process = fn(c.process)

func (c *ClusterClient) Process(cmd Cmder) error {
	return c.process(cmd)
func (c *ClusterClient) defaultProcess(cmd Cmder) error {
	var node *clusterNode

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			time.Sleep(c.retryBackoff(attempt))

			_, node, err = c.cmdSlotAndNode(cmd)

			pipe := node.Client.Pipeline()
			_ = pipe.Process(NewCmd("ASKING"))
			_ = pipe.Process(cmd)

			err = node.Client.Process(cmd)

		// If there is no error, we are done.

		// If the slave is loading, pick another node.
		if c.opt.ReadOnly && internal.IsLoadingError(err) {

		moved, ask, addr = internal.IsMovedError(err)
			node, err = c.nodes.GetOrCreate(addr)

		if err == pool.ErrClosed || internal.IsReadOnlyError(err) {

		if internal.IsRetryableError(err, true) {
			// First, retry the same node.

			// Second, try a random node.
			node, err = c.nodes.Random()
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
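//
// A sketch of a common use, flushing every master; FlushDB here is only an
// example callback:
//
//	err := clusterClient.ForEachMaster(func(master *redis.Client) error {
//		return master.FlushDB().Err()
//	})
//	if err != nil {
//		// handle error
//	}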
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, master := range state.Masters {
		go func(node *clusterNode) {
			err := fn(node.Client)

	case err := <-errCh:
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	for _, slave := range state.Slaves {
		go func(node *clusterNode) {
			err := fn(node.Client)

	case err := <-errCh:
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
	state, err := c.state.ReloadOrGet()

	var wg sync.WaitGroup
	errCh := make(chan error, 1)
	worker := func(node *clusterNode) {
		err := fn(node.Client)

	for _, node := range state.Masters {

	for _, node := range state.Slaves {

	case err := <-errCh:
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {

	state, _ := c.state.Get()

	for _, node := range state.Masters {
		s := node.Client.connPool.Stats()

		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns

	for _, node := range state.Slaves {
		s := node.Client.connPool.Stats()

		acc.Misses += s.Misses
		acc.Timeouts += s.Timeouts

		acc.TotalConns += s.TotalConns
		acc.IdleConns += s.IdleConns
		acc.StaleConns += s.StaleConns
func (c *ClusterClient) loadState() (*clusterState, error) {
	if c.opt.ClusterSlots != nil {
		slots, err := c.opt.ClusterSlots()
		return newClusterState(c.nodes, slots, "")

	addrs, err := c.nodes.Addrs()

	for _, addr := range addrs {
		node, err := c.nodes.GetOrCreate(addr)
			if firstErr == nil {

		slots, err := node.Client.ClusterSlots().Result()
			if firstErr == nil {

		return newClusterState(c.nodes, slots, node.Client.opt.Addr)

	return nil, firstErr
// reaper closes idle connections to the cluster.
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
	ticker := time.NewTicker(idleCheckFrequency)

	for range ticker.C {
		nodes, err := c.nodes.All()

		for _, node := range nodes {
			_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
				internal.Logf("ReapStaleConns failed: %s", err)
func (c *ClusterClient) Pipeline() Pipeliner {
		exec: c.processPipeline,
	pipe.statefulCmdable.setProcessor(pipe.Process)

func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(fn)

func (c *ClusterClient) WrapProcessPipeline(
	fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
	c.processPipeline = fn(c.processPipeline)
func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
	cmdsMap := newCmdsMap()
	err := c.mapCmdsByNode(cmds, cmdsMap)
		setCmdsErr(cmds, err)

	for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
			time.Sleep(c.retryBackoff(attempt))

		failedCmds := newCmdsMap()
		var wg sync.WaitGroup

		for node, cmds := range cmdsMap.m {
			go func(node *clusterNode, cmds []Cmder) {

				cn, err := node.Client.getConn()
					if err == pool.ErrClosed {
						c.mapCmdsByNode(cmds, failedCmds)
						setCmdsErr(cmds, err)

				err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
				node.Client.releaseConnStrict(cn, err)

		if len(failedCmds.m) == 0 {

		cmdsMap = failedCmds

	return cmdsFirstErr(cmds)
type cmdsMap struct {
	mu sync.Mutex
	m  map[*clusterNode][]Cmder

func newCmdsMap() *cmdsMap {
		m: make(map[*clusterNode][]Cmder),
func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
	state, err := c.state.Get()
		setCmdsErr(cmds, err)

	cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
	for _, cmd := range cmds {
		var node *clusterNode

		if cmdsAreReadOnly {
			_, node, err = c.cmdSlotAndNode(cmd)

			slot := c.cmdSlot(cmd)
			node, err = state.slotMasterNode(slot)

		cmdsMap.m[node] = append(cmdsMap.m[node], cmd)

func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
	for _, cmd := range cmds {
		cmdInfo := c.cmdInfo(cmd.Name())
		if cmdInfo == nil || !cmdInfo.ReadOnly {
func (c *ClusterClient) pipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,

	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmd(wr, cmds...)

		setCmdsErr(cmds, err)
		failedCmds.mu.Lock()
		failedCmds.m[node] = cmds
		failedCmds.mu.Unlock()

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		return c.pipelineReadCmds(node, rd, cmds, failedCmds)

func (c *ClusterClient) pipelineReadCmds(
	node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,

	for _, cmd := range cmds {
		err := cmd.readReply(rd)

		if c.checkMovedErr(cmd, err, failedCmds) {

		if internal.IsRedisError(err) {

		failedCmds.mu.Lock()
		failedCmds.m[node] = append(failedCmds.m[node], cmd)
		failedCmds.mu.Unlock()

		if firstErr == nil {
func (c *ClusterClient) checkMovedErr(
	cmd Cmder, err error, failedCmds *cmdsMap,

	moved, ask, addr := internal.IsMovedError(err)

		c.state.LazyReload()

		node, err := c.nodes.GetOrCreate(addr)

		failedCmds.mu.Lock()
		failedCmds.m[node] = append(failedCmds.m[node], cmd)
		failedCmds.mu.Unlock()

		node, err := c.nodes.GetOrCreate(addr)

		failedCmds.mu.Lock()
		failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
		failedCmds.mu.Unlock()
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
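//
// An illustrative sketch via TxPipelined (defined below); the key name is
// arbitrary, and commands are grouped into one MULTI/EXEC block per slot, so
// keys that must share a block should share a hash tag such as {user1}:
//
//	cmds, err := clusterClient.TxPipelined(func(pipe redis.Pipeliner) error {
//		pipe.Incr("visits:{user1}")
//		pipe.Expire("visits:{user1}", time.Hour)
//		return nil
//	})
//	if err != nil {
//		// handle error
//	}
//	_ = cmds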
func (c *ClusterClient) TxPipeline() Pipeliner {
		exec: c.processTxPipeline,
	pipe.statefulCmdable.setProcessor(pipe.Process)

func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(fn)
func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
	state, err := c.state.Get()

	cmdsMap := c.mapCmdsBySlot(cmds)
	for slot, cmds := range cmdsMap {
		node, err := state.slotMasterNode(slot)
			setCmdsErr(cmds, err)

		cmdsMap := map[*clusterNode][]Cmder{node: cmds}
		for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
				time.Sleep(c.retryBackoff(attempt))

			failedCmds := newCmdsMap()
			var wg sync.WaitGroup

			for node, cmds := range cmdsMap {
				go func(node *clusterNode, cmds []Cmder) {

					cn, err := node.Client.getConn()
						if err == pool.ErrClosed {
							c.mapCmdsByNode(cmds, failedCmds)
							setCmdsErr(cmds, err)

					err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
					node.Client.releaseConnStrict(cn, err)

			if len(failedCmds.m) == 0 {

			cmdsMap = failedCmds.m

	return cmdsFirstErr(cmds)
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
	cmdsMap := make(map[int][]Cmder)
	for _, cmd := range cmds {
		slot := c.cmdSlot(cmd)
		cmdsMap[slot] = append(cmdsMap[slot], cmd)

func (c *ClusterClient) txPipelineProcessCmds(
	node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,

	err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return txPipelineWriteMulti(wr, cmds)

		setCmdsErr(cmds, err)
		failedCmds.mu.Lock()
		failedCmds.m[node] = cmds
		failedCmds.mu.Unlock()

	err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
		err := c.txPipelineReadQueued(rd, cmds, failedCmds)
			setCmdsErr(cmds, err)

		return pipelineReadCmds(rd, cmds)
func (c *ClusterClient) txPipelineReadQueued(
	rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,

	// Parse queued replies.
	var statusCmd StatusCmd
	if err := statusCmd.readReply(rd); err != nil {

	for _, cmd := range cmds {
		err := statusCmd.readReply(rd)

		if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {

	// Parse number of replies.
	line, err := rd.ReadLine()

	case proto.ErrorReply:
		err := proto.ParseErrorReply(line)
		for _, cmd := range cmds {
			if !c.checkMovedErr(cmd, err, failedCmds) {

	case proto.ArrayReply:

		err := fmt.Errorf("redis: expected '*', but got line %q", line)
func (c *ClusterClient) pubSub() *PubSub {
	var node *clusterNode

		opt: c.opt.clientOptions(),

		newConn: func(channels []string) (*pool.Conn, error) {
				panic("node != nil")

			slot := hashtag.Slot(channels[0])

			node, err = c.slotMasterNode(slot)

			cn, err := node.Client.newConn()

		closeConn: func(cn *pool.Conn) error {
			err := node.Client.connPool.CloseConn(cn)
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription.
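//
// An illustrative sketch; the channel name is arbitrary:
//
//	pubsub := clusterClient.Subscribe("mychannel")
//	defer pubsub.Close()
//
//	msg, err := pubsub.ReceiveMessage()
//	if err != nil {
//		// handle error
//	}
//	_ = msg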
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.Subscribe(channels...)
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create an empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) > 0 {
		_ = pubsub.PSubscribe(channels...)
func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
	for _, n := range nodes {

	return append(nodes, node)

func appendIfNotExists(ss []string, es ...string) []string {
	for _, e := range es {
		for _, s := range ss {

func remove(ss []string, es ...string) []string {
	for _, e := range es {
		for i, s := range ss {
			ss = append(ss[:i], ss[i+1:]...)