barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / ring.go
index ef85511..250e5f6 100644 (file)
@@ -68,6 +68,8 @@ type RingOptions struct {
        WriteTimeout time.Duration
 
        PoolSize           int
+       MinIdleConns       int
+       MaxConnAge         time.Duration
        PoolTimeout        time.Duration
        IdleTimeout        time.Duration
        IdleCheckFrequency time.Duration
@@ -108,6 +110,8 @@ func (opt *RingOptions) clientOptions() *Options {
                WriteTimeout: opt.WriteTimeout,
 
                PoolSize:           opt.PoolSize,
+               MinIdleConns:       opt.MinIdleConns,
+               MaxConnAge:         opt.MaxConnAge,
                PoolTimeout:        opt.PoolTimeout,
                IdleTimeout:        opt.IdleTimeout,
                IdleCheckFrequency: opt.IdleCheckFrequency,
@@ -315,12 +319,12 @@ func (c *ringShards) Close() error {
 
 //------------------------------------------------------------------------------
 
-// Ring is a Redis client that uses constistent hashing to distribute
+// Ring is a Redis client that uses consistent hashing to distribute
 // keys across multiple Redis servers (shards). It's safe for
 // concurrent use by multiple goroutines.
 //
 // Ring monitors the state of each shard and removes dead shards from
-// the ring. When shard comes online it is added back to the ring. This
+// the ring. When shard comes online it is added back to the ring. This
 // gives you maximum availability and partition tolerance, but no
 // consistency between different shards or even clients. Each client
 // uses shards that are available to the client and does not do any
@@ -338,6 +342,7 @@ type Ring struct {
        shards        *ringShards
        cmdsInfoCache *cmdsInfoCache
 
+       process         func(Cmder) error
        processPipeline func([]Cmder) error
 }
 
@@ -350,6 +355,7 @@ func NewRing(opt *RingOptions) *Ring {
        }
        ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
 
+       ring.process = ring.defaultProcess
        ring.processPipeline = ring.defaultProcessPipeline
        ring.cmdable.setProcessor(ring.Process)
 
@@ -404,7 +410,7 @@ func (c *Ring) PoolStats() *PoolStats {
                acc.Misses += s.Misses
                acc.Timeouts += s.Timeouts
                acc.TotalConns += s.TotalConns
-               acc.FreeConns += s.FreeConns
+               acc.IdleConns += s.IdleConns
        }
        return &acc
 }
@@ -512,20 +518,44 @@ func (c *Ring) cmdShard(cmd Cmder) (*ringShard, error) {
        return c.shards.GetByKey(firstKey)
 }
 
-func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
-       c.ForEachShard(func(c *Client) error {
-               c.WrapProcess(fn)
-               return nil
-       })
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(args ...interface{}) *Cmd {
+       cmd := NewCmd(args...)
+       c.Process(cmd)
+       return cmd
+}
+
+func (c *Ring) WrapProcess(
+       fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
+       c.process = fn(c.process)
 }
 
 func (c *Ring) Process(cmd Cmder) error {
-       shard, err := c.cmdShard(cmd)
-       if err != nil {
-               cmd.setErr(err)
-               return err
+       return c.process(cmd)
+}
+
+func (c *Ring) defaultProcess(cmd Cmder) error {
+       for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+               if attempt > 0 {
+                       time.Sleep(c.retryBackoff(attempt))
+               }
+
+               shard, err := c.cmdShard(cmd)
+               if err != nil {
+                       cmd.setErr(err)
+                       return err
+               }
+
+               err = shard.Client.Process(cmd)
+               if err == nil {
+                       return nil
+               }
+               if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+                       return err
+               }
        }
-       return shard.Client.Process(cmd)
+       return cmd.Err()
 }
 
 func (c *Ring) Pipeline() Pipeliner {
@@ -562,43 +592,49 @@ func (c *Ring) defaultProcessPipeline(cmds []Cmder) error {
                        time.Sleep(c.retryBackoff(attempt))
                }
 
+               var mu sync.Mutex
                var failedCmdsMap map[string][]Cmder
+               var wg sync.WaitGroup
 
                for hash, cmds := range cmdsMap {
-                       shard, err := c.shards.GetByHash(hash)
-                       if err != nil {
-                               setCmdsErr(cmds, err)
-                               continue
-                       }
+                       wg.Add(1)
+                       go func(hash string, cmds []Cmder) {
+                               defer wg.Done()
+
+                               shard, err := c.shards.GetByHash(hash)
+                               if err != nil {
+                                       setCmdsErr(cmds, err)
+                                       return
+                               }
 
-                       cn, err := shard.Client.getConn()
-                       if err != nil {
-                               setCmdsErr(cmds, err)
-                               continue
-                       }
+                               cn, err := shard.Client.getConn()
+                               if err != nil {
+                                       setCmdsErr(cmds, err)
+                                       return
+                               }
 
-                       canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
-                       if err == nil || internal.IsRedisError(err) {
-                               shard.Client.connPool.Put(cn)
-                               continue
-                       }
-                       shard.Client.connPool.Remove(cn)
+                               canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
+                               shard.Client.releaseConnStrict(cn, err)
 
-                       if canRetry && internal.IsRetryableError(err, true) {
-                               if failedCmdsMap == nil {
-                                       failedCmdsMap = make(map[string][]Cmder)
+                               if canRetry && internal.IsRetryableError(err, true) {
+                                       mu.Lock()
+                                       if failedCmdsMap == nil {
+                                               failedCmdsMap = make(map[string][]Cmder)
+                                       }
+                                       failedCmdsMap[hash] = cmds
+                                       mu.Unlock()
                                }
-                               failedCmdsMap[hash] = cmds
-                       }
+                       }(hash, cmds)
                }
 
+               wg.Wait()
                if len(failedCmdsMap) == 0 {
                        break
                }
                cmdsMap = failedCmdsMap
        }
 
-       return firstCmdsErr(cmds)
+       return cmdsFirstErr(cmds)
 }
 
 func (c *Ring) TxPipeline() Pipeliner {