11 "github.com/go-redis/redis/internal"
12 "github.com/go-redis/redis/internal/pool"
15 //------------------------------------------------------------------------------
17 // FailoverOptions are used to configure a failover client and should
18 // be passed to NewFailoverClient.
19 type FailoverOptions struct {
22 // A seed list of host:port addresses of sentinel nodes.
// NewFailoverClient stores this slice on the sentinelFailover it creates.
23 SentinelAddrs []string
25 // The following options are copied from the Options struct.
// Callback that receives a *Conn and may return an error.
// NOTE(review): presumably has the same meaning as Options.OnConnect - confirm.
27 OnConnect func(*Conn) error
// Retry backoff bounds.
// NOTE(review): presumably the same meaning as the Options fields of the
// same name - confirm.
33 MinRetryBackoff time.Duration
34 MaxRetryBackoff time.Duration
// Timeouts; options() forwards all three to Options unchanged.
36 DialTimeout time.Duration
37 ReadTimeout time.Duration
38 WriteTimeout time.Duration
// Connection pool settings.
42 MaxConnAge time.Duration
43 PoolTimeout time.Duration
44 IdleTimeout time.Duration
45 IdleCheckFrequency time.Duration
// options builds the *Options value for the underlying client by copying
// fields from the failover options.
// NOTE(review): FailoverOptions declares MinRetryBackoff, MaxRetryBackoff and
// MaxConnAge, but no line visible here forwards them - confirm they are
// copied, otherwise those settings have no effect.
50 func (opt *FailoverOptions) options() *Options {
// Fixed placeholder string rather than a real host:port.
// NOTE(review): presumably never dialed directly, because
// sentinelFailover.Pool installs its own Dialer - confirm.
52 Addr: "FailoverClient",
54 OnConnect: opt.OnConnect,
57 Password: opt.Password,
59 MaxRetries: opt.MaxRetries,
61 DialTimeout: opt.DialTimeout,
62 ReadTimeout: opt.ReadTimeout,
63 WriteTimeout: opt.WriteTimeout,
65 PoolSize: opt.PoolSize,
66 PoolTimeout: opt.PoolTimeout,
67 IdleTimeout: opt.IdleTimeout,
68 IdleCheckFrequency: opt.IdleCheckFrequency,
70 TLSConfig: opt.TLSConfig,
74 // NewFailoverClient returns a Redis client that uses Redis Sentinel
75 // for automatic failover. It's safe for concurrent use by multiple
77 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
// Translate the failover settings into the generic client Options.
78 opt := failoverOpt.options()
81 failover := &sentinelFailover{
82 masterName: failoverOpt.MasterName,
// The caller's slice is stored as-is, not copied. masterAddr later swaps
// elements of this slice in place, so the caller can observe the reordering.
83 sentinelAddrs: failoverOpt.SentinelAddrs,
89 baseClient: baseClient{
// Connections come from the pool managed by the failover state.
91 connPool: failover.Pool(),
// Closing the client also closes the failover state.
93 onClose: func() error {
94 return failover.Close()
// Commands issued through cmdable are handled by c.Process.
99 c.cmdable.setProcessor(c.Process)
104 //------------------------------------------------------------------------------
// SentinelClient is the client type constructed by NewSentinelClient. Its
// methods in this file create PubSub subscriptions and build "sentinel ..."
// commands.
106 type SentinelClient struct {
// NewSentinelClient returns a SentinelClient whose baseClient uses a
// connection pool created from opt by newConnPool.
110 func NewSentinelClient(opt *Options) *SentinelClient {
112 c := &SentinelClient{
113 baseClient: baseClient{
115 connPool: newConnPool(opt),
// pubSub prepares a *PubSub bound to this client's connection pool.
// NOTE(review): presumably the value Subscribe and PSubscribe start from -
// confirm.
122 func (c *SentinelClient) pubSub() *PubSub {
// newConn receives the channel names and yields a *pool.Conn or an error.
126 newConn: func(channels []string) (*pool.Conn, error) {
// Connections are closed through the client's pool.
129 closeConn: c.connPool.CloseConn,
135 // Subscribe subscribes the client to the specified channels.
136 // Channels can be omitted to create an empty subscription.
137 func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
// Only issue a subscribe call when at least one channel was given.
139 if len(channels) > 0 {
// The return value of this initial Subscribe call is discarded.
140 _ = pubsub.Subscribe(channels...)
145 // PSubscribe subscribes the client to the given patterns.
146 // Patterns can be omitted to create an empty subscription.
// The variadic parameter is named channels but carries the patterns.
147 func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
// Only issue a psubscribe call when at least one pattern was given.
149 if len(channels) > 0 {
// The return value of this initial PSubscribe call is discarded.
150 _ = pubsub.PSubscribe(channels...)
// GetMasterAddrByName builds the "sentinel get-master-addr-by-name <name>"
// command as a *StringSliceCmd. Callers in this file use elements 0 and 1 of
// its result as the master's host and port.
155 func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
156 cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
// Sentinels builds the "sentinel sentinels <name>" command as a *SliceCmd.
// discoverSentinels reads each element of its result as a []interface{} whose
// entries alternate between a string key and a string value.
161 func (c *SentinelClient) Sentinels(name string) *SliceCmd {
162 cmd := NewSliceCmd("sentinel", "sentinels", name)
167 // Failover forces a failover as if the master was not reachable, and without
168 // asking for agreement from other Sentinels.
// It builds the "sentinel failover <name>" command as a *StatusCmd.
169 func (c *SentinelClient) Failover(name string) *StatusCmd {
170 cmd := NewStatusCmd("sentinel", "failover", name)
175 // Reset resets all the masters with matching name. The pattern argument is a
176 // glob-style pattern. The reset process clears any previous state in a master
177 // (including a failover in progress), and removes every slave and sentinel
178 // already discovered and associated with the master.
// It builds the "sentinel reset <pattern>" command as an *IntCmd.
179 func (c *SentinelClient) Reset(pattern string) *IntCmd {
180 cmd := NewIntCmd("sentinel", "reset", pattern)
// sentinelFailover holds the failover state behind a client created by
// NewFailoverClient.
185 type sentinelFailover struct {
// Known sentinel addresses: seeded by NewFailoverClient, reordered by
// masterAddr and extended by discoverSentinels.
186 sentinelAddrs []string
// Sentinel client currently in use; assigned by setSentinel and nil-checked
// in Close.
196 sentinel *SentinelClient
// Close releases the sentinel connection, if there is one: when a sentinel
// client is currently set, the result of closeSentinel is returned.
200 func (c *sentinelFailover) Close() error {
203 if c.sentinel != nil {
204 return c.closeSentinel()
// Pool yields the *pool.ConnPool of the failover client. The pool is created
// inside poolOnce.Do.
// NOTE(review): poolOnce is presumably a sync.Once, making creation happen
// exactly once - confirm.
209 func (c *sentinelFailover) Pool() *pool.ConnPool {
210 c.poolOnce.Do(func() {
// Install dial as the Dialer before building the pool, so new connections
// go to the address resolved through MasterAddr.
211 c.opt.Dialer = c.dial
212 c.pool = newConnPool(c.opt)
// dial resolves the master address through MasterAddr and opens a TCP
// connection to it, bounded by the configured DialTimeout.
217 func (c *sentinelFailover) dial() (net.Conn, error) {
218 addr, err := c.MasterAddr()
222 return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
// MasterAddr reports the master address as a string, or an error. It starts
// from the result of masterAddr.
225 func (c *sentinelFailover) MasterAddr() (string, error) {
226 addr, err := c.masterAddr()
// masterAddr determines the master address as a host:port string. It first
// consults getMasterAddr, then walks sentinelAddrs, creating a sentinel client
// for each address and asking it for the address of c.masterName. The final
// return reports "redis: all sentinels are unreachable".
234 func (c *sentinelFailover) masterAddr() (string, error) {
// Ask the sentinel client that is already set, if any.
235 addr := c.getMasterAddr()
243 for i, sentinelAddr := range c.sentinelAddrs {
// The per-sentinel client reuses the retry, timeout, pool and TLS settings
// of c.opt.
244 sentinel := NewSentinelClient(&Options{
247 MaxRetries: c.opt.MaxRetries,
249 DialTimeout: c.opt.DialTimeout,
250 ReadTimeout: c.opt.ReadTimeout,
251 WriteTimeout: c.opt.WriteTimeout,
253 PoolSize: c.opt.PoolSize,
254 PoolTimeout: c.opt.PoolTimeout,
255 IdleTimeout: c.opt.IdleTimeout,
256 IdleCheckFrequency: c.opt.IdleCheckFrequency,
258 TLSConfig: c.opt.TLSConfig,
261 masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
263 internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
269 // Push working sentinel to the top.
// The swap mutates the slice in place while it is being ranged over.
270 c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
271 c.setSentinel(sentinel)
// The first two reply elements are used as host and port.
// NOTE(review): no length check on masterAddr is visible before indexing -
// confirm a short reply cannot reach this line.
273 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
277 return "", errors.New("redis: all sentinels are unreachable")
// getMasterAddr asks the currently set sentinel client for the address of
// c.masterName and returns it as a host:port string.
280 func (c *sentinelFailover) getMasterAddr() string {
// Work on a local copy of the field; it is compared with c.sentinel again
// further down.
282 sentinel := c.sentinel
289 addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
291 internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
// Checks whether c.sentinel is still the client captured above.
294 if c.sentinel == sentinel {
// Elements 0 and 1 of the reply are the host and the port.
301 return net.JoinHostPort(addr[0], addr[1])
// switchMaster handles a change of the master address to addr.
304 func (c *sentinelFailover) switchMaster(addr string) {
306 masterAddr := c._masterAddr
// Compare the stored address with the new one.
308 if masterAddr == addr {
315 internal.Logf("sentinel: new master=%q addr=%q",
// The predicate is true for pooled connections whose remote address differs
// from addr. The value returned by Filter is discarded.
// NOTE(review): presumably Filter drops the connections the predicate
// selects - confirm against pool.ConnPool.Filter.
317 _ = c.Pool().Filter(func(cn *pool.Conn) bool {
318 return cn.RemoteAddr().String() != addr
// setSentinel makes sentinel the active sentinel client: it runs sentinel
// discovery through it, stores it, subscribes to "+switch-master" and starts
// listen on that subscription in a new goroutine.
323 func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
324 c.discoverSentinels(sentinel)
325 c.sentinel = sentinel
327 c.pubsub = sentinel.Subscribe("+switch-master")
// NOTE(review): presumably this goroutine ends once the pubsub is closed by
// closeSentinel - confirm.
328 go c.listen(c.pubsub)
// closeSentinel closes the sentinel pubsub and then the sentinel client.
// Both Close calls are always attempted.
// NOTE(review): firstErr is declared on a line not shown in this excerpt and
// is assumed to start out nil - confirm.
331 func (c *sentinelFailover) closeSentinel() error {
334 err := c.pubsub.Close()
// Remember the error only if none has been recorded yet. The check must be
// against nil: comparing firstErr with err can never succeed while firstErr
// is nil and err is not, which would silently drop every close error.
335 if err != nil && firstErr == nil {
340 err = c.sentinel.Close()
// Same rule for the client: keep the earliest failure.
341 if err != nil && firstErr == nil {
// discoverSentinels asks sentinel for the sentinels of c.masterName and
// appends to c.sentinelAddrs every discovered address that is not already in
// the list. A failed query is logged.
349 func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
350 sentinels, err := sentinel.Sentinels(c.masterName).Result()
352 internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
// The loop variable shadows the sentinel parameter from here on.
355 for _, sentinel := range sentinels {
// NOTE(review): the type assertions below are unchecked; a reply element
// that is not a []interface{} of strings would panic. Confirm the reply
// shape is guaranteed.
356 vals := sentinel.([]interface{})
// Entries are read pairwise: vals[i] is a key, vals[i+1] its value.
// NOTE(review): vals[i+1] is out of range if vals has odd length - confirm.
357 for i := 0; i < len(vals); i += 2 {
358 key := vals[i].(string)
360 sentinelAddr := vals[i+1].(string)
361 if !contains(c.sentinelAddrs, sentinelAddr) {
362 internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
363 sentinelAddr, c.masterName)
364 c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
// listen consumes messages from the channel of pubsub and reacts to
// "+switch-master" events.
371 func (c *sentinelFailover) listen(pubsub *PubSub) {
372 ch := pubsub.Channel()
380 case "+switch-master":
// The payload is split on single spaces: field 0 is the master name, and
// fields 3 and 4 are joined as host and port.
// NOTE(review): no length check on parts is visible before indexing [3] and
// [4]; a payload with fewer than five fields would panic in this goroutine.
// Confirm or add a guard.
381 parts := strings.Split(msg.Payload, " ")
// Events that name a different master are logged.
382 if parts[0] != c.masterName {
383 internal.Logf("sentinel: ignore addr for master=%q", parts[0])
386 addr := net.JoinHostPort(parts[3], parts[4])
392 func contains(slice []string, str string) bool {
393 for _, s := range slice {