import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"strconv"
	"sync"
	"sync/atomic"
	"time"

	"github.com/go-redis/redis/internal"
	"github.com/go-redis/redis/internal/consistenthash"
	"github.com/go-redis/redis/internal/hashtag"
	"github.com/go-redis/redis/internal/pool"
)
19 // Hash is type of hash function used in consistent hash.
20 type Hash consistenthash.Hash
22 var errRingShardsDown = errors.New("redis: all ring shards are down")
24 // RingOptions are used to configure a ring client and should be
26 type RingOptions struct {
27 // Map of name => host:port addresses of ring shards.
28 Addrs map[string]string
30 // Frequency of PING commands sent to check shards availability.
31 // Shard is considered down after 3 subsequent failed checks.
32 HeartbeatFrequency time.Duration
34 // Hash function used in consistent hash.
35 // Default is crc32.ChecksumIEEE.
38 // Number of replicas in consistent hash.
39 // Default is 100 replicas.
41 // Higher number of replicas will provide less deviation, that is keys will be
42 // distributed to nodes more evenly.
44 // Following is deviation for common nreplicas:
45 // --------------------------------------------------------
46 // | nreplicas | standard error | 99% confidence interval |
47 // | 10 | 0.3152 | (0.37, 1.98) |
48 // | 100 | 0.0997 | (0.76, 1.28) |
49 // | 1000 | 0.0316 | (0.92, 1.09) |
50 // --------------------------------------------------------
52 // See https://arxiv.org/abs/1406.2294 for reference
55 // Following options are copied from Options struct.
57 OnConnect func(*Conn) error
63 MinRetryBackoff time.Duration
64 MaxRetryBackoff time.Duration
66 DialTimeout time.Duration
67 ReadTimeout time.Duration
68 WriteTimeout time.Duration
72 MaxConnAge time.Duration
73 PoolTimeout time.Duration
74 IdleTimeout time.Duration
75 IdleCheckFrequency time.Duration
78 func (opt *RingOptions) init() {
79 if opt.HeartbeatFrequency == 0 {
80 opt.HeartbeatFrequency = 500 * time.Millisecond
83 if opt.HashReplicas == 0 {
84 opt.HashReplicas = 100
87 switch opt.MinRetryBackoff {
89 opt.MinRetryBackoff = 0
91 opt.MinRetryBackoff = 8 * time.Millisecond
93 switch opt.MaxRetryBackoff {
95 opt.MaxRetryBackoff = 0
97 opt.MaxRetryBackoff = 512 * time.Millisecond
101 func (opt *RingOptions) clientOptions() *Options {
103 OnConnect: opt.OnConnect,
106 Password: opt.Password,
108 DialTimeout: opt.DialTimeout,
109 ReadTimeout: opt.ReadTimeout,
110 WriteTimeout: opt.WriteTimeout,
112 PoolSize: opt.PoolSize,
113 MinIdleConns: opt.MinIdleConns,
114 MaxConnAge: opt.MaxConnAge,
115 PoolTimeout: opt.PoolTimeout,
116 IdleTimeout: opt.IdleTimeout,
117 IdleCheckFrequency: opt.IdleCheckFrequency,
121 //------------------------------------------------------------------------------
123 type ringShard struct {
128 func (shard *ringShard) String() string {
135 return fmt.Sprintf("%s is %s", shard.Client, state)
138 func (shard *ringShard) IsDown() bool {
140 return atomic.LoadInt32(&shard.down) >= threshold
143 func (shard *ringShard) IsUp() bool {
144 return !shard.IsDown()
147 // Vote votes to set shard state and returns true if state was changed.
148 func (shard *ringShard) Vote(up bool) bool {
150 changed := shard.IsDown()
151 atomic.StoreInt32(&shard.down, 0)
159 atomic.AddInt32(&shard.down, 1)
160 return shard.IsDown()
163 //------------------------------------------------------------------------------
165 type ringShards struct {
169 hash *consistenthash.Map
170 shards map[string]*ringShard // read only
171 list []*ringShard // read only
176 func newRingShards(opt *RingOptions) *ringShards {
180 hash: newConsistentHash(opt),
181 shards: make(map[string]*ringShard),
185 func (c *ringShards) Add(name string, cl *Client) {
186 shard := &ringShard{Client: cl}
188 c.shards[name] = shard
189 c.list = append(c.list, shard)
192 func (c *ringShards) List() []*ringShard {
199 func (c *ringShards) Hash(key string) string {
201 hash := c.hash.Get(key)
206 func (c *ringShards) GetByKey(key string) (*ringShard, error) {
207 key = hashtag.Key(key)
213 return nil, pool.ErrClosed
216 hash := c.hash.Get(key)
219 return nil, errRingShardsDown
222 shard := c.shards[hash]
228 func (c *ringShards) GetByHash(name string) (*ringShard, error) {
234 shard := c.shards[name]
239 func (c *ringShards) Random() (*ringShard, error) {
240 return c.GetByKey(strconv.Itoa(rand.Int()))
243 // heartbeat monitors state of each shard in the ring.
244 func (c *ringShards) Heartbeat(frequency time.Duration) {
245 ticker := time.NewTicker(frequency)
260 for _, shard := range shards {
261 err := shard.Client.Ping().Err()
262 if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
263 internal.Logf("ring shard state changed: %s", shard)
274 // rebalance removes dead shards from the Ring.
275 func (c *ringShards) rebalance() {
276 hash := newConsistentHash(c.opt)
278 for name, shard := range c.shards {
291 func (c *ringShards) Len() int {
298 func (c *ringShards) Close() error {
308 for _, shard := range c.shards {
309 if err := shard.Client.Close(); err != nil && firstErr == nil {
320 //------------------------------------------------------------------------------
322 // Ring is a Redis client that uses consistent hashing to distribute
323 // keys across multiple Redis servers (shards). It's safe for
324 // concurrent use by multiple goroutines.
326 // Ring monitors the state of each shard and removes dead shards from
327 // the ring. When a shard comes online it is added back to the ring. This
328 // gives you maximum availability and partition tolerance, but no
329 // consistency between different shards or even clients. Each client
330 // uses shards that are available to the client and does not do any
331 // coordination when shard state is changed.
333 // Ring should be used when you need multiple Redis servers for caching
334 // and can tolerate losing data when one of the servers dies.
335 // Otherwise you should use Redis Cluster.
343 cmdsInfoCache *cmdsInfoCache
345 process func(Cmder) error
346 processPipeline func([]Cmder) error
349 func NewRing(opt *RingOptions) *Ring {
354 shards: newRingShards(opt),
356 ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
358 ring.process = ring.defaultProcess
359 ring.processPipeline = ring.defaultProcessPipeline
360 ring.cmdable.setProcessor(ring.Process)
362 for name, addr := range opt.Addrs {
363 clopt := opt.clientOptions()
365 ring.shards.Add(name, NewClient(clopt))
368 go ring.shards.Heartbeat(opt.HeartbeatFrequency)
373 func (c *Ring) Context() context.Context {
377 return context.Background()
380 func (c *Ring) WithContext(ctx context.Context) *Ring {
389 func (c *Ring) copy() *Ring {
394 // Options returns read-only Options that were used to create the client.
395 func (c *Ring) Options() *RingOptions {
399 func (c *Ring) retryBackoff(attempt int) time.Duration {
400 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
403 // PoolStats returns accumulated connection pool stats.
404 func (c *Ring) PoolStats() *PoolStats {
405 shards := c.shards.List()
407 for _, shard := range shards {
408 s := shard.Client.connPool.Stats()
410 acc.Misses += s.Misses
411 acc.Timeouts += s.Timeouts
412 acc.TotalConns += s.TotalConns
413 acc.IdleConns += s.IdleConns
418 // Len returns the current number of shards in the ring.
419 func (c *Ring) Len() int {
420 return c.shards.Len()
423 // Subscribe subscribes the client to the specified channels.
424 func (c *Ring) Subscribe(channels ...string) *PubSub {
425 if len(channels) == 0 {
426 panic("at least one channel is required")
429 shard, err := c.shards.GetByKey(channels[0])
431 // TODO: return PubSub with sticky error
434 return shard.Client.Subscribe(channels...)
437 // PSubscribe subscribes the client to the given patterns.
438 func (c *Ring) PSubscribe(channels ...string) *PubSub {
439 if len(channels) == 0 {
440 panic("at least one channel is required")
443 shard, err := c.shards.GetByKey(channels[0])
445 // TODO: return PubSub with sticky error
448 return shard.Client.PSubscribe(channels...)
451 // ForEachShard concurrently calls the fn on each live shard in the ring.
452 // It returns the first error if any.
453 func (c *Ring) ForEachShard(fn func(client *Client) error) error {
454 shards := c.shards.List()
455 var wg sync.WaitGroup
456 errCh := make(chan error, 1)
457 for _, shard := range shards {
463 go func(shard *ringShard) {
465 err := fn(shard.Client)
484 func (c *Ring) cmdsInfo() (map[string]*CommandInfo, error) {
485 shards := c.shards.List()
486 firstErr := errRingShardsDown
487 for _, shard := range shards {
488 cmdsInfo, err := shard.Client.Command().Result()
499 func (c *Ring) cmdInfo(name string) *CommandInfo {
500 cmdsInfo, err := c.cmdsInfoCache.Get()
504 info := cmdsInfo[name]
506 internal.Logf("info for cmd=%s not found", name)
511 func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
512 cmdInfo := c.cmdInfo(cmd.Name())
513 pos := cmdFirstKeyPos(cmd, cmdInfo)
515 return c.shards.Random()
517 firstKey := cmd.stringArg(pos)
518 return c.shards.GetByKey(firstKey)
521 // Do creates a Cmd from the args and processes the cmd.
522 func (c *Ring) Do(args ...interface{}) *Cmd {
523 cmd := NewCmd(args...)
528 func (c *Ring) WrapProcess(
529 fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
531 c.process = fn(c.process)
534 func (c *Ring) Process(cmd Cmder) error {
535 return c.process(cmd)
538 func (c *Ring) defaultProcess(cmd Cmder) error {
539 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
541 time.Sleep(c.retryBackoff(attempt))
544 shard, err := c.cmdShard(cmd)
550 err = shard.Client.Process(cmd)
554 if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
561 func (c *Ring) Pipeline() Pipeliner {
563 exec: c.processPipeline,
565 pipe.cmdable.setProcessor(pipe.Process)
569 func (c *Ring) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
570 return c.Pipeline().Pipelined(fn)
573 func (c *Ring) WrapProcessPipeline(
574 fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
576 c.processPipeline = fn(c.processPipeline)
579 func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
580 cmdsMap := make(map[string][]Cmder)
581 for _, cmd := range cmds {
582 cmdInfo := c.cmdInfo(cmd.Name())
583 hash := cmd.stringArg(cmdFirstKeyPos(cmd, cmdInfo))
585 hash = c.shards.Hash(hashtag.Key(hash))
587 cmdsMap[hash] = append(cmdsMap[hash], cmd)
590 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
592 time.Sleep(c.retryBackoff(attempt))
596 var failedCmdsMap map[string][]Cmder
597 var wg sync.WaitGroup
599 for hash, cmds := range cmdsMap {
601 go func(hash string, cmds []Cmder) {
604 shard, err := c.shards.GetByHash(hash)
606 setCmdsErr(cmds, err)
610 cn, err := shard.Client.getConn()
612 setCmdsErr(cmds, err)
616 canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
617 shard.Client.releaseConnStrict(cn, err)
619 if canRetry && internal.IsRetryableError(err, true) {
621 if failedCmdsMap == nil {
622 failedCmdsMap = make(map[string][]Cmder)
624 failedCmdsMap[hash] = cmds
631 if len(failedCmdsMap) == 0 {
634 cmdsMap = failedCmdsMap
637 return cmdsFirstErr(cmds)
640 func (c *Ring) TxPipeline() Pipeliner {
641 panic("not implemented")
644 func (c *Ring) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
645 panic("not implemented")
648 // Close closes the ring client, releasing any open resources.
650 // It is rare to Close a Ring, as the Ring is meant to be long-lived
651 // and shared between many goroutines.
652 func (c *Ring) Close() error {
653 return c.shards.Close()
656 func newConsistentHash(opt *RingOptions) *consistenthash.Map {
657 return consistenthash.New(opt.HashReplicas, consistenthash.Hash(opt.Hash))