barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / pubsub.go
1 package redis
2
3 import (
4         "errors"
5         "fmt"
6         "sync"
7         "time"
8
9         "github.com/go-redis/redis/internal"
10         "github.com/go-redis/redis/internal/pool"
11         "github.com/go-redis/redis/internal/proto"
12 )
13
14 var errPingTimeout = errors.New("redis: ping timeout")
15
16 // PubSub implements Pub/Sub commands bas described in
17 // http://redis.io/topics/pubsub. Message receiving is NOT safe
18 // for concurrent use by multiple goroutines.
19 //
20 // PubSub automatically reconnects to Redis Server and resubscribes
21 // to the channels in case of network errors.
22 type PubSub struct {
23         opt *Options
24
25         newConn   func([]string) (*pool.Conn, error)
26         closeConn func(*pool.Conn) error
27
28         mu       sync.Mutex
29         cn       *pool.Conn
30         channels map[string]struct{}
31         patterns map[string]struct{}
32         closed   bool
33         exit     chan struct{}
34
35         cmd *Cmd
36
37         chOnce sync.Once
38         ch     chan *Message
39         ping   chan struct{}
40 }
41
42 func (c *PubSub) init() {
43         c.exit = make(chan struct{})
44 }
45
46 func (c *PubSub) conn() (*pool.Conn, error) {
47         c.mu.Lock()
48         cn, err := c._conn(nil)
49         c.mu.Unlock()
50         return cn, err
51 }
52
53 func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
54         if c.closed {
55                 return nil, pool.ErrClosed
56         }
57         if c.cn != nil {
58                 return c.cn, nil
59         }
60
61         channels := mapKeys(c.channels)
62         channels = append(channels, newChannels...)
63
64         cn, err := c.newConn(channels)
65         if err != nil {
66                 return nil, err
67         }
68
69         if err := c.resubscribe(cn); err != nil {
70                 _ = c.closeConn(cn)
71                 return nil, err
72         }
73
74         c.cn = cn
75         return cn, nil
76 }
77
78 func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
79         return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
80                 return writeCmd(wr, cmd)
81         })
82 }
83
84 func (c *PubSub) resubscribe(cn *pool.Conn) error {
85         var firstErr error
86
87         if len(c.channels) > 0 {
88                 err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
89                 if err != nil && firstErr == nil {
90                         firstErr = err
91                 }
92         }
93
94         if len(c.patterns) > 0 {
95                 err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
96                 if err != nil && firstErr == nil {
97                         firstErr = err
98                 }
99         }
100
101         return firstErr
102 }
103
104 func mapKeys(m map[string]struct{}) []string {
105         s := make([]string, len(m))
106         i := 0
107         for k := range m {
108                 s[i] = k
109                 i++
110         }
111         return s
112 }
113
114 func (c *PubSub) _subscribe(
115         cn *pool.Conn, redisCmd string, channels []string,
116 ) error {
117         args := make([]interface{}, 0, 1+len(channels))
118         args = append(args, redisCmd)
119         for _, channel := range channels {
120                 args = append(args, channel)
121         }
122         cmd := NewSliceCmd(args...)
123         return c.writeCmd(cn, cmd)
124 }
125
126 func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
127         c.mu.Lock()
128         c._releaseConn(cn, err, allowTimeout)
129         c.mu.Unlock()
130 }
131
132 func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
133         if c.cn != cn {
134                 return
135         }
136         if internal.IsBadConn(err, allowTimeout) {
137                 c._reconnect(err)
138         }
139 }
140
141 func (c *PubSub) _reconnect(reason error) {
142         _ = c._closeTheCn(reason)
143         _, _ = c._conn(nil)
144 }
145
146 func (c *PubSub) _closeTheCn(reason error) error {
147         if c.cn == nil {
148                 return nil
149         }
150         if !c.closed {
151                 internal.Logf("redis: discarding bad PubSub connection: %s", reason)
152         }
153         err := c.closeConn(c.cn)
154         c.cn = nil
155         return err
156 }
157
158 func (c *PubSub) Close() error {
159         c.mu.Lock()
160         defer c.mu.Unlock()
161
162         if c.closed {
163                 return pool.ErrClosed
164         }
165         c.closed = true
166         close(c.exit)
167
168         err := c._closeTheCn(pool.ErrClosed)
169         return err
170 }
171
172 // Subscribe the client to the specified channels. It returns
173 // empty subscription if there are no channels.
174 func (c *PubSub) Subscribe(channels ...string) error {
175         c.mu.Lock()
176         defer c.mu.Unlock()
177
178         err := c.subscribe("subscribe", channels...)
179         if c.channels == nil {
180                 c.channels = make(map[string]struct{})
181         }
182         for _, s := range channels {
183                 c.channels[s] = struct{}{}
184         }
185         return err
186 }
187
188 // PSubscribe the client to the given patterns. It returns
189 // empty subscription if there are no patterns.
190 func (c *PubSub) PSubscribe(patterns ...string) error {
191         c.mu.Lock()
192         defer c.mu.Unlock()
193
194         err := c.subscribe("psubscribe", patterns...)
195         if c.patterns == nil {
196                 c.patterns = make(map[string]struct{})
197         }
198         for _, s := range patterns {
199                 c.patterns[s] = struct{}{}
200         }
201         return err
202 }
203
204 // Unsubscribe the client from the given channels, or from all of
205 // them if none is given.
206 func (c *PubSub) Unsubscribe(channels ...string) error {
207         c.mu.Lock()
208         defer c.mu.Unlock()
209
210         for _, channel := range channels {
211                 delete(c.channels, channel)
212         }
213         err := c.subscribe("unsubscribe", channels...)
214         return err
215 }
216
217 // PUnsubscribe the client from the given patterns, or from all of
218 // them if none is given.
219 func (c *PubSub) PUnsubscribe(patterns ...string) error {
220         c.mu.Lock()
221         defer c.mu.Unlock()
222
223         for _, pattern := range patterns {
224                 delete(c.patterns, pattern)
225         }
226         err := c.subscribe("punsubscribe", patterns...)
227         return err
228 }
229
230 func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
231         cn, err := c._conn(channels)
232         if err != nil {
233                 return err
234         }
235
236         err = c._subscribe(cn, redisCmd, channels)
237         c._releaseConn(cn, err, false)
238         return err
239 }
240
241 func (c *PubSub) Ping(payload ...string) error {
242         args := []interface{}{"ping"}
243         if len(payload) == 1 {
244                 args = append(args, payload[0])
245         }
246         cmd := NewCmd(args...)
247
248         cn, err := c.conn()
249         if err != nil {
250                 return err
251         }
252
253         err = c.writeCmd(cn, cmd)
254         c.releaseConn(cn, err, false)
255         return err
256 }
257
258 // Subscription received after a successful subscription to channel.
259 type Subscription struct {
260         // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
261         Kind string
262         // Channel name we have subscribed to.
263         Channel string
264         // Number of channels we are currently subscribed to.
265         Count int
266 }
267
268 func (m *Subscription) String() string {
269         return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
270 }
271
272 // Message received as result of a PUBLISH command issued by another client.
273 type Message struct {
274         Channel string
275         Pattern string
276         Payload string
277 }
278
279 func (m *Message) String() string {
280         return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
281 }
282
283 // Pong received as result of a PING command issued by another client.
284 type Pong struct {
285         Payload string
286 }
287
288 func (p *Pong) String() string {
289         if p.Payload != "" {
290                 return fmt.Sprintf("Pong<%s>", p.Payload)
291         }
292         return "Pong"
293 }
294
295 func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
296         switch reply := reply.(type) {
297         case string:
298                 return &Pong{
299                         Payload: reply,
300                 }, nil
301         case []interface{}:
302                 switch kind := reply[0].(string); kind {
303                 case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
304                         return &Subscription{
305                                 Kind:    kind,
306                                 Channel: reply[1].(string),
307                                 Count:   int(reply[2].(int64)),
308                         }, nil
309                 case "message":
310                         return &Message{
311                                 Channel: reply[1].(string),
312                                 Payload: reply[2].(string),
313                         }, nil
314                 case "pmessage":
315                         return &Message{
316                                 Pattern: reply[1].(string),
317                                 Channel: reply[2].(string),
318                                 Payload: reply[3].(string),
319                         }, nil
320                 case "pong":
321                         return &Pong{
322                                 Payload: reply[1].(string),
323                         }, nil
324                 default:
325                         return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
326                 }
327         default:
328                 return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
329         }
330 }
331
332 // ReceiveTimeout acts like Receive but returns an error if message
333 // is not received in time. This is low-level API and in most cases
334 // Channel should be used instead.
335 func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
336         if c.cmd == nil {
337                 c.cmd = NewCmd()
338         }
339
340         cn, err := c.conn()
341         if err != nil {
342                 return nil, err
343         }
344
345         err = cn.WithReader(timeout, func(rd *proto.Reader) error {
346                 return c.cmd.readReply(rd)
347         })
348
349         c.releaseConn(cn, err, timeout > 0)
350         if err != nil {
351                 return nil, err
352         }
353
354         return c.newMessage(c.cmd.Val())
355 }
356
357 // Receive returns a message as a Subscription, Message, Pong or error.
358 // See PubSub example for details. This is low-level API and in most cases
359 // Channel should be used instead.
360 func (c *PubSub) Receive() (interface{}, error) {
361         return c.ReceiveTimeout(0)
362 }
363
364 // ReceiveMessage returns a Message or error ignoring Subscription and Pong
365 // messages. This is low-level API and in most cases Channel should be used
366 // instead.
367 func (c *PubSub) ReceiveMessage() (*Message, error) {
368         for {
369                 msg, err := c.Receive()
370                 if err != nil {
371                         return nil, err
372                 }
373
374                 switch msg := msg.(type) {
375                 case *Subscription:
376                         // Ignore.
377                 case *Pong:
378                         // Ignore.
379                 case *Message:
380                         return msg, nil
381                 default:
382                         err := fmt.Errorf("redis: unknown message: %T", msg)
383                         return nil, err
384                 }
385         }
386 }
387
388 // Channel returns a Go channel for concurrently receiving messages.
389 // It periodically sends Ping messages to test connection health.
390 // The channel is closed with PubSub. Receive* APIs can not be used
391 // after channel is created.
392 func (c *PubSub) Channel() <-chan *Message {
393         c.chOnce.Do(c.initChannel)
394         return c.ch
395 }
396
397 func (c *PubSub) initChannel() {
398         c.ch = make(chan *Message, 100)
399         c.ping = make(chan struct{}, 10)
400
401         go func() {
402                 var errCount int
403                 for {
404                         msg, err := c.Receive()
405                         if err != nil {
406                                 if err == pool.ErrClosed {
407                                         close(c.ch)
408                                         return
409                                 }
410                                 if errCount > 0 {
411                                         time.Sleep(c.retryBackoff(errCount))
412                                 }
413                                 errCount++
414                                 continue
415                         }
416                         errCount = 0
417
418                         // Any message is as good as a ping.
419                         select {
420                         case c.ping <- struct{}{}:
421                         default:
422                         }
423
424                         switch msg := msg.(type) {
425                         case *Subscription:
426                                 // Ignore.
427                         case *Pong:
428                                 // Ignore.
429                         case *Message:
430                                 c.ch <- msg
431                         default:
432                                 internal.Logf("redis: unknown message: %T", msg)
433                         }
434                 }
435         }()
436
437         go func() {
438                 const timeout = 5 * time.Second
439
440                 timer := time.NewTimer(timeout)
441                 timer.Stop()
442
443                 healthy := true
444                 for {
445                         timer.Reset(timeout)
446                         select {
447                         case <-c.ping:
448                                 healthy = true
449                                 if !timer.Stop() {
450                                         <-timer.C
451                                 }
452                         case <-timer.C:
453                                 pingErr := c.Ping()
454                                 if healthy {
455                                         healthy = false
456                                 } else {
457                                         if pingErr == nil {
458                                                 pingErr = errPingTimeout
459                                         }
460                                         c.mu.Lock()
461                                         c._reconnect(pingErr)
462                                         c.mu.Unlock()
463                                 }
464                         case <-c.exit:
465                                 return
466                         }
467                 }
468         }()
469 }
470
471 func (c *PubSub) retryBackoff(attempt int) time.Duration {
472         return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
473 }