Merge "Fix that required tests are skipped"
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / sentinel.go
1 package redis
2
3 import (
4         "crypto/tls"
5         "errors"
6         "net"
7         "strings"
8         "sync"
9         "time"
10
11         "github.com/go-redis/redis/internal"
12         "github.com/go-redis/redis/internal/pool"
13 )
14
15 //------------------------------------------------------------------------------
16
17 // FailoverOptions are used to configure a failover client and should
18 // be passed to NewFailoverClient.
19 type FailoverOptions struct {
20         // The master name.
21         MasterName string
22         // A seed list of host:port addresses of sentinel nodes.
23         SentinelAddrs []string
24
25         // Following options are copied from Options struct.
26
27         OnConnect func(*Conn) error
28
29         Password string
30         DB       int
31
32         MaxRetries int
33
34         DialTimeout  time.Duration
35         ReadTimeout  time.Duration
36         WriteTimeout time.Duration
37
38         PoolSize           int
39         PoolTimeout        time.Duration
40         IdleTimeout        time.Duration
41         IdleCheckFrequency time.Duration
42
43         TLSConfig *tls.Config
44 }
45
46 func (opt *FailoverOptions) options() *Options {
47         return &Options{
48                 Addr: "FailoverClient",
49
50                 OnConnect: opt.OnConnect,
51
52                 DB:       opt.DB,
53                 Password: opt.Password,
54
55                 MaxRetries: opt.MaxRetries,
56
57                 DialTimeout:  opt.DialTimeout,
58                 ReadTimeout:  opt.ReadTimeout,
59                 WriteTimeout: opt.WriteTimeout,
60
61                 PoolSize:           opt.PoolSize,
62                 PoolTimeout:        opt.PoolTimeout,
63                 IdleTimeout:        opt.IdleTimeout,
64                 IdleCheckFrequency: opt.IdleCheckFrequency,
65
66                 TLSConfig: opt.TLSConfig,
67         }
68 }
69
70 // NewFailoverClient returns a Redis client that uses Redis Sentinel
71 // for automatic failover. It's safe for concurrent use by multiple
72 // goroutines.
73 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
74         opt := failoverOpt.options()
75         opt.init()
76
77         failover := &sentinelFailover{
78                 masterName:    failoverOpt.MasterName,
79                 sentinelAddrs: failoverOpt.SentinelAddrs,
80
81                 opt: opt,
82         }
83
84         c := Client{
85                 baseClient: baseClient{
86                         opt:      opt,
87                         connPool: failover.Pool(),
88
89                         onClose: func() error {
90                                 return failover.Close()
91                         },
92                 },
93         }
94         c.baseClient.init()
95         c.setProcessor(c.Process)
96
97         return &c
98 }
99
100 //------------------------------------------------------------------------------
101
102 type SentinelClient struct {
103         baseClient
104 }
105
106 func NewSentinelClient(opt *Options) *SentinelClient {
107         opt.init()
108         c := &SentinelClient{
109                 baseClient: baseClient{
110                         opt:      opt,
111                         connPool: newConnPool(opt),
112                 },
113         }
114         c.baseClient.init()
115         return c
116 }
117
118 func (c *SentinelClient) PubSub() *PubSub {
119         pubsub := &PubSub{
120                 opt: c.opt,
121
122                 newConn: func(channels []string) (*pool.Conn, error) {
123                         return c.newConn()
124                 },
125                 closeConn: c.connPool.CloseConn,
126         }
127         pubsub.init()
128         return pubsub
129 }
130
131 func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
132         cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
133         c.Process(cmd)
134         return cmd
135 }
136
137 func (c *SentinelClient) Sentinels(name string) *SliceCmd {
138         cmd := NewSliceCmd("SENTINEL", "sentinels", name)
139         c.Process(cmd)
140         return cmd
141 }
142
143 type sentinelFailover struct {
144         sentinelAddrs []string
145
146         opt *Options
147
148         pool     *pool.ConnPool
149         poolOnce sync.Once
150
151         mu          sync.RWMutex
152         masterName  string
153         _masterAddr string
154         sentinel    *SentinelClient
155 }
156
157 func (d *sentinelFailover) Close() error {
158         return d.resetSentinel()
159 }
160
161 func (d *sentinelFailover) Pool() *pool.ConnPool {
162         d.poolOnce.Do(func() {
163                 d.opt.Dialer = d.dial
164                 d.pool = newConnPool(d.opt)
165         })
166         return d.pool
167 }
168
169 func (d *sentinelFailover) dial() (net.Conn, error) {
170         addr, err := d.MasterAddr()
171         if err != nil {
172                 return nil, err
173         }
174         return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
175 }
176
177 func (d *sentinelFailover) MasterAddr() (string, error) {
178         d.mu.Lock()
179         defer d.mu.Unlock()
180
181         addr, err := d.masterAddr()
182         if err != nil {
183                 return "", err
184         }
185         d._switchMaster(addr)
186
187         return addr, nil
188 }
189
190 func (d *sentinelFailover) masterAddr() (string, error) {
191         // Try last working sentinel.
192         if d.sentinel != nil {
193                 addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
194                 if err == nil {
195                         addr := net.JoinHostPort(addr[0], addr[1])
196                         return addr, nil
197                 }
198
199                 internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
200                         d.masterName, err)
201                 d._resetSentinel()
202         }
203
204         for i, sentinelAddr := range d.sentinelAddrs {
205                 sentinel := NewSentinelClient(&Options{
206                         Addr: sentinelAddr,
207
208                         DialTimeout:  d.opt.DialTimeout,
209                         ReadTimeout:  d.opt.ReadTimeout,
210                         WriteTimeout: d.opt.WriteTimeout,
211
212                         PoolSize:    d.opt.PoolSize,
213                         PoolTimeout: d.opt.PoolTimeout,
214                         IdleTimeout: d.opt.IdleTimeout,
215                 })
216
217                 masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
218                 if err != nil {
219                         internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
220                                 d.masterName, err)
221                         sentinel.Close()
222                         continue
223                 }
224
225                 // Push working sentinel to the top.
226                 d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
227                 d.setSentinel(sentinel)
228
229                 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
230                 return addr, nil
231         }
232
233         return "", errors.New("redis: all sentinels are unreachable")
234 }
235
236 func (c *sentinelFailover) switchMaster(addr string) {
237         c.mu.Lock()
238         c._switchMaster(addr)
239         c.mu.Unlock()
240 }
241
242 func (c *sentinelFailover) _switchMaster(addr string) {
243         if c._masterAddr == addr {
244                 return
245         }
246
247         internal.Logf("sentinel: new master=%q addr=%q",
248                 c.masterName, addr)
249         _ = c.Pool().Filter(func(cn *pool.Conn) bool {
250                 return cn.RemoteAddr().String() != addr
251         })
252         c._masterAddr = addr
253 }
254
255 func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
256         d.discoverSentinels(sentinel)
257         d.sentinel = sentinel
258         go d.listen(sentinel)
259 }
260
261 func (d *sentinelFailover) resetSentinel() error {
262         var err error
263         d.mu.Lock()
264         if d.sentinel != nil {
265                 err = d._resetSentinel()
266         }
267         d.mu.Unlock()
268         return err
269 }
270
271 func (d *sentinelFailover) _resetSentinel() error {
272         err := d.sentinel.Close()
273         d.sentinel = nil
274         return err
275 }
276
277 func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
278         sentinels, err := sentinel.Sentinels(d.masterName).Result()
279         if err != nil {
280                 internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
281                 return
282         }
283         for _, sentinel := range sentinels {
284                 vals := sentinel.([]interface{})
285                 for i := 0; i < len(vals); i += 2 {
286                         key := vals[i].(string)
287                         if key == "name" {
288                                 sentinelAddr := vals[i+1].(string)
289                                 if !contains(d.sentinelAddrs, sentinelAddr) {
290                                         internal.Logf(
291                                                 "sentinel: discovered new sentinel=%q for master=%q",
292                                                 sentinelAddr, d.masterName,
293                                         )
294                                         d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
295                                 }
296                         }
297                 }
298         }
299 }
300
301 func (d *sentinelFailover) listen(sentinel *SentinelClient) {
302         pubsub := sentinel.PubSub()
303         defer pubsub.Close()
304
305         err := pubsub.Subscribe("+switch-master")
306         if err != nil {
307                 internal.Logf("sentinel: Subscribe failed: %s", err)
308                 d.resetSentinel()
309                 return
310         }
311
312         for {
313                 msg, err := pubsub.ReceiveMessage()
314                 if err != nil {
315                         if err == pool.ErrClosed {
316                                 d.resetSentinel()
317                                 return
318                         }
319                         internal.Logf("sentinel: ReceiveMessage failed: %s", err)
320                         continue
321                 }
322
323                 switch msg.Channel {
324                 case "+switch-master":
325                         parts := strings.Split(msg.Payload, " ")
326                         if parts[0] != d.masterName {
327                                 internal.Logf("sentinel: ignore addr for master=%q", parts[0])
328                                 continue
329                         }
330                         addr := net.JoinHostPort(parts[3], parts[4])
331                         d.switchMaster(addr)
332                 }
333         }
334 }
335
336 func contains(slice []string, str string) bool {
337         for _, s := range slice {
338                 if s == str {
339                         return true
340                 }
341         }
342         return false
343 }