WriteTimeout time.Duration
PoolSize int
+ MinIdleConns int
+ MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
+ MinIdleConns: opt.MinIdleConns,
+ MaxConnAge: opt.MaxConnAge,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
//------------------------------------------------------------------------------
-// Ring is a Redis client that uses constistent hashing to distribute
+// Ring is a Redis client that uses consistent hashing to distribute
// keys across multiple Redis servers (shards). It's safe for
// concurrent use by multiple goroutines.
//
// Ring monitors the state of each shard and removes dead shards from
-// the ring. When shard comes online it is added back to the ring. This
+// the ring. When a shard comes online it is added back to the ring. This
// gives you maximum availability and partition tolerance, but no
// consistency between different shards or even clients. Each client
// uses shards that are available to the client and does not do any
shards *ringShards
cmdsInfoCache *cmdsInfoCache
+ process func(Cmder) error
processPipeline func([]Cmder) error
}
}
ring.cmdsInfoCache = newCmdsInfoCache(ring.cmdsInfo)
+ ring.process = ring.defaultProcess
ring.processPipeline = ring.defaultProcessPipeline
ring.cmdable.setProcessor(ring.Process)
acc.Misses += s.Misses
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
- acc.FreeConns += s.FreeConns
+ acc.IdleConns += s.IdleConns
}
return &acc
}
return c.shards.GetByKey(firstKey)
}
-func (c *Ring) WrapProcess(fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error) {
- c.ForEachShard(func(c *Client) error {
- c.WrapProcess(fn)
- return nil
- })
+// Do creates a Cmd from the args and processes the cmd.
+func (c *Ring) Do(args ...interface{}) *Cmd {
+ cmd := NewCmd(args...)
+ c.Process(cmd)
+ return cmd
+}
+
+func (c *Ring) WrapProcess(
+ fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
+) {
+ c.process = fn(c.process)
}
func (c *Ring) Process(cmd Cmder) error {
- shard, err := c.cmdShard(cmd)
- if err != nil {
- cmd.setErr(err)
- return err
+ return c.process(cmd)
+}
+
+func (c *Ring) defaultProcess(cmd Cmder) error {
+ for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
+ if attempt > 0 {
+ time.Sleep(c.retryBackoff(attempt))
+ }
+
+ shard, err := c.cmdShard(cmd)
+ if err != nil {
+ cmd.setErr(err)
+ return err
+ }
+
+ err = shard.Client.Process(cmd)
+ if err == nil {
+ return nil
+ }
+ if !internal.IsRetryableError(err, cmd.readTimeout() == nil) {
+ return err
+ }
}
- return shard.Client.Process(cmd)
+ return cmd.Err()
}
func (c *Ring) Pipeline() Pipeliner {
time.Sleep(c.retryBackoff(attempt))
}
+ var mu sync.Mutex
var failedCmdsMap map[string][]Cmder
+ var wg sync.WaitGroup
for hash, cmds := range cmdsMap {
- shard, err := c.shards.GetByHash(hash)
- if err != nil {
- setCmdsErr(cmds, err)
- continue
- }
+ wg.Add(1)
+ go func(hash string, cmds []Cmder) {
+ defer wg.Done()
+
+ shard, err := c.shards.GetByHash(hash)
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
- cn, err := shard.Client.getConn()
- if err != nil {
- setCmdsErr(cmds, err)
- continue
- }
+ cn, err := shard.Client.getConn()
+ if err != nil {
+ setCmdsErr(cmds, err)
+ return
+ }
- canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
- if err == nil || internal.IsRedisError(err) {
- shard.Client.connPool.Put(cn)
- continue
- }
- shard.Client.connPool.Remove(cn)
+ canRetry, err := shard.Client.pipelineProcessCmds(cn, cmds)
+ shard.Client.releaseConnStrict(cn, err)
- if canRetry && internal.IsRetryableError(err, true) {
- if failedCmdsMap == nil {
- failedCmdsMap = make(map[string][]Cmder)
+ if canRetry && internal.IsRetryableError(err, true) {
+ mu.Lock()
+ if failedCmdsMap == nil {
+ failedCmdsMap = make(map[string][]Cmder)
+ }
+ failedCmdsMap[hash] = cmds
+ mu.Unlock()
}
- failedCmdsMap[hash] = cmds
- }
+ }(hash, cmds)
}
+ wg.Wait()
if len(failedCmdsMap) == 0 {
break
}
cmdsMap = failedCmdsMap
}
- return firstCmdsErr(cmds)
+ return cmdsFirstErr(cmds)
}
func (c *Ring) TxPipeline() Pipeliner {