barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / sentinel.go
index 12c29a7..7cbb90b 100644 (file)
@@ -29,13 +29,17 @@ type FailoverOptions struct {
        Password string
        DB       int
 
-       MaxRetries int
+       MaxRetries      int
+       MinRetryBackoff time.Duration
+       MaxRetryBackoff time.Duration
 
        DialTimeout  time.Duration
        ReadTimeout  time.Duration
        WriteTimeout time.Duration
 
        PoolSize           int
+       MinIdleConns       int
+       MaxConnAge         time.Duration
        PoolTimeout        time.Duration
        IdleTimeout        time.Duration
        IdleCheckFrequency time.Duration
@@ -92,7 +96,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
                },
        }
        c.baseClient.init()
-       c.setProcessor(c.Process)
+       c.cmdable.setProcessor(c.Process)
 
        return &c
 }
@@ -115,7 +119,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
        return c
 }
 
-func (c *SentinelClient) PubSub() *PubSub {
+func (c *SentinelClient) pubSub() *PubSub {
        pubsub := &PubSub{
                opt: c.opt,
 
@@ -128,14 +132,52 @@ func (c *SentinelClient) PubSub() *PubSub {
        return pubsub
 }
 
+// Subscribe subscribes the client to the specified channels.
+// Channels can be omitted to create empty subscription.
+func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
+       pubsub := c.pubSub()
+       if len(channels) > 0 {
+               _ = pubsub.Subscribe(channels...)
+       }
+       return pubsub
+}
+
+// PSubscribe subscribes the client to the given patterns.
+// Patterns can be omitted to create empty subscription.
+func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
+       pubsub := c.pubSub()
+       if len(channels) > 0 {
+               _ = pubsub.PSubscribe(channels...)
+       }
+       return pubsub
+}
+
 func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
-       cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
+       cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
        c.Process(cmd)
        return cmd
 }
 
 func (c *SentinelClient) Sentinels(name string) *SliceCmd {
-       cmd := NewSliceCmd("SENTINEL", "sentinels", name)
+       cmd := NewSliceCmd("sentinel", "sentinels", name)
+       c.Process(cmd)
+       return cmd
+}
+
+// Failover forces a failover as if the master was not reachable, and without
+// asking for agreement to other Sentinels.
+func (c *SentinelClient) Failover(name string) *StatusCmd {
+       cmd := NewStatusCmd("sentinel", "failover", name)
+       c.Process(cmd)
+       return cmd
+}
+
+// Reset resets all the masters with matching name. The pattern argument is a
+// glob-style pattern. The reset process clears any previous state in a master
+// (including a failover in progress), and removes every slave and sentinel
+// already discovered and associated with the master.
+func (c *SentinelClient) Reset(pattern string) *IntCmd {
+       cmd := NewIntCmd("sentinel", "reset", pattern)
        c.Process(cmd)
        return cmd
 }
@@ -152,79 +194,81 @@ type sentinelFailover struct {
        masterName  string
        _masterAddr string
        sentinel    *SentinelClient
+       pubsub      *PubSub
 }
 
-func (d *sentinelFailover) Close() error {
-       return d.resetSentinel()
+func (c *sentinelFailover) Close() error {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if c.sentinel != nil {
+               return c.closeSentinel()
+       }
+       return nil
 }
 
-func (d *sentinelFailover) Pool() *pool.ConnPool {
-       d.poolOnce.Do(func() {
-               d.opt.Dialer = d.dial
-               d.pool = newConnPool(d.opt)
+func (c *sentinelFailover) Pool() *pool.ConnPool {
+       c.poolOnce.Do(func() {
+               c.opt.Dialer = c.dial
+               c.pool = newConnPool(c.opt)
        })
-       return d.pool
+       return c.pool
 }
 
-func (d *sentinelFailover) dial() (net.Conn, error) {
-       addr, err := d.MasterAddr()
+func (c *sentinelFailover) dial() (net.Conn, error) {
+       addr, err := c.MasterAddr()
        if err != nil {
                return nil, err
        }
-       return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
+       return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
 }
 
-func (d *sentinelFailover) MasterAddr() (string, error) {
-       d.mu.Lock()
-       defer d.mu.Unlock()
-
-       addr, err := d.masterAddr()
+func (c *sentinelFailover) MasterAddr() (string, error) {
+       addr, err := c.masterAddr()
        if err != nil {
                return "", err
        }
-       d._switchMaster(addr)
-
+       c.switchMaster(addr)
        return addr, nil
 }
 
-func (d *sentinelFailover) masterAddr() (string, error) {
-       // Try last working sentinel.
-       if d.sentinel != nil {
-               addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
-               if err == nil {
-                       addr := net.JoinHostPort(addr[0], addr[1])
-                       return addr, nil
-               }
-
-               internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
-                       d.masterName, err)
-               d._resetSentinel()
+func (c *sentinelFailover) masterAddr() (string, error) {
+       addr := c.getMasterAddr()
+       if addr != "" {
+               return addr, nil
        }
 
-       for i, sentinelAddr := range d.sentinelAddrs {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       for i, sentinelAddr := range c.sentinelAddrs {
                sentinel := NewSentinelClient(&Options{
                        Addr: sentinelAddr,
 
-                       DialTimeout:  d.opt.DialTimeout,
-                       ReadTimeout:  d.opt.ReadTimeout,
-                       WriteTimeout: d.opt.WriteTimeout,
+                       MaxRetries: c.opt.MaxRetries,
+
+                       DialTimeout:  c.opt.DialTimeout,
+                       ReadTimeout:  c.opt.ReadTimeout,
+                       WriteTimeout: c.opt.WriteTimeout,
 
-                       PoolSize:    d.opt.PoolSize,
-                       PoolTimeout: d.opt.PoolTimeout,
-                       IdleTimeout: d.opt.IdleTimeout,
+                       PoolSize:           c.opt.PoolSize,
+                       PoolTimeout:        c.opt.PoolTimeout,
+                       IdleTimeout:        c.opt.IdleTimeout,
+                       IdleCheckFrequency: c.opt.IdleCheckFrequency,
+
+                       TLSConfig: c.opt.TLSConfig,
                })
 
-               masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
+               masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
                if err != nil {
                        internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
-                               d.masterName, err)
-                       sentinel.Close()
+                               c.masterName, err)
+                       _ = sentinel.Close()
                        continue
                }
 
                // Push working sentinel to the top.
-               d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
-               d.setSentinel(sentinel)
+               c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
+               c.setSentinel(sentinel)
 
                addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
                return addr, nil
@@ -233,17 +277,41 @@ func (d *sentinelFailover) masterAddr() (string, error) {
        return "", errors.New("redis: all sentinels are unreachable")
 }
 
-func (c *sentinelFailover) switchMaster(addr string) {
-       c.mu.Lock()
-       c._switchMaster(addr)
-       c.mu.Unlock()
+func (c *sentinelFailover) getMasterAddr() string {
+       c.mu.RLock()
+       sentinel := c.sentinel
+       c.mu.RUnlock()
+
+       if sentinel == nil {
+               return ""
+       }
+
+       addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
+       if err != nil {
+               internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
+                       c.masterName, err)
+               c.mu.Lock()
+               if c.sentinel == sentinel {
+                       c.closeSentinel()
+               }
+               c.mu.Unlock()
+               return ""
+       }
+
+       return net.JoinHostPort(addr[0], addr[1])
 }
 
-func (c *sentinelFailover) _switchMaster(addr string) {
-       if c._masterAddr == addr {
+func (c *sentinelFailover) switchMaster(addr string) {
+       c.mu.RLock()
+       masterAddr := c._masterAddr
+       c.mu.RUnlock()
+       if masterAddr == addr {
                return
        }
 
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
        internal.Logf("sentinel: new master=%q addr=%q",
                c.masterName, addr)
        _ = c.Pool().Filter(func(cn *pool.Conn) bool {
@@ -252,32 +320,36 @@ func (c *sentinelFailover) _switchMaster(addr string) {
        c._masterAddr = addr
 }
 
-func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
-       d.discoverSentinels(sentinel)
-       d.sentinel = sentinel
-       go d.listen(sentinel)
+func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
+       c.discoverSentinels(sentinel)
+       c.sentinel = sentinel
+
+       c.pubsub = sentinel.Subscribe("+switch-master")
+       go c.listen(c.pubsub)
 }
 
-func (d *sentinelFailover) resetSentinel() error {
-       var err error
-       d.mu.Lock()
-       if d.sentinel != nil {
-               err = d._resetSentinel()
+func (c *sentinelFailover) closeSentinel() error {
+       var firstErr error
+
+       err := c.pubsub.Close()
+       if err != nil && firstErr == err {
+               firstErr = err
        }
-       d.mu.Unlock()
-       return err
-}
+       c.pubsub = nil
 
-func (d *sentinelFailover) _resetSentinel() error {
-       err := d.sentinel.Close()
-       d.sentinel = nil
-       return err
+       err = c.sentinel.Close()
+       if err != nil && firstErr == err {
+               firstErr = err
+       }
+       c.sentinel = nil
+
+       return firstErr
 }
 
-func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
-       sentinels, err := sentinel.Sentinels(d.masterName).Result()
+func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
+       sentinels, err := sentinel.Sentinels(c.masterName).Result()
        if err != nil {
-               internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
+               internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
                return
        }
        for _, sentinel := range sentinels {
@@ -286,49 +358,33 @@ func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
                        key := vals[i].(string)
                        if key == "name" {
                                sentinelAddr := vals[i+1].(string)
-                               if !contains(d.sentinelAddrs, sentinelAddr) {
-                                       internal.Logf(
-                                               "sentinel: discovered new sentinel=%q for master=%q",
-                                               sentinelAddr, d.masterName,
-                                       )
-                                       d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
+                               if !contains(c.sentinelAddrs, sentinelAddr) {
+                                       internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
+                                               sentinelAddr, c.masterName)
+                                       c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
                                }
                        }
                }
        }
 }
 
-func (d *sentinelFailover) listen(sentinel *SentinelClient) {
-       pubsub := sentinel.PubSub()
-       defer pubsub.Close()
-
-       err := pubsub.Subscribe("+switch-master")
-       if err != nil {
-               internal.Logf("sentinel: Subscribe failed: %s", err)
-               d.resetSentinel()
-               return
-       }
-
+func (c *sentinelFailover) listen(pubsub *PubSub) {
+       ch := pubsub.Channel()
        for {
-               msg, err := pubsub.ReceiveMessage()
-               if err != nil {
-                       if err == pool.ErrClosed {
-                               d.resetSentinel()
-                               return
-                       }
-                       internal.Logf("sentinel: ReceiveMessage failed: %s", err)
-                       continue
+               msg, ok := <-ch
+               if !ok {
+                       break
                }
 
                switch msg.Channel {
                case "+switch-master":
                        parts := strings.Split(msg.Payload, " ")
-                       if parts[0] != d.masterName {
+                       if parts[0] != c.masterName {
                                internal.Logf("sentinel: ignore addr for master=%q", parts[0])
                                continue
                        }
                        addr := net.JoinHostPort(parts[3], parts[4])
-                       d.switchMaster(addr)
+                       c.switchMaster(addr)
                }
        }
 }