10 "github.com/go-redis/redis/internal"
11 "github.com/go-redis/redis/internal/pool"
12 "github.com/go-redis/redis/internal/proto"
15 // Nil reply Redis returns when key does not exist.
19 SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
22 func SetLogger(logger *log.Logger) {
23 internal.Logger = logger
26 type baseClient struct {
31 process func(Cmder) error
32 processPipeline func([]Cmder) error
33 processTxPipeline func([]Cmder) error
35 onClose func() error // hook called when client is closed
38 func (c *baseClient) init() {
39 c.process = c.defaultProcess
40 c.processPipeline = c.defaultProcessPipeline
41 c.processTxPipeline = c.defaultProcessTxPipeline
44 func (c *baseClient) String() string {
45 return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
48 func (c *baseClient) newConn() (*pool.Conn, error) {
49 cn, err := c.connPool.NewConn()
54 if cn.InitedAt.IsZero() {
55 if err := c.initConn(cn); err != nil {
56 _ = c.connPool.CloseConn(cn)
64 func (c *baseClient) getConn() (*pool.Conn, error) {
66 err := c.limiter.Allow()
72 cn, err := c._getConn()
75 c.limiter.ReportResult(err)
82 func (c *baseClient) _getConn() (*pool.Conn, error) {
83 cn, err := c.connPool.Get()
88 if cn.InitedAt.IsZero() {
99 func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
100 if c.limiter != nil {
101 c.limiter.ReportResult(err)
104 if internal.IsBadConn(err, false) {
105 c.connPool.Remove(cn)
111 func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
112 if c.limiter != nil {
113 c.limiter.ReportResult(err)
116 if err == nil || internal.IsRedisError(err) {
119 c.connPool.Remove(cn)
123 func (c *baseClient) initConn(cn *pool.Conn) error {
124 cn.InitedAt = time.Now()
126 if c.opt.Password == "" &&
129 c.opt.OnConnect == nil {
133 conn := newConn(c.opt, cn)
134 _, err := conn.Pipelined(func(pipe Pipeliner) error {
135 if c.opt.Password != "" {
136 pipe.Auth(c.opt.Password)
140 pipe.Select(c.opt.DB)
153 if c.opt.OnConnect != nil {
154 return c.opt.OnConnect(conn)
159 // Do creates a Cmd from the args and processes the cmd.
160 func (c *baseClient) Do(args ...interface{}) *Cmd {
161 cmd := NewCmd(args...)
166 // WrapProcess wraps function that processes Redis commands.
167 func (c *baseClient) WrapProcess(
168 fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
170 c.process = fn(c.process)
173 func (c *baseClient) Process(cmd Cmder) error {
174 return c.process(cmd)
177 func (c *baseClient) defaultProcess(cmd Cmder) error {
178 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
180 time.Sleep(c.retryBackoff(attempt))
183 cn, err := c.getConn()
186 if internal.IsRetryableError(err, true) {
192 err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
193 return writeCmd(wr, cmd)
196 c.releaseConn(cn, err)
198 if internal.IsRetryableError(err, true) {
204 err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
205 return cmd.readReply(rd)
207 c.releaseConn(cn, err)
208 if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
218 func (c *baseClient) retryBackoff(attempt int) time.Duration {
219 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
222 func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
223 if timeout := cmd.readTimeout(); timeout != nil {
228 return t + 10*time.Second
230 return c.opt.ReadTimeout
233 // Close closes the client, releasing any open resources.
235 // It is rare to Close a Client, as the Client is meant to be
236 // long-lived and shared between many goroutines.
237 func (c *baseClient) Close() error {
239 if c.onClose != nil {
240 if err := c.onClose(); err != nil && firstErr == nil {
244 if err := c.connPool.Close(); err != nil && firstErr == nil {
250 func (c *baseClient) getAddr() string {
254 func (c *baseClient) WrapProcessPipeline(
255 fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
257 c.processPipeline = fn(c.processPipeline)
258 c.processTxPipeline = fn(c.processTxPipeline)
261 func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
262 return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
265 func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
266 return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
269 type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
271 func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
272 for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
274 time.Sleep(c.retryBackoff(attempt))
277 cn, err := c.getConn()
279 setCmdsErr(cmds, err)
283 canRetry, err := p(cn, cmds)
284 c.releaseConnStrict(cn, err)
286 if !canRetry || !internal.IsRetryableError(err, true) {
290 return cmdsFirstErr(cmds)
293 func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
294 err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
295 return writeCmd(wr, cmds...)
298 setCmdsErr(cmds, err)
302 err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
303 return pipelineReadCmds(rd, cmds)
308 func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
309 for _, cmd := range cmds {
310 err := cmd.readReply(rd)
311 if err != nil && !internal.IsRedisError(err) {
318 func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
319 err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
320 return txPipelineWriteMulti(wr, cmds)
323 setCmdsErr(cmds, err)
327 err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
328 err := txPipelineReadQueued(rd, cmds)
330 setCmdsErr(cmds, err)
333 return pipelineReadCmds(rd, cmds)
338 func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
339 multiExec := make([]Cmder, 0, len(cmds)+2)
340 multiExec = append(multiExec, NewStatusCmd("MULTI"))
341 multiExec = append(multiExec, cmds...)
342 multiExec = append(multiExec, NewSliceCmd("EXEC"))
343 return writeCmd(wr, multiExec...)
346 func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
347 // Parse queued replies.
348 var statusCmd StatusCmd
349 err := statusCmd.readReply(rd)
355 err = statusCmd.readReply(rd)
356 if err != nil && !internal.IsRedisError(err) {
361 // Parse number of replies.
362 line, err := rd.ReadLine()
371 case proto.ErrorReply:
372 return proto.ParseErrorReply(line)
373 case proto.ArrayReply:
376 err := fmt.Errorf("redis: expected '*', but got line %q", line)
383 //------------------------------------------------------------------------------
385 // Client is a Redis client representing a pool of zero or more
386 // underlying connections. It's safe for concurrent use by multiple
395 // NewClient returns a client to the Redis Server specified by Options.
396 func NewClient(opt *Options) *Client {
400 baseClient: baseClient{
402 connPool: newConnPool(opt),
411 func (c *Client) init() {
412 c.cmdable.setProcessor(c.Process)
415 func (c *Client) Context() context.Context {
419 return context.Background()
422 func (c *Client) WithContext(ctx context.Context) *Client {
431 func (c *Client) clone() *Client {
437 // Options returns read-only Options that were used to create the client.
438 func (c *Client) Options() *Options {
442 func (c *Client) SetLimiter(l Limiter) *Client {
447 type PoolStats pool.Stats
449 // PoolStats returns connection pool stats.
450 func (c *Client) PoolStats() *PoolStats {
451 stats := c.connPool.Stats()
452 return (*PoolStats)(stats)
455 func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
456 return c.Pipeline().Pipelined(fn)
459 func (c *Client) Pipeline() Pipeliner {
461 exec: c.processPipeline,
463 pipe.statefulCmdable.setProcessor(pipe.Process)
467 func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
468 return c.TxPipeline().Pipelined(fn)
471 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
472 func (c *Client) TxPipeline() Pipeliner {
474 exec: c.processTxPipeline,
476 pipe.statefulCmdable.setProcessor(pipe.Process)
480 func (c *Client) pubSub() *PubSub {
484 newConn: func(channels []string) (*pool.Conn, error) {
487 closeConn: c.connPool.CloseConn,
493 // Subscribe subscribes the client to the specified channels.
494 // Channels can be omitted to create empty subscription.
495 // Note that this method does not wait on a response from Redis, so the
496 // subscription may not be active immediately. To force the connection to wait,
497 // you may call the Receive() method on the returned *PubSub like so:
499 // sub := client.Subscribe(queryResp)
500 // iface, err := sub.Receive()
505 // // Should be *Subscription, but others are possible if other actions have been
506 // // taken on sub since it was created.
507 // switch iface.(type) {
508 // case *Subscription:
509 // // subscribe succeeded
511 // // received first message
518 // ch := sub.Channel()
519 func (c *Client) Subscribe(channels ...string) *PubSub {
521 if len(channels) > 0 {
522 _ = pubsub.Subscribe(channels...)
527 // PSubscribe subscribes the client to the given patterns.
528 // Patterns can be omitted to create empty subscription.
529 func (c *Client) PSubscribe(channels ...string) *PubSub {
531 if len(channels) > 0 {
532 _ = pubsub.PSubscribe(channels...)
537 //------------------------------------------------------------------------------
539 // Conn is like Client, but its pool contains single connection.
545 func newConn(opt *Options, cn *pool.Conn) *Conn {
547 baseClient: baseClient{
549 connPool: pool.NewSingleConnPool(cn),
553 c.statefulCmdable.setProcessor(c.Process)
557 func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
558 return c.Pipeline().Pipelined(fn)
561 func (c *Conn) Pipeline() Pipeliner {
563 exec: c.processPipeline,
565 pipe.statefulCmdable.setProcessor(pipe.Process)
569 func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
570 return c.TxPipeline().Pipelined(fn)
573 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
574 func (c *Conn) TxPipeline() Pipeliner {
576 exec: c.processTxPipeline,
578 pipe.statefulCmdable.setProcessor(pipe.Process)