src: Add DMA localagent
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / pubsub.go
1 package redis
2
3 import (
4         "fmt"
5         "sync"
6         "time"
7
8         "github.com/go-redis/redis/internal"
9         "github.com/go-redis/redis/internal/pool"
10 )
11
12 // PubSub implements Pub/Sub commands as described in
13 // http://redis.io/topics/pubsub. Message receiving is NOT safe
14 // for concurrent use by multiple goroutines.
15 //
16 // PubSub automatically reconnects to Redis Server and resubscribes
17 // to the channels in case of network errors.
18 type PubSub struct {
19         opt *Options
20
21         newConn   func([]string) (*pool.Conn, error)
22         closeConn func(*pool.Conn) error
23
24         mu       sync.Mutex
25         cn       *pool.Conn
26         channels map[string]struct{}
27         patterns map[string]struct{}
28         closed   bool
29         exit     chan struct{}
30
31         cmd *Cmd
32
33         chOnce sync.Once
34         ch     chan *Message
35         ping   chan struct{}
36 }
37
38 func (c *PubSub) init() {
39         c.exit = make(chan struct{})
40 }
41
42 func (c *PubSub) conn() (*pool.Conn, error) {
43         c.mu.Lock()
44         cn, err := c._conn(nil)
45         c.mu.Unlock()
46         return cn, err
47 }
48
49 func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
50         if c.closed {
51                 return nil, pool.ErrClosed
52         }
53
54         if c.cn != nil {
55                 return c.cn, nil
56         }
57
58         cn, err := c.newConn(channels)
59         if err != nil {
60                 return nil, err
61         }
62
63         if err := c.resubscribe(cn); err != nil {
64                 _ = c.closeConn(cn)
65                 return nil, err
66         }
67
68         c.cn = cn
69         return cn, nil
70 }
71
72 func (c *PubSub) resubscribe(cn *pool.Conn) error {
73         var firstErr error
74
75         if len(c.channels) > 0 {
76                 channels := mapKeys(c.channels)
77                 err := c._subscribe(cn, "subscribe", channels...)
78                 if err != nil && firstErr == nil {
79                         firstErr = err
80                 }
81         }
82
83         if len(c.patterns) > 0 {
84                 patterns := mapKeys(c.patterns)
85                 err := c._subscribe(cn, "psubscribe", patterns...)
86                 if err != nil && firstErr == nil {
87                         firstErr = err
88                 }
89         }
90
91         return firstErr
92 }
93
94 func mapKeys(m map[string]struct{}) []string {
95         s := make([]string, len(m))
96         i := 0
97         for k := range m {
98                 s[i] = k
99                 i++
100         }
101         return s
102 }
103
104 func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
105         args := make([]interface{}, 1+len(channels))
106         args[0] = redisCmd
107         for i, channel := range channels {
108                 args[1+i] = channel
109         }
110         cmd := NewSliceCmd(args...)
111
112         cn.SetWriteTimeout(c.opt.WriteTimeout)
113         return writeCmd(cn, cmd)
114 }
115
116 func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
117         c.mu.Lock()
118         c._releaseConn(cn, err)
119         c.mu.Unlock()
120 }
121
122 func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
123         if c.cn != cn {
124                 return
125         }
126         if internal.IsBadConn(err, true) {
127                 c._reconnect()
128         }
129 }
130
131 func (c *PubSub) _closeTheCn() error {
132         var err error
133         if c.cn != nil {
134                 err = c.closeConn(c.cn)
135                 c.cn = nil
136         }
137         return err
138 }
139
140 func (c *PubSub) reconnect() {
141         c.mu.Lock()
142         c._reconnect()
143         c.mu.Unlock()
144 }
145
146 func (c *PubSub) _reconnect() {
147         _ = c._closeTheCn()
148         _, _ = c._conn(nil)
149 }
150
151 func (c *PubSub) Close() error {
152         c.mu.Lock()
153         defer c.mu.Unlock()
154
155         if c.closed {
156                 return pool.ErrClosed
157         }
158         c.closed = true
159         close(c.exit)
160
161         err := c._closeTheCn()
162         return err
163 }
164
165 // Subscribe the client to the specified channels. It returns
166 // empty subscription if there are no channels.
167 func (c *PubSub) Subscribe(channels ...string) error {
168         c.mu.Lock()
169         defer c.mu.Unlock()
170
171         err := c.subscribe("subscribe", channels...)
172         if c.channels == nil {
173                 c.channels = make(map[string]struct{})
174         }
175         for _, channel := range channels {
176                 c.channels[channel] = struct{}{}
177         }
178         return err
179 }
180
181 // PSubscribe the client to the given patterns. It returns
182 // empty subscription if there are no patterns.
183 func (c *PubSub) PSubscribe(patterns ...string) error {
184         c.mu.Lock()
185         defer c.mu.Unlock()
186
187         err := c.subscribe("psubscribe", patterns...)
188         if c.patterns == nil {
189                 c.patterns = make(map[string]struct{})
190         }
191         for _, pattern := range patterns {
192                 c.patterns[pattern] = struct{}{}
193         }
194         return err
195 }
196
197 // Unsubscribe the client from the given channels, or from all of
198 // them if none is given.
199 func (c *PubSub) Unsubscribe(channels ...string) error {
200         c.mu.Lock()
201         defer c.mu.Unlock()
202
203         err := c.subscribe("unsubscribe", channels...)
204         for _, channel := range channels {
205                 delete(c.channels, channel)
206         }
207         return err
208 }
209
210 // PUnsubscribe the client from the given patterns, or from all of
211 // them if none is given.
212 func (c *PubSub) PUnsubscribe(patterns ...string) error {
213         c.mu.Lock()
214         defer c.mu.Unlock()
215
216         err := c.subscribe("punsubscribe", patterns...)
217         for _, pattern := range patterns {
218                 delete(c.patterns, pattern)
219         }
220         return err
221 }
222
223 func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
224         cn, err := c._conn(channels)
225         if err != nil {
226                 return err
227         }
228
229         err = c._subscribe(cn, redisCmd, channels...)
230         c._releaseConn(cn, err)
231         return err
232 }
233
234 func (c *PubSub) Ping(payload ...string) error {
235         args := []interface{}{"ping"}
236         if len(payload) == 1 {
237                 args = append(args, payload[0])
238         }
239         cmd := NewCmd(args...)
240
241         cn, err := c.conn()
242         if err != nil {
243                 return err
244         }
245
246         cn.SetWriteTimeout(c.opt.WriteTimeout)
247         err = writeCmd(cn, cmd)
248         c.releaseConn(cn, err)
249         return err
250 }
251
252 // Subscription received after a successful subscription to channel.
253 type Subscription struct {
254         // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
255         Kind string
256         // Channel name we have subscribed to.
257         Channel string
258         // Number of channels we are currently subscribed to.
259         Count int
260 }
261
262 func (m *Subscription) String() string {
263         return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
264 }
265
266 // Message received as result of a PUBLISH command issued by another client.
267 type Message struct {
268         Channel string
269         Pattern string
270         Payload string
271 }
272
273 func (m *Message) String() string {
274         return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
275 }
276
277 // Pong received as result of a PING command issued by another client.
278 type Pong struct {
279         Payload string
280 }
281
282 func (p *Pong) String() string {
283         if p.Payload != "" {
284                 return fmt.Sprintf("Pong<%s>", p.Payload)
285         }
286         return "Pong"
287 }
288
289 func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
290         switch reply := reply.(type) {
291         case string:
292                 return &Pong{
293                         Payload: reply,
294                 }, nil
295         case []interface{}:
296                 switch kind := reply[0].(string); kind {
297                 case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
298                         return &Subscription{
299                                 Kind:    kind,
300                                 Channel: reply[1].(string),
301                                 Count:   int(reply[2].(int64)),
302                         }, nil
303                 case "message":
304                         return &Message{
305                                 Channel: reply[1].(string),
306                                 Payload: reply[2].(string),
307                         }, nil
308                 case "pmessage":
309                         return &Message{
310                                 Pattern: reply[1].(string),
311                                 Channel: reply[2].(string),
312                                 Payload: reply[3].(string),
313                         }, nil
314                 case "pong":
315                         return &Pong{
316                                 Payload: reply[1].(string),
317                         }, nil
318                 default:
319                         return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
320                 }
321         default:
322                 return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
323         }
324 }
325
326 // ReceiveTimeout acts like Receive but returns an error if message
327 // is not received in time. This is low-level API and in most cases
328 // Channel should be used instead.
329 func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
330         if c.cmd == nil {
331                 c.cmd = NewCmd()
332         }
333
334         cn, err := c.conn()
335         if err != nil {
336                 return nil, err
337         }
338
339         cn.SetReadTimeout(timeout)
340         err = c.cmd.readReply(cn)
341         c.releaseConn(cn, err)
342         if err != nil {
343                 return nil, err
344         }
345
346         return c.newMessage(c.cmd.Val())
347 }
348
349 // Receive returns a message as a Subscription, Message, Pong or error.
350 // See PubSub example for details. This is low-level API and in most cases
351 // Channel should be used instead.
352 func (c *PubSub) Receive() (interface{}, error) {
353         return c.ReceiveTimeout(0)
354 }
355
356 // ReceiveMessage returns a Message or error ignoring Subscription and Pong
357 // messages. This is low-level API and in most cases Channel should be used
358 // instead.
359 func (c *PubSub) ReceiveMessage() (*Message, error) {
360         for {
361                 msg, err := c.Receive()
362                 if err != nil {
363                         return nil, err
364                 }
365
366                 switch msg := msg.(type) {
367                 case *Subscription:
368                         // Ignore.
369                 case *Pong:
370                         // Ignore.
371                 case *Message:
372                         return msg, nil
373                 default:
374                         err := fmt.Errorf("redis: unknown message: %T", msg)
375                         return nil, err
376                 }
377         }
378 }
379
380 // Channel returns a Go channel for concurrently receiving messages.
381 // It periodically sends Ping messages to test connection health.
382 // The channel is closed with PubSub. Receive* APIs can not be used
383 // after channel is created.
384 func (c *PubSub) Channel() <-chan *Message {
385         c.chOnce.Do(c.initChannel)
386         return c.ch
387 }
388
389 func (c *PubSub) initChannel() {
390         c.ch = make(chan *Message, 100)
391         c.ping = make(chan struct{}, 10)
392
393         go func() {
394                 var errCount int
395                 for {
396                         msg, err := c.Receive()
397                         if err != nil {
398                                 if err == pool.ErrClosed {
399                                         close(c.ch)
400                                         return
401                                 }
402                                 if errCount > 0 {
403                                         time.Sleep(c.retryBackoff(errCount))
404                                 }
405                                 errCount++
406                                 continue
407                         }
408                         errCount = 0
409
410                         // Any message is as good as a ping.
411                         select {
412                         case c.ping <- struct{}{}:
413                         default:
414                         }
415
416                         switch msg := msg.(type) {
417                         case *Subscription:
418                                 // Ignore.
419                         case *Pong:
420                                 // Ignore.
421                         case *Message:
422                                 c.ch <- msg
423                         default:
424                                 internal.Logf("redis: unknown message: %T", msg)
425                         }
426                 }
427         }()
428
429         go func() {
430                 const timeout = 5 * time.Second
431
432                 timer := time.NewTimer(timeout)
433                 timer.Stop()
434
435                 var hasPing bool
436                 for {
437                         timer.Reset(timeout)
438                         select {
439                         case <-c.ping:
440                                 hasPing = true
441                                 if !timer.Stop() {
442                                         <-timer.C
443                                 }
444                         case <-timer.C:
445                                 if hasPing {
446                                         hasPing = false
447                                         _ = c.Ping()
448                                 } else {
449                                         c.reconnect()
450                                 }
451                         case <-c.exit:
452                                 return
453                         }
454                 }
455         }()
456 }
457
458 func (c *PubSub) retryBackoff(attempt int) time.Duration {
459         return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
460 }