6 "github.com/go-redis/redis/internal/pool"
// pipelineExecer is a function type that receives a slice of commands
// (Cmder values) and returns an error.
// NOTE(review): presumably this is the hook Pipeline invokes from Exec to
// run the queued commands — confirm against the Pipeline struct fields.
9 type pipelineExecer func([]Cmder) error
11 type Pipeliner interface {
13 Do(args ...interface{}) *Cmd
14 Process(cmd Cmder) error
17 Exec() ([]Cmder, error)
// Compile-time assertion that *Pipeline satisfies the Pipeliner interface;
// the blank identifier means no variable is actually kept.
20 var _ Pipeliner = (*Pipeline)(nil)
22 // Pipeline implements pipelining as described in
23 // http://redis.io/topics/pipelining. It's safe for concurrent use
24 // by multiple goroutines.
25 type Pipeline struct {
35 func (c *Pipeline) Do(args ...interface{}) *Cmd {
36 cmd := NewCmd(args...)
41 // Process queues the cmd for later execution.
42 func (c *Pipeline) Process(cmd Cmder) error {
44 c.cmds = append(c.cmds, cmd)
49 // Close closes the pipeline, releasing any open resources.
50 func (c *Pipeline) Close() error {
58 // Discard resets the pipeline and discards queued commands.
59 func (c *Pipeline) Discard() error {
66 func (c *Pipeline) discard() error {
74 // Exec executes all previously queued commands using one
75 // client-server roundtrip.
77 // Exec always returns the list of commands and the error of the first failed command, if any.
79 func (c *Pipeline) Exec() ([]Cmder, error) {
84 return nil, pool.ErrClosed
94 return cmds, c.exec(cmds)
97 func (c *Pipeline) pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
98 if err := fn(c); err != nil {
101 cmds, err := c.Exec()
106 func (c *Pipeline) Pipelined(fn func(Pipeliner) error) ([]Cmder, error) {
107 return c.pipelined(fn)
110 func (c *Pipeline) Pipeline() Pipeliner {
114 func (c *Pipeline) TxPipelined(fn func(Pipeliner) error) ([]Cmder, error) {
115 return c.pipelined(fn)
118 func (c *Pipeline) TxPipeline() Pipeliner {