barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / redis.go
1 package redis
2
3 import (
4         "context"
5         "fmt"
6         "log"
7         "os"
8         "time"
9
10         "github.com/go-redis/redis/internal"
11         "github.com/go-redis/redis/internal/pool"
12         "github.com/go-redis/redis/internal/proto"
13 )
14
15 // Nil reply Redis returns when key does not exist.
16 const Nil = proto.Nil
17
18 func init() {
19         SetLogger(log.New(os.Stderr, "redis: ", log.LstdFlags|log.Lshortfile))
20 }
21
22 func SetLogger(logger *log.Logger) {
23         internal.Logger = logger
24 }
25
26 type baseClient struct {
27         opt      *Options
28         connPool pool.Pooler
29         limiter  Limiter
30
31         process           func(Cmder) error
32         processPipeline   func([]Cmder) error
33         processTxPipeline func([]Cmder) error
34
35         onClose func() error // hook called when client is closed
36 }
37
38 func (c *baseClient) init() {
39         c.process = c.defaultProcess
40         c.processPipeline = c.defaultProcessPipeline
41         c.processTxPipeline = c.defaultProcessTxPipeline
42 }
43
44 func (c *baseClient) String() string {
45         return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
46 }
47
48 func (c *baseClient) newConn() (*pool.Conn, error) {
49         cn, err := c.connPool.NewConn()
50         if err != nil {
51                 return nil, err
52         }
53
54         if cn.InitedAt.IsZero() {
55                 if err := c.initConn(cn); err != nil {
56                         _ = c.connPool.CloseConn(cn)
57                         return nil, err
58                 }
59         }
60
61         return cn, nil
62 }
63
64 func (c *baseClient) getConn() (*pool.Conn, error) {
65         if c.limiter != nil {
66                 err := c.limiter.Allow()
67                 if err != nil {
68                         return nil, err
69                 }
70         }
71
72         cn, err := c._getConn()
73         if err != nil {
74                 if c.limiter != nil {
75                         c.limiter.ReportResult(err)
76                 }
77                 return nil, err
78         }
79         return cn, nil
80 }
81
82 func (c *baseClient) _getConn() (*pool.Conn, error) {
83         cn, err := c.connPool.Get()
84         if err != nil {
85                 return nil, err
86         }
87
88         if cn.InitedAt.IsZero() {
89                 err := c.initConn(cn)
90                 if err != nil {
91                         c.connPool.Remove(cn)
92                         return nil, err
93                 }
94         }
95
96         return cn, nil
97 }
98
99 func (c *baseClient) releaseConn(cn *pool.Conn, err error) {
100         if c.limiter != nil {
101                 c.limiter.ReportResult(err)
102         }
103
104         if internal.IsBadConn(err, false) {
105                 c.connPool.Remove(cn)
106         } else {
107                 c.connPool.Put(cn)
108         }
109 }
110
111 func (c *baseClient) releaseConnStrict(cn *pool.Conn, err error) {
112         if c.limiter != nil {
113                 c.limiter.ReportResult(err)
114         }
115
116         if err == nil || internal.IsRedisError(err) {
117                 c.connPool.Put(cn)
118         } else {
119                 c.connPool.Remove(cn)
120         }
121 }
122
123 func (c *baseClient) initConn(cn *pool.Conn) error {
124         cn.InitedAt = time.Now()
125
126         if c.opt.Password == "" &&
127                 c.opt.DB == 0 &&
128                 !c.opt.readOnly &&
129                 c.opt.OnConnect == nil {
130                 return nil
131         }
132
133         conn := newConn(c.opt, cn)
134         _, err := conn.Pipelined(func(pipe Pipeliner) error {
135                 if c.opt.Password != "" {
136                         pipe.Auth(c.opt.Password)
137                 }
138
139                 if c.opt.DB > 0 {
140                         pipe.Select(c.opt.DB)
141                 }
142
143                 if c.opt.readOnly {
144                         pipe.ReadOnly()
145                 }
146
147                 return nil
148         })
149         if err != nil {
150                 return err
151         }
152
153         if c.opt.OnConnect != nil {
154                 return c.opt.OnConnect(conn)
155         }
156         return nil
157 }
158
159 // Do creates a Cmd from the args and processes the cmd.
160 func (c *baseClient) Do(args ...interface{}) *Cmd {
161         cmd := NewCmd(args...)
162         _ = c.Process(cmd)
163         return cmd
164 }
165
166 // WrapProcess wraps function that processes Redis commands.
167 func (c *baseClient) WrapProcess(
168         fn func(oldProcess func(cmd Cmder) error) func(cmd Cmder) error,
169 ) {
170         c.process = fn(c.process)
171 }
172
173 func (c *baseClient) Process(cmd Cmder) error {
174         return c.process(cmd)
175 }
176
177 func (c *baseClient) defaultProcess(cmd Cmder) error {
178         for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
179                 if attempt > 0 {
180                         time.Sleep(c.retryBackoff(attempt))
181                 }
182
183                 cn, err := c.getConn()
184                 if err != nil {
185                         cmd.setErr(err)
186                         if internal.IsRetryableError(err, true) {
187                                 continue
188                         }
189                         return err
190                 }
191
192                 err = cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
193                         return writeCmd(wr, cmd)
194                 })
195                 if err != nil {
196                         c.releaseConn(cn, err)
197                         cmd.setErr(err)
198                         if internal.IsRetryableError(err, true) {
199                                 continue
200                         }
201                         return err
202                 }
203
204                 err = cn.WithReader(c.cmdTimeout(cmd), func(rd *proto.Reader) error {
205                         return cmd.readReply(rd)
206                 })
207                 c.releaseConn(cn, err)
208                 if err != nil && internal.IsRetryableError(err, cmd.readTimeout() == nil) {
209                         continue
210                 }
211
212                 return err
213         }
214
215         return cmd.Err()
216 }
217
218 func (c *baseClient) retryBackoff(attempt int) time.Duration {
219         return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
220 }
221
222 func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
223         if timeout := cmd.readTimeout(); timeout != nil {
224                 t := *timeout
225                 if t == 0 {
226                         return 0
227                 }
228                 return t + 10*time.Second
229         }
230         return c.opt.ReadTimeout
231 }
232
233 // Close closes the client, releasing any open resources.
234 //
235 // It is rare to Close a Client, as the Client is meant to be
236 // long-lived and shared between many goroutines.
237 func (c *baseClient) Close() error {
238         var firstErr error
239         if c.onClose != nil {
240                 if err := c.onClose(); err != nil && firstErr == nil {
241                         firstErr = err
242                 }
243         }
244         if err := c.connPool.Close(); err != nil && firstErr == nil {
245                 firstErr = err
246         }
247         return firstErr
248 }
249
250 func (c *baseClient) getAddr() string {
251         return c.opt.Addr
252 }
253
254 func (c *baseClient) WrapProcessPipeline(
255         fn func(oldProcess func([]Cmder) error) func([]Cmder) error,
256 ) {
257         c.processPipeline = fn(c.processPipeline)
258         c.processTxPipeline = fn(c.processTxPipeline)
259 }
260
261 func (c *baseClient) defaultProcessPipeline(cmds []Cmder) error {
262         return c.generalProcessPipeline(cmds, c.pipelineProcessCmds)
263 }
264
265 func (c *baseClient) defaultProcessTxPipeline(cmds []Cmder) error {
266         return c.generalProcessPipeline(cmds, c.txPipelineProcessCmds)
267 }
268
269 type pipelineProcessor func(*pool.Conn, []Cmder) (bool, error)
270
271 func (c *baseClient) generalProcessPipeline(cmds []Cmder, p pipelineProcessor) error {
272         for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
273                 if attempt > 0 {
274                         time.Sleep(c.retryBackoff(attempt))
275                 }
276
277                 cn, err := c.getConn()
278                 if err != nil {
279                         setCmdsErr(cmds, err)
280                         return err
281                 }
282
283                 canRetry, err := p(cn, cmds)
284                 c.releaseConnStrict(cn, err)
285
286                 if !canRetry || !internal.IsRetryableError(err, true) {
287                         break
288                 }
289         }
290         return cmdsFirstErr(cmds)
291 }
292
293 func (c *baseClient) pipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
294         err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
295                 return writeCmd(wr, cmds...)
296         })
297         if err != nil {
298                 setCmdsErr(cmds, err)
299                 return true, err
300         }
301
302         err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
303                 return pipelineReadCmds(rd, cmds)
304         })
305         return true, err
306 }
307
308 func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
309         for _, cmd := range cmds {
310                 err := cmd.readReply(rd)
311                 if err != nil && !internal.IsRedisError(err) {
312                         return err
313                 }
314         }
315         return nil
316 }
317
318 func (c *baseClient) txPipelineProcessCmds(cn *pool.Conn, cmds []Cmder) (bool, error) {
319         err := cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
320                 return txPipelineWriteMulti(wr, cmds)
321         })
322         if err != nil {
323                 setCmdsErr(cmds, err)
324                 return true, err
325         }
326
327         err = cn.WithReader(c.opt.ReadTimeout, func(rd *proto.Reader) error {
328                 err := txPipelineReadQueued(rd, cmds)
329                 if err != nil {
330                         setCmdsErr(cmds, err)
331                         return err
332                 }
333                 return pipelineReadCmds(rd, cmds)
334         })
335         return false, err
336 }
337
338 func txPipelineWriteMulti(wr *proto.Writer, cmds []Cmder) error {
339         multiExec := make([]Cmder, 0, len(cmds)+2)
340         multiExec = append(multiExec, NewStatusCmd("MULTI"))
341         multiExec = append(multiExec, cmds...)
342         multiExec = append(multiExec, NewSliceCmd("EXEC"))
343         return writeCmd(wr, multiExec...)
344 }
345
346 func txPipelineReadQueued(rd *proto.Reader, cmds []Cmder) error {
347         // Parse queued replies.
348         var statusCmd StatusCmd
349         err := statusCmd.readReply(rd)
350         if err != nil {
351                 return err
352         }
353
354         for range cmds {
355                 err = statusCmd.readReply(rd)
356                 if err != nil && !internal.IsRedisError(err) {
357                         return err
358                 }
359         }
360
361         // Parse number of replies.
362         line, err := rd.ReadLine()
363         if err != nil {
364                 if err == Nil {
365                         err = TxFailedErr
366                 }
367                 return err
368         }
369
370         switch line[0] {
371         case proto.ErrorReply:
372                 return proto.ParseErrorReply(line)
373         case proto.ArrayReply:
374                 // ok
375         default:
376                 err := fmt.Errorf("redis: expected '*', but got line %q", line)
377                 return err
378         }
379
380         return nil
381 }
382
383 //------------------------------------------------------------------------------
384
385 // Client is a Redis client representing a pool of zero or more
386 // underlying connections. It's safe for concurrent use by multiple
387 // goroutines.
388 type Client struct {
389         baseClient
390         cmdable
391
392         ctx context.Context
393 }
394
395 // NewClient returns a client to the Redis Server specified by Options.
396 func NewClient(opt *Options) *Client {
397         opt.init()
398
399         c := Client{
400                 baseClient: baseClient{
401                         opt:      opt,
402                         connPool: newConnPool(opt),
403                 },
404         }
405         c.baseClient.init()
406         c.init()
407
408         return &c
409 }
410
411 func (c *Client) init() {
412         c.cmdable.setProcessor(c.Process)
413 }
414
415 func (c *Client) Context() context.Context {
416         if c.ctx != nil {
417                 return c.ctx
418         }
419         return context.Background()
420 }
421
422 func (c *Client) WithContext(ctx context.Context) *Client {
423         if ctx == nil {
424                 panic("nil context")
425         }
426         c2 := c.clone()
427         c2.ctx = ctx
428         return c2
429 }
430
431 func (c *Client) clone() *Client {
432         cp := *c
433         cp.init()
434         return &cp
435 }
436
437 // Options returns read-only Options that were used to create the client.
438 func (c *Client) Options() *Options {
439         return c.opt
440 }
441
442 func (c *Client) SetLimiter(l Limiter) *Client {
443         c.limiter = l
444         return c
445 }
446
447 type PoolStats pool.Stats
448
449 // PoolStats returns connection pool stats.
450 func (c *Client) PoolStats() *PoolStats {
451         stats := c.connPool.Stats()
452         return (*PoolStats)(stats)
453 }
454
455 func (c *Client) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
456         return c.Pipeline().Pipelined(fn)
457 }
458
459 func (c *Client) Pipeline() Pipeliner {
460         pipe := Pipeline{
461                 exec: c.processPipeline,
462         }
463         pipe.statefulCmdable.setProcessor(pipe.Process)
464         return &pipe
465 }
466
467 func (c *Client) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
468         return c.TxPipeline().Pipelined(fn)
469 }
470
471 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
472 func (c *Client) TxPipeline() Pipeliner {
473         pipe := Pipeline{
474                 exec: c.processTxPipeline,
475         }
476         pipe.statefulCmdable.setProcessor(pipe.Process)
477         return &pipe
478 }
479
480 func (c *Client) pubSub() *PubSub {
481         pubsub := &PubSub{
482                 opt: c.opt,
483
484                 newConn: func(channels []string) (*pool.Conn, error) {
485                         return c.newConn()
486                 },
487                 closeConn: c.connPool.CloseConn,
488         }
489         pubsub.init()
490         return pubsub
491 }
492
493 // Subscribe subscribes the client to the specified channels.
494 // Channels can be omitted to create empty subscription.
495 // Note that this method does not wait on a response from Redis, so the
496 // subscription may not be active immediately. To force the connection to wait,
497 // you may call the Receive() method on the returned *PubSub like so:
498 //
499 //    sub := client.Subscribe(queryResp)
500 //    iface, err := sub.Receive()
501 //    if err != nil {
502 //        // handle error
503 //    }
504 //
505 //    // Should be *Subscription, but others are possible if other actions have been
506 //    // taken on sub since it was created.
507 //    switch iface.(type) {
508 //    case *Subscription:
509 //        // subscribe succeeded
510 //    case *Message:
511 //        // received first message
512 //    case *Pong:
513 //        // pong received
514 //    default:
515 //        // handle error
516 //    }
517 //
518 //    ch := sub.Channel()
519 func (c *Client) Subscribe(channels ...string) *PubSub {
520         pubsub := c.pubSub()
521         if len(channels) > 0 {
522                 _ = pubsub.Subscribe(channels...)
523         }
524         return pubsub
525 }
526
527 // PSubscribe subscribes the client to the given patterns.
528 // Patterns can be omitted to create empty subscription.
529 func (c *Client) PSubscribe(channels ...string) *PubSub {
530         pubsub := c.pubSub()
531         if len(channels) > 0 {
532                 _ = pubsub.PSubscribe(channels...)
533         }
534         return pubsub
535 }
536
537 //------------------------------------------------------------------------------
538
539 // Conn is like Client, but its pool contains single connection.
540 type Conn struct {
541         baseClient
542         statefulCmdable
543 }
544
545 func newConn(opt *Options, cn *pool.Conn) *Conn {
546         c := Conn{
547                 baseClient: baseClient{
548                         opt:      opt,
549                         connPool: pool.NewSingleConnPool(cn),
550                 },
551         }
552         c.baseClient.init()
553         c.statefulCmdable.setProcessor(c.Process)
554         return &c
555 }
556
557 func (c *Conn) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
558         return c.Pipeline().Pipelined(fn)
559 }
560
561 func (c *Conn) Pipeline() Pipeliner {
562         pipe := Pipeline{
563                 exec: c.processPipeline,
564         }
565         pipe.statefulCmdable.setProcessor(pipe.Process)
566         return &pipe
567 }
568
569 func (c *Conn) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
570         return c.TxPipeline().Pipelined(fn)
571 }
572
573 // TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
574 func (c *Conn) TxPipeline() Pipeliner {
575         pipe := Pipeline{
576                 exec: c.processTxPipeline,
577         }
578         pipe.statefulCmdable.setProcessor(pipe.Process)
579         return &pipe
580 }