barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / cluster.go
1 package redis
2
3 import (
4         "context"
5         "crypto/tls"
6         "fmt"
7         "math"
8         "math/rand"
9         "net"
10         "runtime"
11         "sort"
12         "sync"
13         "sync/atomic"
14         "time"
15
16         "github.com/go-redis/redis/internal"
17         "github.com/go-redis/redis/internal/hashtag"
18         "github.com/go-redis/redis/internal/pool"
19         "github.com/go-redis/redis/internal/proto"
20 )
21
22 var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
23
24 // ClusterOptions are used to configure a cluster client and should be
25 // passed to NewClusterClient.
26 type ClusterOptions struct {
27         // A seed list of host:port addresses of cluster nodes.
28         Addrs []string
29
30         // The maximum number of retries before giving up. Command is retried
31         // on network errors and MOVED/ASK redirects.
32         // Default is 8 retries.
33         MaxRedirects int
34
35         // Enables read-only commands on slave nodes.
36         ReadOnly bool
37         // Allows routing read-only commands to the closest master or slave node.
38         // It automatically enables ReadOnly.
39         RouteByLatency bool
40         // Allows routing read-only commands to the random master or slave node.
41         // It automatically enables ReadOnly.
42         RouteRandomly bool
43
44         // Optional function that returns cluster slots information.
45         // It is useful to manually create cluster of standalone Redis servers
46         // and load-balance read/write operations between master and slaves.
47         // It can use service like ZooKeeper to maintain configuration information
48         // and Cluster.ReloadState to manually trigger state reloading.
49         ClusterSlots func() ([]ClusterSlot, error)
50
51         // Optional hook that is called when a new node is created.
52         OnNewNode func(*Client)
53
54         // Following options are copied from Options struct.
55
56         OnConnect func(*Conn) error
57
58         Password string
59
60         MaxRetries      int
61         MinRetryBackoff time.Duration
62         MaxRetryBackoff time.Duration
63
64         DialTimeout  time.Duration
65         ReadTimeout  time.Duration
66         WriteTimeout time.Duration
67
68         // PoolSize applies per cluster node and not for the whole cluster.
69         PoolSize           int
70         MinIdleConns       int
71         MaxConnAge         time.Duration
72         PoolTimeout        time.Duration
73         IdleTimeout        time.Duration
74         IdleCheckFrequency time.Duration
75
76         TLSConfig *tls.Config
77 }
78
79 func (opt *ClusterOptions) init() {
80         if opt.MaxRedirects == -1 {
81                 opt.MaxRedirects = 0
82         } else if opt.MaxRedirects == 0 {
83                 opt.MaxRedirects = 8
84         }
85
86         if (opt.RouteByLatency || opt.RouteRandomly) && opt.ClusterSlots == nil {
87                 opt.ReadOnly = true
88         }
89
90         if opt.PoolSize == 0 {
91                 opt.PoolSize = 5 * runtime.NumCPU()
92         }
93
94         switch opt.ReadTimeout {
95         case -1:
96                 opt.ReadTimeout = 0
97         case 0:
98                 opt.ReadTimeout = 3 * time.Second
99         }
100         switch opt.WriteTimeout {
101         case -1:
102                 opt.WriteTimeout = 0
103         case 0:
104                 opt.WriteTimeout = opt.ReadTimeout
105         }
106
107         switch opt.MinRetryBackoff {
108         case -1:
109                 opt.MinRetryBackoff = 0
110         case 0:
111                 opt.MinRetryBackoff = 8 * time.Millisecond
112         }
113         switch opt.MaxRetryBackoff {
114         case -1:
115                 opt.MaxRetryBackoff = 0
116         case 0:
117                 opt.MaxRetryBackoff = 512 * time.Millisecond
118         }
119 }
120
121 func (opt *ClusterOptions) clientOptions() *Options {
122         const disableIdleCheck = -1
123
124         return &Options{
125                 OnConnect: opt.OnConnect,
126
127                 MaxRetries:      opt.MaxRetries,
128                 MinRetryBackoff: opt.MinRetryBackoff,
129                 MaxRetryBackoff: opt.MaxRetryBackoff,
130                 Password:        opt.Password,
131                 readOnly:        opt.ReadOnly,
132
133                 DialTimeout:  opt.DialTimeout,
134                 ReadTimeout:  opt.ReadTimeout,
135                 WriteTimeout: opt.WriteTimeout,
136
137                 PoolSize:           opt.PoolSize,
138                 MinIdleConns:       opt.MinIdleConns,
139                 MaxConnAge:         opt.MaxConnAge,
140                 PoolTimeout:        opt.PoolTimeout,
141                 IdleTimeout:        opt.IdleTimeout,
142                 IdleCheckFrequency: disableIdleCheck,
143
144                 TLSConfig: opt.TLSConfig,
145         }
146 }
147
148 //------------------------------------------------------------------------------
149
150 type clusterNode struct {
151         Client *Client
152
153         latency    uint32 // atomic
154         generation uint32 // atomic
155         loading    uint32 // atomic
156 }
157
158 func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
159         opt := clOpt.clientOptions()
160         opt.Addr = addr
161         node := clusterNode{
162                 Client: NewClient(opt),
163         }
164
165         node.latency = math.MaxUint32
166         if clOpt.RouteByLatency {
167                 go node.updateLatency()
168         }
169
170         if clOpt.OnNewNode != nil {
171                 clOpt.OnNewNode(node.Client)
172         }
173
174         return &node
175 }
176
177 func (n *clusterNode) String() string {
178         return n.Client.String()
179 }
180
181 func (n *clusterNode) Close() error {
182         return n.Client.Close()
183 }
184
185 func (n *clusterNode) updateLatency() {
186         const probes = 10
187
188         var latency uint32
189         for i := 0; i < probes; i++ {
190                 start := time.Now()
191                 n.Client.Ping()
192                 probe := uint32(time.Since(start) / time.Microsecond)
193                 latency = (latency + probe) / 2
194         }
195         atomic.StoreUint32(&n.latency, latency)
196 }
197
198 func (n *clusterNode) Latency() time.Duration {
199         latency := atomic.LoadUint32(&n.latency)
200         return time.Duration(latency) * time.Microsecond
201 }
202
203 func (n *clusterNode) MarkAsLoading() {
204         atomic.StoreUint32(&n.loading, uint32(time.Now().Unix()))
205 }
206
207 func (n *clusterNode) Loading() bool {
208         const minute = int64(time.Minute / time.Second)
209
210         loading := atomic.LoadUint32(&n.loading)
211         if loading == 0 {
212                 return false
213         }
214         if time.Now().Unix()-int64(loading) < minute {
215                 return true
216         }
217         atomic.StoreUint32(&n.loading, 0)
218         return false
219 }
220
221 func (n *clusterNode) Generation() uint32 {
222         return atomic.LoadUint32(&n.generation)
223 }
224
225 func (n *clusterNode) SetGeneration(gen uint32) {
226         for {
227                 v := atomic.LoadUint32(&n.generation)
228                 if gen < v || atomic.CompareAndSwapUint32(&n.generation, v, gen) {
229                         break
230                 }
231         }
232 }
233
234 //------------------------------------------------------------------------------
235
236 type clusterNodes struct {
237         opt *ClusterOptions
238
239         mu           sync.RWMutex
240         allAddrs     []string
241         allNodes     map[string]*clusterNode
242         clusterAddrs []string
243         closed       bool
244
245         _generation uint32 // atomic
246 }
247
248 func newClusterNodes(opt *ClusterOptions) *clusterNodes {
249         return &clusterNodes{
250                 opt: opt,
251
252                 allAddrs: opt.Addrs,
253                 allNodes: make(map[string]*clusterNode),
254         }
255 }
256
257 func (c *clusterNodes) Close() error {
258         c.mu.Lock()
259         defer c.mu.Unlock()
260
261         if c.closed {
262                 return nil
263         }
264         c.closed = true
265
266         var firstErr error
267         for _, node := range c.allNodes {
268                 if err := node.Client.Close(); err != nil && firstErr == nil {
269                         firstErr = err
270                 }
271         }
272
273         c.allNodes = nil
274         c.clusterAddrs = nil
275
276         return firstErr
277 }
278
279 func (c *clusterNodes) Addrs() ([]string, error) {
280         var addrs []string
281         c.mu.RLock()
282         closed := c.closed
283         if !closed {
284                 if len(c.clusterAddrs) > 0 {
285                         addrs = c.clusterAddrs
286                 } else {
287                         addrs = c.allAddrs
288                 }
289         }
290         c.mu.RUnlock()
291
292         if closed {
293                 return nil, pool.ErrClosed
294         }
295         if len(addrs) == 0 {
296                 return nil, errClusterNoNodes
297         }
298         return addrs, nil
299 }
300
301 func (c *clusterNodes) NextGeneration() uint32 {
302         return atomic.AddUint32(&c._generation, 1)
303 }
304
305 // GC removes unused nodes.
306 func (c *clusterNodes) GC(generation uint32) {
307         var collected []*clusterNode
308         c.mu.Lock()
309         for addr, node := range c.allNodes {
310                 if node.Generation() >= generation {
311                         continue
312                 }
313
314                 c.clusterAddrs = remove(c.clusterAddrs, addr)
315                 delete(c.allNodes, addr)
316                 collected = append(collected, node)
317         }
318         c.mu.Unlock()
319
320         for _, node := range collected {
321                 _ = node.Client.Close()
322         }
323 }
324
325 func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
326         var node *clusterNode
327         var err error
328         c.mu.RLock()
329         if c.closed {
330                 err = pool.ErrClosed
331         } else {
332                 node = c.allNodes[addr]
333         }
334         c.mu.RUnlock()
335         return node, err
336 }
337
338 func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
339         node, err := c.Get(addr)
340         if err != nil {
341                 return nil, err
342         }
343         if node != nil {
344                 return node, nil
345         }
346
347         c.mu.Lock()
348         defer c.mu.Unlock()
349
350         if c.closed {
351                 return nil, pool.ErrClosed
352         }
353
354         node, ok := c.allNodes[addr]
355         if ok {
356                 return node, err
357         }
358
359         node = newClusterNode(c.opt, addr)
360
361         c.allAddrs = appendIfNotExists(c.allAddrs, addr)
362         c.clusterAddrs = append(c.clusterAddrs, addr)
363         c.allNodes[addr] = node
364
365         return node, err
366 }
367
368 func (c *clusterNodes) All() ([]*clusterNode, error) {
369         c.mu.RLock()
370         defer c.mu.RUnlock()
371
372         if c.closed {
373                 return nil, pool.ErrClosed
374         }
375
376         cp := make([]*clusterNode, 0, len(c.allNodes))
377         for _, node := range c.allNodes {
378                 cp = append(cp, node)
379         }
380         return cp, nil
381 }
382
383 func (c *clusterNodes) Random() (*clusterNode, error) {
384         addrs, err := c.Addrs()
385         if err != nil {
386                 return nil, err
387         }
388
389         n := rand.Intn(len(addrs))
390         return c.GetOrCreate(addrs[n])
391 }
392
393 //------------------------------------------------------------------------------
394
395 type clusterSlot struct {
396         start, end int
397         nodes      []*clusterNode
398 }
399
400 type clusterSlotSlice []*clusterSlot
401
402 func (p clusterSlotSlice) Len() int {
403         return len(p)
404 }
405
406 func (p clusterSlotSlice) Less(i, j int) bool {
407         return p[i].start < p[j].start
408 }
409
410 func (p clusterSlotSlice) Swap(i, j int) {
411         p[i], p[j] = p[j], p[i]
412 }
413
414 type clusterState struct {
415         nodes   *clusterNodes
416         Masters []*clusterNode
417         Slaves  []*clusterNode
418
419         slots []*clusterSlot
420
421         generation uint32
422         createdAt  time.Time
423 }
424
425 func newClusterState(
426         nodes *clusterNodes, slots []ClusterSlot, origin string,
427 ) (*clusterState, error) {
428         c := clusterState{
429                 nodes: nodes,
430
431                 slots: make([]*clusterSlot, 0, len(slots)),
432
433                 generation: nodes.NextGeneration(),
434                 createdAt:  time.Now(),
435         }
436
437         originHost, _, _ := net.SplitHostPort(origin)
438         isLoopbackOrigin := isLoopback(originHost)
439
440         for _, slot := range slots {
441                 var nodes []*clusterNode
442                 for i, slotNode := range slot.Nodes {
443                         addr := slotNode.Addr
444                         if !isLoopbackOrigin {
445                                 addr = replaceLoopbackHost(addr, originHost)
446                         }
447
448                         node, err := c.nodes.GetOrCreate(addr)
449                         if err != nil {
450                                 return nil, err
451                         }
452
453                         node.SetGeneration(c.generation)
454                         nodes = append(nodes, node)
455
456                         if i == 0 {
457                                 c.Masters = appendUniqueNode(c.Masters, node)
458                         } else {
459                                 c.Slaves = appendUniqueNode(c.Slaves, node)
460                         }
461                 }
462
463                 c.slots = append(c.slots, &clusterSlot{
464                         start: slot.Start,
465                         end:   slot.End,
466                         nodes: nodes,
467                 })
468         }
469
470         sort.Sort(clusterSlotSlice(c.slots))
471
472         time.AfterFunc(time.Minute, func() {
473                 nodes.GC(c.generation)
474         })
475
476         return &c, nil
477 }
478
479 func replaceLoopbackHost(nodeAddr, originHost string) string {
480         nodeHost, nodePort, err := net.SplitHostPort(nodeAddr)
481         if err != nil {
482                 return nodeAddr
483         }
484
485         nodeIP := net.ParseIP(nodeHost)
486         if nodeIP == nil {
487                 return nodeAddr
488         }
489
490         if !nodeIP.IsLoopback() {
491                 return nodeAddr
492         }
493
494         // Use origin host which is not loopback and node port.
495         return net.JoinHostPort(originHost, nodePort)
496 }
497
498 func isLoopback(host string) bool {
499         ip := net.ParseIP(host)
500         if ip == nil {
501                 return true
502         }
503         return ip.IsLoopback()
504 }
505
506 func (c *clusterState) slotMasterNode(slot int) (*clusterNode, error) {
507         nodes := c.slotNodes(slot)
508         if len(nodes) > 0 {
509                 return nodes[0], nil
510         }
511         return c.nodes.Random()
512 }
513
514 func (c *clusterState) slotSlaveNode(slot int) (*clusterNode, error) {
515         nodes := c.slotNodes(slot)
516         switch len(nodes) {
517         case 0:
518                 return c.nodes.Random()
519         case 1:
520                 return nodes[0], nil
521         case 2:
522                 if slave := nodes[1]; !slave.Loading() {
523                         return slave, nil
524                 }
525                 return nodes[0], nil
526         default:
527                 var slave *clusterNode
528                 for i := 0; i < 10; i++ {
529                         n := rand.Intn(len(nodes)-1) + 1
530                         slave = nodes[n]
531                         if !slave.Loading() {
532                                 return slave, nil
533                         }
534                 }
535
536                 // All slaves are loading - use master.
537                 return nodes[0], nil
538         }
539 }
540
541 func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
542         const threshold = time.Millisecond
543
544         nodes := c.slotNodes(slot)
545         if len(nodes) == 0 {
546                 return c.nodes.Random()
547         }
548
549         var node *clusterNode
550         for _, n := range nodes {
551                 if n.Loading() {
552                         continue
553                 }
554                 if node == nil || node.Latency()-n.Latency() > threshold {
555                         node = n
556                 }
557         }
558         return node, nil
559 }
560
561 func (c *clusterState) slotRandomNode(slot int) *clusterNode {
562         nodes := c.slotNodes(slot)
563         n := rand.Intn(len(nodes))
564         return nodes[n]
565 }
566
567 func (c *clusterState) slotNodes(slot int) []*clusterNode {
568         i := sort.Search(len(c.slots), func(i int) bool {
569                 return c.slots[i].end >= slot
570         })
571         if i >= len(c.slots) {
572                 return nil
573         }
574         x := c.slots[i]
575         if slot >= x.start && slot <= x.end {
576                 return x.nodes
577         }
578         return nil
579 }
580
581 //------------------------------------------------------------------------------
582
583 type clusterStateHolder struct {
584         load func() (*clusterState, error)
585
586         state     atomic.Value
587         reloading uint32 // atomic
588 }
589
590 func newClusterStateHolder(fn func() (*clusterState, error)) *clusterStateHolder {
591         return &clusterStateHolder{
592                 load: fn,
593         }
594 }
595
596 func (c *clusterStateHolder) Reload() (*clusterState, error) {
597         state, err := c.load()
598         if err != nil {
599                 return nil, err
600         }
601         c.state.Store(state)
602         return state, nil
603 }
604
605 func (c *clusterStateHolder) LazyReload() {
606         if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
607                 return
608         }
609         go func() {
610                 defer atomic.StoreUint32(&c.reloading, 0)
611
612                 _, err := c.Reload()
613                 if err != nil {
614                         return
615                 }
616                 time.Sleep(100 * time.Millisecond)
617         }()
618 }
619
620 func (c *clusterStateHolder) Get() (*clusterState, error) {
621         v := c.state.Load()
622         if v != nil {
623                 state := v.(*clusterState)
624                 if time.Since(state.createdAt) > time.Minute {
625                         c.LazyReload()
626                 }
627                 return state, nil
628         }
629         return c.Reload()
630 }
631
632 func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
633         state, err := c.Reload()
634         if err == nil {
635                 return state, nil
636         }
637         return c.Get()
638 }
639
640 //------------------------------------------------------------------------------
641
642 // ClusterClient is a Redis Cluster client representing a pool of zero
643 // or more underlying connections. It's safe for concurrent use by
644 // multiple goroutines.
645 type ClusterClient struct {
646         cmdable
647
648         ctx context.Context
649
650         opt           *ClusterOptions
651         nodes         *clusterNodes
652         state         *clusterStateHolder
653         cmdsInfoCache *cmdsInfoCache
654
655         process           func(Cmder) error
656         processPipeline   func([]Cmder) error
657         processTxPipeline func([]Cmder) error
658 }
659
660 // NewClusterClient returns a Redis Cluster client as described in
661 // http://redis.io/topics/cluster-spec.
662 func NewClusterClient(opt *ClusterOptions) *ClusterClient {
663         opt.init()
664
665         c := &ClusterClient{
666                 opt:   opt,
667                 nodes: newClusterNodes(opt),
668         }
669         c.state = newClusterStateHolder(c.loadState)
670         c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
671
672         c.process = c.defaultProcess
673         c.processPipeline = c.defaultProcessPipeline
674         c.processTxPipeline = c.defaultProcessTxPipeline
675
676         c.init()
677         if opt.IdleCheckFrequency > 0 {
678                 go c.reaper(opt.IdleCheckFrequency)
679         }
680
681         return c
682 }
683
684 func (c *ClusterClient) init() {
685         c.cmdable.setProcessor(c.Process)
686 }
687
688 // ReloadState reloads cluster state. If available it calls ClusterSlots func
689 // to get cluster slots information.
690 func (c *ClusterClient) ReloadState() error {
691         _, err := c.state.Reload()
692         return err
693 }
694
695 func (c *ClusterClient) Context() context.Context {
696         if c.ctx != nil {
697                 return c.ctx
698         }
699         return context.Background()
700 }
701
702 func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
703         if ctx == nil {
704                 panic("nil context")
705         }
706         c2 := c.copy()
707         c2.ctx = ctx
708         return c2
709 }
710
711 func (c *ClusterClient) copy() *ClusterClient {
712         cp := *c
713         cp.init()
714         return &cp
715 }
716
717 // Options returns read-only Options that were used to create the client.
718 func (c *ClusterClient) Options() *ClusterOptions {
719         return c.opt
720 }
721
722 func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
723         return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
724 }
725
726 func (c *ClusterClient) cmdsInfo() (map[string]*CommandInfo, error) {
727         addrs, err := c.nodes.Addrs()
728         if err != nil {
729                 return nil, err
730         }
731
732         var firstErr error
733         for _, addr := range addrs {
734                 node, err := c.nodes.Get(addr)
735                 if err != nil {
736                         return nil, err
737                 }
738                 if node == nil {
739                         continue
740                 }
741
742                 info, err := node.Client.Command().Result()
743                 if err == nil {
744                         return info, nil
745                 }
746                 if firstErr == nil {
747                         firstErr = err
748                 }
749         }
750         return nil, firstErr
751 }
752
753 func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
754         cmdsInfo, err := c.cmdsInfoCache.Get()
755         if err != nil {
756                 return nil
757         }
758
759         info := cmdsInfo[name]
760         if info == nil {
761                 internal.Logf("info for cmd=%s not found", name)
762         }
763         return info
764 }
765
766 func cmdSlot(cmd Cmder, pos int) int {
767         if pos == 0 {
768                 return hashtag.RandomSlot()
769         }
770         firstKey := cmd.stringArg(pos)
771         return hashtag.Slot(firstKey)
772 }
773
774 func (c *ClusterClient) cmdSlot(cmd Cmder) int {
775         args := cmd.Args()
776         if args[0] == "cluster" && args[1] == "getkeysinslot" {
777                 return args[2].(int)
778         }
779
780         cmdInfo := c.cmdInfo(cmd.Name())
781         return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
782 }
783
784 func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
785         state, err := c.state.Get()
786         if err != nil {
787                 return 0, nil, err
788         }
789
790         cmdInfo := c.cmdInfo(cmd.Name())
791         slot := c.cmdSlot(cmd)
792
793         if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
794                 if c.opt.RouteByLatency {
795                         node, err := state.slotClosestNode(slot)
796                         return slot, node, err
797                 }
798
799                 if c.opt.RouteRandomly {
800                         node := state.slotRandomNode(slot)
801                         return slot, node, nil
802                 }
803
804                 node, err := state.slotSlaveNode(slot)
805                 return slot, node, err
806         }
807
808         node, err := state.slotMasterNode(slot)
809         return slot, node, err
810 }
811
812 func (c *ClusterClient) slotMasterNode(slot int) (*clusterNode, error) {
813         state, err := c.state.Get()
814         if err != nil {
815                 return nil, err
816         }
817
818         nodes := state.slotNodes(slot)
819         if len(nodes) > 0 {
820                 return nodes[0], nil
821         }
822         return c.nodes.Random()
823 }
824
825 func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
826         if len(keys) == 0 {
827                 return fmt.Errorf("redis: Watch requires at least one key")
828         }
829
830         slot := hashtag.Slot(keys[0])
831         for _, key := range keys[1:] {
832                 if hashtag.Slot(key) != slot {
833                         err := fmt.Errorf("redis: Watch requires all keys to be in the same slot")
834                         return err
835                 }
836         }
837
838         node, err := c.slotMasterNode(slot)
839         if err != nil {
840                 return err
841         }
842
843         for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
844                 if attempt > 0 {
845                         time.Sleep(c.retryBackoff(attempt))
846                 }
847
848                 err = node.Client.Watch(fn, keys...)
849                 if err == nil {
850                         break
851                 }
852                 if err != Nil {
853                         c.state.LazyReload()
854                 }
855
856                 moved, ask, addr := internal.IsMovedError(err)
857                 if moved || ask {
858                         node, err = c.nodes.GetOrCreate(addr)
859                         if err != nil {
860                                 return err
861                         }
862                         continue
863                 }
864
865                 if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
866                         node, err = c.slotMasterNode(slot)
867                         if err != nil {
868                                 return err
869                         }
870                         continue
871                 }
872
873                 if internal.IsRetryableError(err, true) {
874                         continue
875                 }
876
877                 return err
878         }
879
880         return err
881 }
882
883 // Close closes the cluster client, releasing any open resources.
884 //
885 // It is rare to Close a ClusterClient, as the ClusterClient is meant
886 // to be long-lived and shared between many goroutines.
887 func (c *ClusterClient) Close() error {
888         return c.nodes.Close()
889 }
890
891 // Do creates a Cmd from the args and processes the cmd.
892 func (c *ClusterClient) Do(args ...interface{}) *Cmd {
893         cmd := NewCmd(args...)
894         c.Process(cmd)
895         return cmd
896 }
897
898 func (c *ClusterClient) WrapProcess(
899         fn func(oldProcess func(Cmder) error) func(Cmder) error,
900 ) {
901         c.process = fn(c.process)
902 }
903
904 func (c *ClusterClient) Process(cmd Cmder) error {
905         return c.process(cmd)
906 }
907
908 func (c *ClusterClient) defaultProcess(cmd Cmder) error {
909         var node *clusterNode
910         var ask bool
911         for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
912                 if attempt > 0 {
913                         time.Sleep(c.retryBackoff(attempt))
914                 }
915
916                 if node == nil {
917                         var err error
918                         _, node, err = c.cmdSlotAndNode(cmd)
919                         if err != nil {
920                                 cmd.setErr(err)
921                                 break
922                         }
923                 }
924
925                 var err error
926                 if ask {
927                         pipe := node.Client.Pipeline()
928                         _ = pipe.Process(NewCmd("ASKING"))
929                         _ = pipe.Process(cmd)
930                         _, err = pipe.Exec()
931                         _ = pipe.Close()
932                         ask = false
933                 } else {
934                         err = node.Client.Process(cmd)
935                 }
936
937                 // If there is no error - we are done.
938                 if err == nil {
939                         break
940                 }
941                 if err != Nil {
942                         c.state.LazyReload()
943                 }
944
945                 // If slave is loading - pick another node.
946                 if c.opt.ReadOnly && internal.IsLoadingError(err) {
947                         node.MarkAsLoading()
948                         node = nil
949                         continue
950                 }
951
952                 var moved bool
953                 var addr string
954                 moved, ask, addr = internal.IsMovedError(err)
955                 if moved || ask {
956                         node, err = c.nodes.GetOrCreate(addr)
957                         if err != nil {
958                                 break
959                         }
960                         continue
961                 }
962
963                 if err == pool.ErrClosed || internal.IsReadOnlyError(err) {
964                         node = nil
965                         continue
966                 }
967
968                 if internal.IsRetryableError(err, true) {
969                         // First retry the same node.
970                         if attempt == 0 {
971                                 continue
972                         }
973
974                         // Second try random node.
975                         node, err = c.nodes.Random()
976                         if err != nil {
977                                 break
978                         }
979                         continue
980                 }
981
982                 break
983         }
984
985         return cmd.Err()
986 }
987
988 // ForEachMaster concurrently calls the fn on each master node in the cluster.
989 // It returns the first error if any.
990 func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
991         state, err := c.state.ReloadOrGet()
992         if err != nil {
993                 return err
994         }
995
996         var wg sync.WaitGroup
997         errCh := make(chan error, 1)
998         for _, master := range state.Masters {
999                 wg.Add(1)
1000                 go func(node *clusterNode) {
1001                         defer wg.Done()
1002                         err := fn(node.Client)
1003                         if err != nil {
1004                                 select {
1005                                 case errCh <- err:
1006                                 default:
1007                                 }
1008                         }
1009                 }(master)
1010         }
1011         wg.Wait()
1012
1013         select {
1014         case err := <-errCh:
1015                 return err
1016         default:
1017                 return nil
1018         }
1019 }
1020
1021 // ForEachSlave concurrently calls the fn on each slave node in the cluster.
1022 // It returns the first error if any.
1023 func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
1024         state, err := c.state.ReloadOrGet()
1025         if err != nil {
1026                 return err
1027         }
1028
1029         var wg sync.WaitGroup
1030         errCh := make(chan error, 1)
1031         for _, slave := range state.Slaves {
1032                 wg.Add(1)
1033                 go func(node *clusterNode) {
1034                         defer wg.Done()
1035                         err := fn(node.Client)
1036                         if err != nil {
1037                                 select {
1038                                 case errCh <- err:
1039                                 default:
1040                                 }
1041                         }
1042                 }(slave)
1043         }
1044         wg.Wait()
1045
1046         select {
1047         case err := <-errCh:
1048                 return err
1049         default:
1050                 return nil
1051         }
1052 }
1053
1054 // ForEachNode concurrently calls the fn on each known node in the cluster.
1055 // It returns the first error if any.
1056 func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
1057         state, err := c.state.ReloadOrGet()
1058         if err != nil {
1059                 return err
1060         }
1061
1062         var wg sync.WaitGroup
1063         errCh := make(chan error, 1)
1064         worker := func(node *clusterNode) {
1065                 defer wg.Done()
1066                 err := fn(node.Client)
1067                 if err != nil {
1068                         select {
1069                         case errCh <- err:
1070                         default:
1071                         }
1072                 }
1073         }
1074
1075         for _, node := range state.Masters {
1076                 wg.Add(1)
1077                 go worker(node)
1078         }
1079         for _, node := range state.Slaves {
1080                 wg.Add(1)
1081                 go worker(node)
1082         }
1083
1084         wg.Wait()
1085         select {
1086         case err := <-errCh:
1087                 return err
1088         default:
1089                 return nil
1090         }
1091 }
1092
1093 // PoolStats returns accumulated connection pool stats.
1094 func (c *ClusterClient) PoolStats() *PoolStats {
1095         var acc PoolStats
1096
1097         state, _ := c.state.Get()
1098         if state == nil {
1099                 return &acc
1100         }
1101
1102         for _, node := range state.Masters {
1103                 s := node.Client.connPool.Stats()
1104                 acc.Hits += s.Hits
1105                 acc.Misses += s.Misses
1106                 acc.Timeouts += s.Timeouts
1107
1108                 acc.TotalConns += s.TotalConns
1109                 acc.IdleConns += s.IdleConns
1110                 acc.StaleConns += s.StaleConns
1111         }
1112
1113         for _, node := range state.Slaves {
1114                 s := node.Client.connPool.Stats()
1115                 acc.Hits += s.Hits
1116                 acc.Misses += s.Misses
1117                 acc.Timeouts += s.Timeouts
1118
1119                 acc.TotalConns += s.TotalConns
1120                 acc.IdleConns += s.IdleConns
1121                 acc.StaleConns += s.StaleConns
1122         }
1123
1124         return &acc
1125 }
1126
1127 func (c *ClusterClient) loadState() (*clusterState, error) {
1128         if c.opt.ClusterSlots != nil {
1129                 slots, err := c.opt.ClusterSlots()
1130                 if err != nil {
1131                         return nil, err
1132                 }
1133                 return newClusterState(c.nodes, slots, "")
1134         }
1135
1136         addrs, err := c.nodes.Addrs()
1137         if err != nil {
1138                 return nil, err
1139         }
1140
1141         var firstErr error
1142         for _, addr := range addrs {
1143                 node, err := c.nodes.GetOrCreate(addr)
1144                 if err != nil {
1145                         if firstErr == nil {
1146                                 firstErr = err
1147                         }
1148                         continue
1149                 }
1150
1151                 slots, err := node.Client.ClusterSlots().Result()
1152                 if err != nil {
1153                         if firstErr == nil {
1154                                 firstErr = err
1155                         }
1156                         continue
1157                 }
1158
1159                 return newClusterState(c.nodes, slots, node.Client.opt.Addr)
1160         }
1161
1162         return nil, firstErr
1163 }
1164
1165 // reaper closes idle connections to the cluster.
1166 func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
1167         ticker := time.NewTicker(idleCheckFrequency)
1168         defer ticker.Stop()
1169
1170         for range ticker.C {
1171                 nodes, err := c.nodes.All()
1172                 if err != nil {
1173                         break
1174                 }
1175
1176                 for _, node := range nodes {
1177                         _, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
1178                         if err != nil {
1179                                 internal.Logf("ReapStaleConns failed: %s", err)
1180                         }
1181                 }
1182         }
1183 }
1184
1185 func (c *ClusterClient) Pipeline() Pipeliner {
1186         pipe := Pipeline{
1187                 exec: c.processPipeline,
1188         }
1189         pipe.statefulCmdable.setProcessor(pipe.Process)
1190         return &pipe
1191 }
1192
1193 func (c *ClusterClient) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
1194         return c.Pipeline().Pipelined(fn)
1195 }
1196
1197 func (c *ClusterClient) WrapProcessPipeline(
1198         fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
1199 ) {
1200         c.processPipeline = fn(c.processPipeline)
1201 }
1202
1203 func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
1204         cmdsMap := newCmdsMap()
1205         err := c.mapCmdsByNode(cmds, cmdsMap)
1206         if err != nil {
1207                 setCmdsErr(cmds, err)
1208                 return err
1209         }
1210
1211         for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1212                 if attempt > 0 {
1213                         time.Sleep(c.retryBackoff(attempt))
1214                 }
1215
1216                 failedCmds := newCmdsMap()
1217                 var wg sync.WaitGroup
1218
1219                 for node, cmds := range cmdsMap.m {
1220                         wg.Add(1)
1221                         go func(node *clusterNode, cmds []Cmder) {
1222                                 defer wg.Done()
1223
1224                                 cn, err := node.Client.getConn()
1225                                 if err != nil {
1226                                         if err == pool.ErrClosed {
1227                                                 c.mapCmdsByNode(cmds, failedCmds)
1228                                         } else {
1229                                                 setCmdsErr(cmds, err)
1230                                         }
1231                                         return
1232                                 }
1233
1234                                 err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
1235                                 node.Client.releaseConnStrict(cn, err)
1236                         }(node, cmds)
1237                 }
1238
1239                 wg.Wait()
1240                 if len(failedCmds.m) == 0 {
1241                         break
1242                 }
1243                 cmdsMap = failedCmds
1244         }
1245
1246         return cmdsFirstErr(cmds)
1247 }
1248
1249 type cmdsMap struct {
1250         mu sync.Mutex
1251         m  map[*clusterNode][]Cmder
1252 }
1253
1254 func newCmdsMap() *cmdsMap {
1255         return &cmdsMap{
1256                 m: make(map[*clusterNode][]Cmder),
1257         }
1258 }
1259
1260 func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
1261         state, err := c.state.Get()
1262         if err != nil {
1263                 setCmdsErr(cmds, err)
1264                 return err
1265         }
1266
1267         cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
1268         for _, cmd := range cmds {
1269                 var node *clusterNode
1270                 var err error
1271                 if cmdsAreReadOnly {
1272                         _, node, err = c.cmdSlotAndNode(cmd)
1273                 } else {
1274                         slot := c.cmdSlot(cmd)
1275                         node, err = state.slotMasterNode(slot)
1276                 }
1277                 if err != nil {
1278                         return err
1279                 }
1280                 cmdsMap.mu.Lock()
1281                 cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
1282                 cmdsMap.mu.Unlock()
1283         }
1284         return nil
1285 }
1286
1287 func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
1288         for _, cmd := range cmds {
1289                 cmdInfo := c.cmdInfo(cmd.Name())
1290                 if cmdInfo == nil || !cmdInfo.ReadOnly {
1291                         return false
1292                 }
1293         }
1294         return true
1295 }
1296
1297 func (c *ClusterClient) pipelineProcessCmds(
1298         node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1299 ) error {
1300         err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
1301                 return writeCmd(wr, cmds...)
1302         })
1303         if err != nil {
1304                 setCmdsErr(cmds, err)
1305                 failedCmds.mu.Lock()
1306                 failedCmds.m[node] = cmds
1307                 failedCmds.mu.Unlock()
1308                 return err
1309         }
1310
1311         err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
1312                 return c.pipelineReadCmds(node, rd, cmds, failedCmds)
1313         })
1314         return err
1315 }
1316
1317 func (c *ClusterClient) pipelineReadCmds(
1318         node *clusterNode, rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
1319 ) error {
1320         var firstErr error
1321         for _, cmd := range cmds {
1322                 err := cmd.readReply(rd)
1323                 if err == nil {
1324                         continue
1325                 }
1326
1327                 if c.checkMovedErr(cmd, err, failedCmds) {
1328                         continue
1329                 }
1330
1331                 if internal.IsRedisError(err) {
1332                         continue
1333                 }
1334
1335                 failedCmds.mu.Lock()
1336                 failedCmds.m[node] = append(failedCmds.m[node], cmd)
1337                 failedCmds.mu.Unlock()
1338                 if firstErr == nil {
1339                         firstErr = err
1340                 }
1341         }
1342         return firstErr
1343 }
1344
1345 func (c *ClusterClient) checkMovedErr(
1346         cmd Cmder, err error, failedCmds *cmdsMap,
1347 ) bool {
1348         moved, ask, addr := internal.IsMovedError(err)
1349
1350         if moved {
1351                 c.state.LazyReload()
1352
1353                 node, err := c.nodes.GetOrCreate(addr)
1354                 if err != nil {
1355                         return false
1356                 }
1357
1358                 failedCmds.mu.Lock()
1359                 failedCmds.m[node] = append(failedCmds.m[node], cmd)
1360                 failedCmds.mu.Unlock()
1361                 return true
1362         }
1363
1364         if ask {
1365                 node, err := c.nodes.GetOrCreate(addr)
1366                 if err != nil {
1367                         return false
1368                 }
1369
1370                 failedCmds.mu.Lock()
1371                 failedCmds.m[node] = append(failedCmds.m[node], NewCmd("ASKING"), cmd)
1372                 failedCmds.mu.Unlock()
1373                 return true
1374         }
1375
1376         return false
1377 }
1378
1379 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
1380 func (c *ClusterClient) TxPipeline() Pipeliner {
1381         pipe := Pipeline{
1382                 exec: c.processTxPipeline,
1383         }
1384         pipe.statefulCmdable.setProcessor(pipe.Process)
1385         return &pipe
1386 }
1387
1388 func (c *ClusterClient) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
1389         return c.TxPipeline().Pipelined(fn)
1390 }
1391
1392 func (c *ClusterClient) defaultProcessTxPipeline(cmds []Cmder) error {
1393         state, err := c.state.Get()
1394         if err != nil {
1395                 return err
1396         }
1397
1398         cmdsMap := c.mapCmdsBySlot(cmds)
1399         for slot, cmds := range cmdsMap {
1400                 node, err := state.slotMasterNode(slot)
1401                 if err != nil {
1402                         setCmdsErr(cmds, err)
1403                         continue
1404                 }
1405                 cmdsMap := map[*clusterNode][]Cmder{node: cmds}
1406
1407                 for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
1408                         if attempt > 0 {
1409                                 time.Sleep(c.retryBackoff(attempt))
1410                         }
1411
1412                         failedCmds := newCmdsMap()
1413                         var wg sync.WaitGroup
1414
1415                         for node, cmds := range cmdsMap {
1416                                 wg.Add(1)
1417                                 go func(node *clusterNode, cmds []Cmder) {
1418                                         defer wg.Done()
1419
1420                                         cn, err := node.Client.getConn()
1421                                         if err != nil {
1422                                                 if err == pool.ErrClosed {
1423                                                         c.mapCmdsByNode(cmds, failedCmds)
1424                                                 } else {
1425                                                         setCmdsErr(cmds, err)
1426                                                 }
1427                                                 return
1428                                         }
1429
1430                                         err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
1431                                         node.Client.releaseConnStrict(cn, err)
1432                                 }(node, cmds)
1433                         }
1434
1435                         wg.Wait()
1436                         if len(failedCmds.m) == 0 {
1437                                 break
1438                         }
1439                         cmdsMap = failedCmds.m
1440                 }
1441         }
1442
1443         return cmdsFirstErr(cmds)
1444 }
1445
1446 func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
1447         cmdsMap := make(map[int][]Cmder)
1448         for _, cmd := range cmds {
1449                 slot := c.cmdSlot(cmd)
1450                 cmdsMap[slot] = append(cmdsMap[slot], cmd)
1451         }
1452         return cmdsMap
1453 }
1454
1455 func (c *ClusterClient) txPipelineProcessCmds(
1456         node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
1457 ) error {
1458         err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
1459                 return txPipelineWriteMulti(wr, cmds)
1460         })
1461         if err != nil {
1462                 setCmdsErr(cmds, err)
1463                 failedCmds.mu.Lock()
1464                 failedCmds.m[node] = cmds
1465                 failedCmds.mu.Unlock()
1466                 return err
1467         }
1468
1469         err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
1470                 err := c.txPipelineReadQueued(rd, cmds, failedCmds)
1471                 if err != nil {
1472                         setCmdsErr(cmds, err)
1473                         return err
1474                 }
1475                 return pipelineReadCmds(rd, cmds)
1476         })
1477         return err
1478 }
1479
1480 func (c *ClusterClient) txPipelineReadQueued(
1481         rd *proto.Reader, cmds []Cmder, failedCmds *cmdsMap,
1482 ) error {
1483         // Parse queued replies.
1484         var statusCmd StatusCmd
1485         if err := statusCmd.readReply(rd); err != nil {
1486                 return err
1487         }
1488
1489         for _, cmd := range cmds {
1490                 err := statusCmd.readReply(rd)
1491                 if err == nil {
1492                         continue
1493                 }
1494
1495                 if c.checkMovedErr(cmd, err, failedCmds) || internal.IsRedisError(err) {
1496                         continue
1497                 }
1498
1499                 return err
1500         }
1501
1502         // Parse number of replies.
1503         line, err := rd.ReadLine()
1504         if err != nil {
1505                 if err == Nil {
1506                         err = TxFailedErr
1507                 }
1508                 return err
1509         }
1510
1511         switch line[0] {
1512         case proto.ErrorReply:
1513                 err := proto.ParseErrorReply(line)
1514                 for _, cmd := range cmds {
1515                         if !c.checkMovedErr(cmd, err, failedCmds) {
1516                                 break
1517                         }
1518                 }
1519                 return err
1520         case proto.ArrayReply:
1521                 // ok
1522         default:
1523                 err := fmt.Errorf("redis: expected '*', but got line %q", line)
1524                 return err
1525         }
1526
1527         return nil
1528 }
1529
1530 func (c *ClusterClient) pubSub() *PubSub {
1531         var node *clusterNode
1532         pubsub := &PubSub{
1533                 opt: c.opt.clientOptions(),
1534
1535                 newConn: func(channels []string) (*pool.Conn, error) {
1536                         if node != nil {
1537                                 panic("node != nil")
1538                         }
1539
1540                         slot := hashtag.Slot(channels[0])
1541
1542                         var err error
1543                         node, err = c.slotMasterNode(slot)
1544                         if err != nil {
1545                                 return nil, err
1546                         }
1547
1548                         cn, err := node.Client.newConn()
1549                         if err != nil {
1550                                 return nil, err
1551                         }
1552
1553                         return cn, nil
1554                 },
1555                 closeConn: func(cn *pool.Conn) error {
1556                         err := node.Client.connPool.CloseConn(cn)
1557                         node = nil
1558                         return err
1559                 },
1560         }
1561         pubsub.init()
1562
1563         return pubsub
1564 }
1565
1566 // Subscribe subscribes the client to the specified channels.
1567 // Channels can be omitted to create empty subscription.
1568 func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
1569         pubsub := c.pubSub()
1570         if len(channels) > 0 {
1571                 _ = pubsub.Subscribe(channels...)
1572         }
1573         return pubsub
1574 }
1575
1576 // PSubscribe subscribes the client to the given patterns.
1577 // Patterns can be omitted to create empty subscription.
1578 func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
1579         pubsub := c.pubSub()
1580         if len(channels) > 0 {
1581                 _ = pubsub.PSubscribe(channels...)
1582         }
1583         return pubsub
1584 }
1585
1586 func appendUniqueNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
1587         for _, n := range nodes {
1588                 if n == node {
1589                         return nodes
1590                 }
1591         }
1592         return append(nodes, node)
1593 }
1594
1595 func appendIfNotExists(ss []string, es ...string) []string {
1596 loop:
1597         for _, e := range es {
1598                 for _, s := range ss {
1599                         if s == e {
1600                                 continue loop
1601                         }
1602                 }
1603                 ss = append(ss, e)
1604         }
1605         return ss
1606 }
1607
1608 func remove(ss []string, es ...string) []string {
1609         if len(es) == 0 {
1610                 return ss[:0]
1611         }
1612         for _, e := range es {
1613                 for i, s := range ss {
1614                         if s == e {
1615                                 ss = append(ss[:i], ss[i+1:]...)
1616                                 break
1617                         }
1618                 }
1619         }
1620         return ss
1621 }