barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / pipeline.go
1 package redis
2
3 import (
4         "sync"
5
6         "github.com/go-redis/redis/internal/pool"
7 )
8
9 type pipelineExecer func([]Cmder) error
10
11 type Pipeliner interface {
12         StatefulCmdable
13         Do(args ...interface{}) *Cmd
14         Process(cmd Cmder) error
15         Close() error
16         Discard() error
17         Exec() ([]Cmder, error)
18 }
19
20 var _ Pipeliner = (*Pipeline)(nil)
21
22 // Pipeline implements pipelining as described in
23 // http://redis.io/topics/pipelining. It's safe for concurrent use
24 // by multiple goroutines.
25 type Pipeline struct {
26         statefulCmdable
27
28         exec pipelineExecer
29
30         mu     sync.Mutex
31         cmds   []Cmder
32         closed bool
33 }
34
35 func (c *Pipeline) Do(args ...interface{}) *Cmd {
36         cmd := NewCmd(args...)
37         _ = c.Process(cmd)
38         return cmd
39 }
40
41 // Process queues the cmd for later execution.
42 func (c *Pipeline) Process(cmd Cmder) error {
43         c.mu.Lock()
44         c.cmds = append(c.cmds, cmd)
45         c.mu.Unlock()
46         return nil
47 }
48
49 // Close closes the pipeline, releasing any open resources.
50 func (c *Pipeline) Close() error {
51         c.mu.Lock()
52         c.discard()
53         c.closed = true
54         c.mu.Unlock()
55         return nil
56 }
57
58 // Discard resets the pipeline and discards queued commands.
59 func (c *Pipeline) Discard() error {
60         c.mu.Lock()
61         err := c.discard()
62         c.mu.Unlock()
63         return err
64 }
65
66 func (c *Pipeline) discard() error {
67         if c.closed {
68                 return pool.ErrClosed
69         }
70         c.cmds = c.cmds[:0]
71         return nil
72 }
73
74 // Exec executes all previously queued commands using one
75 // client-server roundtrip.
76 //
77 // Exec always returns list of commands and error of the first failed
78 // command if any.
79 func (c *Pipeline) Exec() ([]Cmder, error) {
80         c.mu.Lock()
81         defer c.mu.Unlock()
82
83         if c.closed {
84                 return nil, pool.ErrClosed
85         }
86
87         if len(c.cmds) == 0 {
88                 return nil, nil
89         }
90
91         cmds := c.cmds
92         c.cmds = nil
93
94         return cmds, c.exec(cmds)
95 }
96
97 func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
98         if err := fn(c); err != nil {
99                 return nil, err
100         }
101         cmds, err := c.Exec()
102         _ = c.Close()
103         return cmds, err
104 }
105
106 func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
107         return c.pipelined(fn)
108 }
109
110 func (c *Pipeline) Pipeline() Pipeliner {
111         return c
112 }
113
114 func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
115         return c.pipelined(fn)
116 }
117
118 func (c *Pipeline) TxPipeline() Pipeliner {
119         return c
120 }