barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / ring.go
1 package redis
2
3 import (
4         "context"
5         "errors"
6         "fmt"
7         "math/rand"
8         "strconv"
9         "sync"
10         "sync/atomic"
11         "time"
12
13         "github.com/go-redis/redis/internal"
14         "github.com/go-redis/redis/internal/consistenthash"
15         "github.com/go-redis/redis/internal/hashtag"
16         "github.com/go-redis/redis/internal/pool"
17 )
18
19 // Hash is type of hash function used in consistent hash.
20 type Hash consistenthash.Hash
21
22 var errRingShardsDown = errors.New("redis: all ring shards are down")
23
24 // RingOptions are used to configure a ring client and should be
25 // passed to NewRing.
26 type RingOptions struct {
27         // Map of name => host:port addresses of ring shards.
28         Addrs map[string]string
29
30         // Frequency of PING commands sent to check shards availability.
31         // Shard is considered down after 3 subsequent failed checks.
32         HeartbeatFrequency time.Duration
33
34         // Hash function used in consistent hash.
35         // Default is crc32.ChecksumIEEE.
36         Hash Hash
37
38         // Number of replicas in consistent hash.
39         // Default is 100 replicas.
40         //
41         // Higher number of replicas will provide less deviation, that is keys will be
42         // distributed to nodes more evenly.
43         //
44         // Following is deviation for common nreplicas:
45         //  --------------------------------------------------------
46         //  | nreplicas | standard error | 99% confidence interval |
47         //  |     10    |     0.3152     |      (0.37, 1.98)       |
48         //  |    100    |     0.0997     |      (0.76, 1.28)       |
49         //  |   1000    |     0.0316     |      (0.92, 1.09)       |
50         //  --------------------------------------------------------
51         //
52         //  See https://arxiv.org/abs/1406.2294 for reference
53         HashReplicas int
54
55         // Following options are copied from Options struct.
56
57         OnConnect func(*Conn) error
58
59         DB       int
60         Password string
61
62         MaxRetries      int
63         MinRetryBackoff time.Duration
64         MaxRetryBackoff time.Duration
65
66         DialTimeout  time.Duration
67         ReadTimeout  time.Duration
68         WriteTimeout time.Duration
69
70         PoolSize           int
71         MinIdleConns       int
72         MaxConnAge         time.Duration
73         PoolTimeout        time.Duration
74         IdleTimeout        time.Duration
75         IdleCheckFrequency time.Duration
76 }
77
78 func (opt *RingOptions) init() {
79         if opt.HeartbeatFrequency == 0 {
80                 opt.HeartbeatFrequency = 500 * time.Millisecond
81         }
82
83         if opt.HashReplicas == 0 {
84                 opt.HashReplicas = 100
85         }
86
87         switch opt.MinRetryBackoff {
88         case -1:
89                 opt.MinRetryBackoff = 0
90         case 0:
91                 opt.MinRetryBackoff = 8 * time.Millisecond
92         }
93         switch opt.MaxRetryBackoff {
94         case -1:
95                 opt.MaxRetryBackoff = 0
96         case 0:
97                 opt.MaxRetryBackoff = 512 * time.Millisecond
98         }
99 }
100
101 func (opt *RingOptions) clientOptions() *Options {
102         return &Options{
103                 OnConnect: opt.OnConnect,
104
105                 DB:       opt.DB,
106                 Password: opt.Password,
107
108                 DialTimeout:  opt.DialTimeout,
109                 ReadTimeout:  opt.ReadTimeout,
110                 WriteTimeout: opt.WriteTimeout,
111
112                 PoolSize:           opt.PoolSize,
113                 MinIdleConns:       opt.MinIdleConns,
114                 MaxConnAge:         opt.MaxConnAge,
115                 PoolTimeout:        opt.PoolTimeout,
116                 IdleTimeout:        opt.IdleTimeout,
117                 IdleCheckFrequency: opt.IdleCheckFrequency,
118         }
119 }
120
121 //------------------------------------------------------------------------------
122
123 type ringShard struct {
124         Client *Client
125         down   int32
126 }
127
128 func (shard *ringShard) String() string {
129         var state string
130         if shard.IsUp() {
131                 state = "up"
132         } else {
133                 state = "down"
134         }
135         return fmt.Sprintf("%s is %s", shard.Client, state)
136 }
137
138 func (shard *ringShard) IsDown() bool {
139         const threshold = 3
140         return atomic.LoadInt32(&shard.down) >= threshold
141 }
142
143 func (shard *ringShard) IsUp() bool {
144         return !shard.IsDown()
145 }
146
147 // Vote votes to set shard state and returns true if state was changed.
148 func (shard *ringShard) Vote(up bool) bool {
149         if up {
150                 changed := shard.IsDown()
151                 atomic.StoreInt32(&shard.down, 0)
152                 return changed
153         }
154
155         if shard.IsDown() {
156                 return false
157         }
158
159         atomic.AddInt32(&shard.down, 1)
160         return shard.IsDown()
161 }
162
163 //------------------------------------------------------------------------------
164
165 type ringShards struct {
166         opt *RingOptions
167
168         mu     sync.RWMutex
169         hash   *consistenthash.Map
170         shards map[string]*ringShard // read only
171         list   []*ringShard          // read only
172         len    int
173         closed bool
174 }
175
176 func newRingShards(opt *RingOptions) *ringShards {
177         return &ringShards{
178                 opt: opt,
179
180                 hash:   newConsistentHash(opt),
181                 shards: make(map[string]*ringShard),
182         }
183 }
184
185 func (c *ringShards) Add(name string, cl *Client) {
186         shard := &ringShard{Client: cl}
187         c.hash.Add(name)
188         c.shards[name] = shard
189         c.list = append(c.list, shard)
190 }
191
192 func (c *ringShards) List() []*ringShard {
193         c.mu.RLock()
194         list := c.list
195         c.mu.RUnlock()
196         return list
197 }
198
199 func (c *ringShards) Hash(key string) string {
200         c.mu.RLock()
201         hash := c.hash.Get(key)
202         c.mu.RUnlock()
203         return hash
204 }
205
206 func (c *ringShards) GetByKey(key string) (*ringShard, error) {
207         key = hashtag.Key(key)
208
209         c.mu.RLock()
210
211         if c.closed {
212                 c.mu.RUnlock()
213                 return nil, pool.ErrClosed
214         }
215
216         hash := c.hash.Get(key)
217         if hash == "" {
218                 c.mu.RUnlock()
219                 return nil, errRingShardsDown
220         }
221
222         shard := c.shards[hash]
223         c.mu.RUnlock()
224
225         return shard, nil
226 }
227
228 func (c *ringShards) GetByHash(name string) (*ringShard, error) {
229         if name == "" {
230                 return c.Random()
231         }
232
233         c.mu.RLock()
234         shard := c.shards[name]
235         c.mu.RUnlock()
236         return shard, nil
237 }
238
239 func (c *ringShards) Random() (*ringShard, error) {
240         return c.GetByKey(strconv.Itoa(rand.Int()))
241 }
242
243 // heartbeat monitors state of each shard in the ring.
244 func (c *ringShards) Heartbeat(frequency time.Duration) {
245         ticker := time.NewTicker(frequency)
246         defer ticker.Stop()
247         for range ticker.C {
248                 var rebalance bool
249
250                 c.mu.RLock()
251
252                 if c.closed {
253                         c.mu.RUnlock()
254                         break
255                 }
256
257                 shards := c.list
258                 c.mu.RUnlock()
259
260                 for _, shard := range shards {
261                         err := shard.Client.Ping().Err()
262                         if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
263                                 internal.Logf("ring shard state changed: %s", shard)
264                                 rebalance = true
265                         }
266                 }
267
268                 if rebalance {
269                         c.rebalance()
270                 }
271         }
272 }
273
274 // rebalance removes dead shards from the Ring.
275 func (c *ringShards) rebalance() {
276         hash := newConsistentHash(c.opt)
277         var shardsNum int
278         for name, shard := range c.shards {
279                 if shard.IsUp() {
280                         hash.Add(name)
281                         shardsNum++
282                 }
283         }
284
285         c.mu.Lock()
286         c.hash = hash
287         c.len = shardsNum
288         c.mu.Unlock()
289 }
290
291 func (c *ringShards) Len() int {
292         c.mu.RLock()
293         l := c.len
294         c.mu.RUnlock()
295         return l
296 }
297
298 func (c *ringShards) Close() error {
299         c.mu.Lock()
300         defer c.mu.Unlock()
301
302         if c.closed {
303                 return nil
304         }
305         c.closed = true
306
307         var firstErr error
308         for _, shard := range c.shards {
309                 if err := shard.Client.Close(); err != nil && firstErr == nil {
310                         firstErr = err
311                 }
312         }
313         c.hash = nil
314         c.shards = nil
315         c.list = nil
316
317         return firstErr
318 }
319
320 //------------------------------------------------------------------------------
321
322 // Ring is a Redis client that uses consistent hashing to distribute
323 // keys across multiple Redis servers (shards). It's safe for
324 // concurrent use by multiple goroutines.
325 //
326 // Ring monitors the state of each shard and removes dead shards from
327 // the ring. When a shard comes online it is added back to the ring. This
328 // gives you maximum availability and partition tolerance, but no
329 // consistency between different shards or even clients. Each client
330 // uses shards that are available to the client and does not do any
331 // coordination when shard state is changed.
332 //
333 // Ring should be used when you need multiple Redis servers for caching
334 // and can tolerate losing data when one of the servers dies.
335 // Otherwise you should use Redis Cluster.
336 type Ring struct {
337         cmdable
338
339         ctx context.Context
340
341         opt           *RingOptions
342         shards        *ringShards
343         cmdsInfoCache *cmdsInfoCache
344
345         process         func(Cmder) error
346         processPipeline func([]Cmder) error
347 }
348
349 func NewRing(opt *RingOptions) *Ring {
350         opt.init()
351
352         ring := &Ring{
353                 opt:    opt,
354                 shards: newRingShards(opt),
355         }
356         ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
357
358         ring.process = ring.defaultProcess
359         ring.processPipeline = ring.defaultProcessPipeline
360         ring.cmdable.setProcessor(ring.Process)
361
362         for name, addr := range opt.Addrs {
363                 clopt := opt.clientOptions()
364                 clopt.Addr = addr
365                 ring.shards.Add(name, NewClient(clopt))
366         }
367
368         go ring.shards.Heartbeat(opt.HeartbeatFrequency)
369
370         return ring
371 }
372
373 func (c *Ring) Context() context.Context {
374         if c.ctx != nil {
375                 return c.ctx
376         }
377         return context.Background()
378 }
379
380 func (c *Ring) WithContext(ctx context.Context) *Ring {
381         if ctx == nil {
382                 panic("nil context")
383         }
384         c2 := c.copy()
385         c2.ctx = ctx
386         return c2
387 }
388
389 func (c *Ring) copy() *Ring {
390         cp := *c
391         return &cp
392 }
393
394 // Options returns read-only Options that were used to create the client.
395 func (c *Ring) Options() *RingOptions {
396         return c.opt
397 }
398
399 func (c *Ring) retryBackoff(attempt int) time.Duration {
400         return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
401 }
402
403 // PoolStats returns accumulated connection pool stats.
404 func (c *Ring) PoolStats() *PoolStats {
405         shards := c.shards.List()
406         var acc PoolStats
407         for _, shard := range shards {
408                 s := shard.Client.connPool.Stats()
409                 acc.Hits += s.Hits
410                 acc.Misses += s.Misses
411                 acc.Timeouts += s.Timeouts
412                 acc.TotalConns += s.TotalConns
413                 acc.IdleConns += s.IdleConns
414         }
415         return &acc
416 }
417
418 // Len returns the current number of shards in the ring.
419 func (c *Ring) Len() int {
420         return c.shards.Len()
421 }
422
423 // Subscribe subscribes the client to the specified channels.
424 func (c *Ring) Subscribe(channels ...string) *PubSub {
425         if len(channels) == 0 {
426                 panic("at least one channel is required")
427         }
428
429         shard, err := c.shards.GetByKey(channels[0])
430         if err != nil {
431                 // TODO: return PubSub with sticky error
432                 panic(err)
433         }
434         return shard.Client.Subscribe(channels...)
435 }
436
437 // PSubscribe subscribes the client to the given patterns.
438 func (c *Ring) PSubscribe(channels ...string) *PubSub {
439         if len(channels) == 0 {
440                 panic("at least one channel is required")
441         }
442
443         shard, err := c.shards.GetByKey(channels[0])
444         if err != nil {
445                 // TODO: return PubSub with sticky error
446                 panic(err)
447         }
448         return shard.Client.PSubscribe(channels...)
449 }
450
451 // ForEachShard concurrently calls the fn on each live shard in the ring.
452 // It returns the first error if any.
453 func (c *Ring) ForEachShard(fn func(client *Client) error) error {
454         shards := c.shards.List()
455         var wg sync.WaitGroup
456         errCh := make(chan error, 1)
457         for _, shard := range shards {
458                 if shard.IsDown() {
459                         continue
460                 }
461
462                 wg.Add(1)
463                 go func(shard *ringShard) {
464                         defer wg.Done()
465                         err := fn(shard.Client)
466                         if err != nil {
467                                 select {
468                                 case errCh <- err:
469                                 default:
470                                 }
471                         }
472                 }(shard)
473         }
474         wg.Wait()
475
476         select {
477         case err := <-errCh:
478                 return err
479         default:
480                 return nil
481         }
482 }
483
484 func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
485         shards := c.shards.List()
486         firstErr := errRingShardsDown
487         for _, shard := range shards {
488                 cmdsInfo, err := shard.Client.Command().Result()
489                 if err == nil {
490                         return cmdsInfo, nil
491                 }
492                 if firstErr == nil {
493                         firstErr = err
494                 }
495         }
496         return nil, firstErr
497 }
498
499 func (c *Ring) cmdInfo(name string) *CommandInfo {
500         cmdsInfo, err := c.cmdsInfoCache.Get()
501         if err != nil {
502                 return nil
503         }
504         info := cmdsInfo[name]
505         if info == nil {
506                 internal.Logf("info for cmd=%s not found", name)
507         }
508         return info
509 }
510
511 func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
512         cmdInfo := c.cmdInfo(cmd.Name())
513         pos := cmdFirstKeyPos(cmd, cmdInfo)
514         if pos == 0 {
515                 return c.shards.Random()
516         }
517         firstKey := cmd.stringArg(pos)
518         return c.shards.GetByKey(firstKey)
519 }
520
521 // Do creates a Cmd from the args and processes the cmd.
522 func (c *Ring) Do(args ...interface{}) *Cmd {
523         cmd := NewCmd(args...)
524         c.Process(cmd)
525         return cmd
526 }
527
528 func (c *Ring) WrapProcess(
529         fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
530 ) {
531         c.process = fn(c.process)
532 }
533
534 func (c *Ring) Process(cmd Cmder) error {
535         return c.process(cmd)
536 }
537
538 func (c *Ring) defaultProcess(cmd Cmder) error {
539         for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
540                 if attempt > 0 {
541                         time.Sleep(c.retryBackoff(attempt))
542                 }
543
544                 shard, err := c.cmdShard(cmd)
545                 if err != nil {
546                         cmd.setErr(err)
547                         return err
548                 }
549
550                 err = shard.Client.Process(cmd)
551                 if err == nil {
552                         return nil
553                 }
554                 if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
555                         return err
556                 }
557         }
558         return cmd.Err()
559 }
560
561 func (c *Ring) Pipeline() Pipeliner {
562         pipe := Pipeline{
563                 exec: c.processPipeline,
564         }
565         pipe.cmdable.setProcessor(pipe.Process)
566         return &pipe
567 }
568
569 func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
570         return c.Pipeline().Pipelined(fn)
571 }
572
573 func (c *Ring) WrapProcessPipeline(
574         fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
575 ) {
576         c.processPipeline = fn(c.processPipeline)
577 }
578
579 func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
580         cmdsMap := make(map[string][]Cmder)
581         for _, cmd := range cmds {
582                 cmdInfo := c.cmdInfo(cmd.Name())
583                 hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
584                 if hash != "" {
585                         hash = c.shards.Hash(hashtag.Key(hash))
586                 }
587                 cmdsMap[hash] = append(cmdsMap[hash], cmd)
588         }
589
590         for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
591                 if attempt > 0 {
592                         time.Sleep(c.retryBackoff(attempt))
593                 }
594
595                 var mu sync.Mutex
596                 var failedCmdsMap map[string][]Cmder
597                 var wg sync.WaitGroup
598
599                 for hash, cmds := range cmdsMap {
600                         wg.Add(1)
601                         go func(hash string, cmds []Cmder) {
602                                 defer wg.Done()
603
604                                 shard, err := c.shards.GetByHash(hash)
605                                 if err != nil {
606                                         setCmdsErr(cmds, err)
607                                         return
608                                 }
609
610                                 cn, err := shard.Client.getConn()
611                                 if err != nil {
612                                         setCmdsErr(cmds, err)
613                                         return
614                                 }
615
616                                 canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
617                                 shard.Client.releaseConnStrict(cn, err)
618
619                                 if canRetry && internal.IsRetryableError(err, true) {
620                                         mu.Lock()
621                                         if failedCmdsMap == nil {
622                                                 failedCmdsMap = make(map[string][]Cmder)
623                                         }
624                                         failedCmdsMap[hash] = cmds
625                                         mu.Unlock()
626                                 }
627                         }(hash, cmds)
628                 }
629
630                 wg.Wait()
631                 if len(failedCmdsMap) == 0 {
632                         break
633                 }
634                 cmdsMap = failedCmdsMap
635         }
636
637         return cmdsFirstErr(cmds)
638 }
639
640 func (c *Ring) TxPipeline() Pipeliner {
641         panic("not implemented")
642 }
643
644 func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
645         panic("not implemented")
646 }
647
648 // Close closes the ring client, releasing any open resources.
649 //
650 // It is rare to Close a Ring, as the Ring is meant to be long-lived
651 // and shared between many goroutines.
652 func (c *Ring) Close() error {
653         return c.shards.Close()
654 }
655
656 func newConsistentHash(opt *RingOptions) *consistenthash.Map {
657         return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))
658 }