barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / sentinel.go
1 package redis
2
3 import (
4         "crypto/tls"
5         "errors"
6         "net"
7         "strings"
8         "sync"
9         "time"
10
11         "github.com/go-redis/redis/internal"
12         "github.com/go-redis/redis/internal/pool"
13 )
14
15 //------------------------------------------------------------------------------
16
17 // FailoverOptions are used to configure a failover client and should
18 // be passed to NewFailoverClient.
19 type FailoverOptions struct {
20         // The master name.
21         MasterName string
22         // A seed list of host:port addresses of sentinel nodes.
23         SentinelAddrs []string
24
25         // Following options are copied from Options struct.
26
27         OnConnect func(*Conn) error
28
29         Password string
30         DB       int
31
32         MaxRetries      int
33         MinRetryBackoff time.Duration
34         MaxRetryBackoff time.Duration
35
36         DialTimeout  time.Duration
37         ReadTimeout  time.Duration
38         WriteTimeout time.Duration
39
40         PoolSize           int
41         MinIdleConns       int
42         MaxConnAge         time.Duration
43         PoolTimeout        time.Duration
44         IdleTimeout        time.Duration
45         IdleCheckFrequency time.Duration
46
47         TLSConfig *tls.Config
48 }
49
50 func (opt *FailoverOptions) options() *Options {
51         return &Options{
52                 Addr: "FailoverClient",
53
54                 OnConnect: opt.OnConnect,
55
56                 DB:       opt.DB,
57                 Password: opt.Password,
58
59                 MaxRetries: opt.MaxRetries,
60
61                 DialTimeout:  opt.DialTimeout,
62                 ReadTimeout:  opt.ReadTimeout,
63                 WriteTimeout: opt.WriteTimeout,
64
65                 PoolSize:           opt.PoolSize,
66                 PoolTimeout:        opt.PoolTimeout,
67                 IdleTimeout:        opt.IdleTimeout,
68                 IdleCheckFrequency: opt.IdleCheckFrequency,
69
70                 TLSConfig: opt.TLSConfig,
71         }
72 }
73
74 // NewFailoverClient returns a Redis client that uses Redis Sentinel
75 // for automatic failover. It's safe for concurrent use by multiple
76 // goroutines.
77 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
78         opt := failoverOpt.options()
79         opt.init()
80
81         failover := &sentinelFailover{
82                 masterName:    failoverOpt.MasterName,
83                 sentinelAddrs: failoverOpt.SentinelAddrs,
84
85                 opt: opt,
86         }
87
88         c := Client{
89                 baseClient: baseClient{
90                         opt:      opt,
91                         connPool: failover.Pool(),
92
93                         onClose: func() error {
94                                 return failover.Close()
95                         },
96                 },
97         }
98         c.baseClient.init()
99         c.cmdable.setProcessor(c.Process)
100
101         return &c
102 }
103
104 //------------------------------------------------------------------------------
105
106 type SentinelClient struct {
107         baseClient
108 }
109
110 func NewSentinelClient(opt *Options) *SentinelClient {
111         opt.init()
112         c := &SentinelClient{
113                 baseClient: baseClient{
114                         opt:      opt,
115                         connPool: newConnPool(opt),
116                 },
117         }
118         c.baseClient.init()
119         return c
120 }
121
122 func (c *SentinelClient) pubSub() *PubSub {
123         pubsub := &PubSub{
124                 opt: c.opt,
125
126                 newConn: func(channels []string) (*pool.Conn, error) {
127                         return c.newConn()
128                 },
129                 closeConn: c.connPool.CloseConn,
130         }
131         pubsub.init()
132         return pubsub
133 }
134
135 // Subscribe subscribes the client to the specified channels.
136 // Channels can be omitted to create empty subscription.
137 func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
138         pubsub := c.pubSub()
139         if len(channels) > 0 {
140                 _ = pubsub.Subscribe(channels...)
141         }
142         return pubsub
143 }
144
145 // PSubscribe subscribes the client to the given patterns.
146 // Patterns can be omitted to create empty subscription.
147 func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
148         pubsub := c.pubSub()
149         if len(channels) > 0 {
150                 _ = pubsub.PSubscribe(channels...)
151         }
152         return pubsub
153 }
154
155 func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
156         cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
157         c.Process(cmd)
158         return cmd
159 }
160
161 func (c *SentinelClient) Sentinels(name string) *SliceCmd {
162         cmd := NewSliceCmd("sentinel", "sentinels", name)
163         c.Process(cmd)
164         return cmd
165 }
166
167 // Failover forces a failover as if the master was not reachable, and without
168 // asking for agreement to other Sentinels.
169 func (c *SentinelClient) Failover(name string) *StatusCmd {
170         cmd := NewStatusCmd("sentinel", "failover", name)
171         c.Process(cmd)
172         return cmd
173 }
174
175 // Reset resets all the masters with matching name. The pattern argument is a
176 // glob-style pattern. The reset process clears any previous state in a master
177 // (including a failover in progress), and removes every slave and sentinel
178 // already discovered and associated with the master.
179 func (c *SentinelClient) Reset(pattern string) *IntCmd {
180         cmd := NewIntCmd("sentinel", "reset", pattern)
181         c.Process(cmd)
182         return cmd
183 }
184
185 type sentinelFailover struct {
186         sentinelAddrs []string
187
188         opt *Options
189
190         pool     *pool.ConnPool
191         poolOnce sync.Once
192
193         mu          sync.RWMutex
194         masterName  string
195         _masterAddr string
196         sentinel    *SentinelClient
197         pubsub      *PubSub
198 }
199
200 func (c *sentinelFailover) Close() error {
201         c.mu.Lock()
202         defer c.mu.Unlock()
203         if c.sentinel != nil {
204                 return c.closeSentinel()
205         }
206         return nil
207 }
208
209 func (c *sentinelFailover) Pool() *pool.ConnPool {
210         c.poolOnce.Do(func() {
211                 c.opt.Dialer = c.dial
212                 c.pool = newConnPool(c.opt)
213         })
214         return c.pool
215 }
216
217 func (c *sentinelFailover) dial() (net.Conn, error) {
218         addr, err := c.MasterAddr()
219         if err != nil {
220                 return nil, err
221         }
222         return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
223 }
224
225 func (c *sentinelFailover) MasterAddr() (string, error) {
226         addr, err := c.masterAddr()
227         if err != nil {
228                 return "", err
229         }
230         c.switchMaster(addr)
231         return addr, nil
232 }
233
234 func (c *sentinelFailover) masterAddr() (string, error) {
235         addr := c.getMasterAddr()
236         if addr != "" {
237                 return addr, nil
238         }
239
240         c.mu.Lock()
241         defer c.mu.Unlock()
242
243         for i, sentinelAddr := range c.sentinelAddrs {
244                 sentinel := NewSentinelClient(&Options{
245                         Addr: sentinelAddr,
246
247                         MaxRetries: c.opt.MaxRetries,
248
249                         DialTimeout:  c.opt.DialTimeout,
250                         ReadTimeout:  c.opt.ReadTimeout,
251                         WriteTimeout: c.opt.WriteTimeout,
252
253                         PoolSize:           c.opt.PoolSize,
254                         PoolTimeout:        c.opt.PoolTimeout,
255                         IdleTimeout:        c.opt.IdleTimeout,
256                         IdleCheckFrequency: c.opt.IdleCheckFrequency,
257
258                         TLSConfig: c.opt.TLSConfig,
259                 })
260
261                 masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
262                 if err != nil {
263                         internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
264                                 c.masterName, err)
265                         _ = sentinel.Close()
266                         continue
267                 }
268
269                 // Push working sentinel to the top.
270                 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
271                 c.setSentinel(sentinel)
272
273                 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
274                 return addr, nil
275         }
276
277         return "", errors.New("redis: all sentinels are unreachable")
278 }
279
280 func (c *sentinelFailover) getMasterAddr() string {
281         c.mu.RLock()
282         sentinel := c.sentinel
283         c.mu.RUnlock()
284
285         if sentinel == nil {
286                 return ""
287         }
288
289         addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
290         if err != nil {
291                 internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
292                         c.masterName, err)
293                 c.mu.Lock()
294                 if c.sentinel == sentinel {
295                         c.closeSentinel()
296                 }
297                 c.mu.Unlock()
298                 return ""
299         }
300
301         return net.JoinHostPort(addr[0], addr[1])
302 }
303
304 func (c *sentinelFailover) switchMaster(addr string) {
305         c.mu.RLock()
306         masterAddr := c._masterAddr
307         c.mu.RUnlock()
308         if masterAddr == addr {
309                 return
310         }
311
312         c.mu.Lock()
313         defer c.mu.Unlock()
314
315         internal.Logf("sentinel: new master=%q addr=%q",
316                 c.masterName, addr)
317         _ = c.Pool().Filter(func(cn *pool.Conn) bool {
318                 return cn.RemoteAddr().String() != addr
319         })
320         c._masterAddr = addr
321 }
322
323 func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
324         c.discoverSentinels(sentinel)
325         c.sentinel = sentinel
326
327         c.pubsub = sentinel.Subscribe("+switch-master")
328         go c.listen(c.pubsub)
329 }
330
331 func (c *sentinelFailover) closeSentinel() error {
332         var firstErr error
333
334         err := c.pubsub.Close()
335         if err != nil && firstErr == err {
336                 firstErr = err
337         }
338         c.pubsub = nil
339
340         err = c.sentinel.Close()
341         if err != nil && firstErr == err {
342                 firstErr = err
343         }
344         c.sentinel = nil
345
346         return firstErr
347 }
348
349 func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
350         sentinels, err := sentinel.Sentinels(c.masterName).Result()
351         if err != nil {
352                 internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
353                 return
354         }
355         for _, sentinel := range sentinels {
356                 vals := sentinel.([]interface{})
357                 for i := 0; i < len(vals); i += 2 {
358                         key := vals[i].(string)
359                         if key == "name" {
360                                 sentinelAddr := vals[i+1].(string)
361                                 if !contains(c.sentinelAddrs, sentinelAddr) {
362                                         internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
363                                                 sentinelAddr, c.masterName)
364                                         c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
365                                 }
366                         }
367                 }
368         }
369 }
370
371 func (c *sentinelFailover) listen(pubsub *PubSub) {
372         ch := pubsub.Channel()
373         for {
374                 msg, ok := <-ch
375                 if !ok {
376                         break
377                 }
378
379                 switch msg.Channel {
380                 case "+switch-master":
381                         parts := strings.Split(msg.Payload, " ")
382                         if parts[0] != c.masterName {
383                                 internal.Logf("sentinel: ignore addr for master=%q", parts[0])
384                                 continue
385                         }
386                         addr := net.JoinHostPort(parts[3], parts[4])
387                         c.switchMaster(addr)
388                 }
389         }
390 }
391
392 func contains(slice []string, str string) bool {
393         for _, s := range slice {
394                 if s == str {
395                         return true
396                 }
397         }
398         return false
399 }