11 "github.com/go-redis/redis/internal"
12 "github.com/go-redis/redis/internal/pool"
15 //------------------------------------------------------------------------------
17 // FailoverOptions are used to configure a failover client and should
18 // be passed to NewFailoverClient.
19 type FailoverOptions struct {
// NOTE(review): this listing omits some lines of the struct. Fields that
// options() and NewFailoverClient read (MasterName, Password, MaxRetries,
// PoolSize, TLSConfig) are not visible here.
22 // A seed list of host:port addresses of sentinel nodes.
23 SentinelAddrs []string
25 // Following options are copied from Options struct.
// OnConnect is forwarded unchanged to Options.OnConnect by options().
27 OnConnect func(*Conn) error
// Timeouts; each is forwarded to the Options field of the same name.
34 DialTimeout time.Duration
35 ReadTimeout time.Duration
36 WriteTimeout time.Duration
// Pool settings; each is forwarded to the Options field of the same name.
39 PoolTimeout time.Duration
40 IdleTimeout time.Duration
41 IdleCheckFrequency time.Duration
// options converts the failover settings into a regular *Options value by
// copying the callback, credentials, retry, timeout, pool and TLS fields
// one by one.
46 func (opt *FailoverOptions) options() *Options {
// Addr is a fixed label, not a dialable address.
// NOTE(review): presumably real connections go through the dialer that
// sentinelFailover.Pool installs — confirm against the omitted lines.
48 Addr: "FailoverClient",
50 OnConnect: opt.OnConnect,
53 Password: opt.Password,
55 MaxRetries: opt.MaxRetries,
57 DialTimeout: opt.DialTimeout,
58 ReadTimeout: opt.ReadTimeout,
59 WriteTimeout: opt.WriteTimeout,
61 PoolSize: opt.PoolSize,
62 PoolTimeout: opt.PoolTimeout,
63 IdleTimeout: opt.IdleTimeout,
64 IdleCheckFrequency: opt.IdleCheckFrequency,
66 TLSConfig: opt.TLSConfig,
70 // NewFailoverClient returns a Redis client that uses Redis Sentinel
71 // for automatic failover. It's safe for concurrent use by multiple
73 func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
// Translate the failover settings into regular client Options.
74 opt := failoverOpt.options()
// failover carries the master name and the sentinel address list.
77 failover := &sentinelFailover{
78 masterName: failoverOpt.MasterName,
// The caller's slice is stored as-is (no copy is made on this line).
// masterAddr later swaps its elements and discoverSentinels appends to it,
// so the caller's backing array may be modified.
79 sentinelAddrs: failoverOpt.SentinelAddrs,
85 baseClient: baseClient{
// The client draws its connections from the failover-managed pool.
87 connPool: failover.Pool(),
// Closing the client also closes the failover state.
89 onClose: func() error {
90 return failover.Close()
95 c.setProcessor(c.Process)
100 //------------------------------------------------------------------------------
// SentinelClient is a client for a single sentinel node. Its methods in this
// file issue SENTINEL commands (GetMasterAddrByName, Sentinels) and open a
// PubSub. The fields are on lines omitted from this listing; the methods
// reference c.connPool.
102 type SentinelClient struct {
// NewSentinelClient creates a SentinelClient whose connection pool is built
// from opt by newConnPool.
106 func NewSentinelClient(opt *Options) *SentinelClient {
108 c := &SentinelClient{
109 baseClient: baseClient{
111 connPool: newConnPool(opt),
// PubSub returns a *PubSub for this sentinel client, wired with callbacks
// for opening and closing its connection.
118 func (c *SentinelClient) PubSub() *PubSub {
// newConn supplies the *pool.Conn for the subscription; its body is on lines
// omitted from this listing.
122 newConn: func(channels []string) (*pool.Conn, error) {
// Connections are closed through the client's own pool.
125 closeConn: c.connPool.CloseConn,
// GetMasterAddrByName builds the `SENTINEL get-master-addr-by-name <name>`
// command. Callers in this file treat the result as [host, port]: elements 0
// and 1 are passed to net.JoinHostPort.
131 func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
132 cmd := NewStringSliceCmd("SENTINEL", "get-master-addr-by-name", name)
// Sentinels builds the `SENTINEL sentinels <name>` command. discoverSentinels
// reads each element of the result as a []interface{} of alternating
// key/value strings.
137 func (c *SentinelClient) Sentinels(name string) *SliceCmd {
138 cmd := NewSliceCmd("SENTINEL", "sentinels", name)
// sentinelFailover holds the state used to locate the current master through
// sentinel nodes.
// NOTE(review): fields used by its methods (masterName, opt, pool, poolOnce,
// _masterAddr) are declared on lines omitted from this listing.
143 type sentinelFailover struct {
// Known sentinel addresses: seeded by NewFailoverClient, reordered by
// masterAddr and extended by discoverSentinels.
144 sentinelAddrs []string
// The sentinel most recently adopted by setSentinel; masterAddr tries it
// first when it is non-nil.
154 sentinel *SentinelClient
// Close delegates to resetSentinel and returns its error.
157 func (d *sentinelFailover) Close() error {
158 return d.resetSentinel()
// Pool returns the connection pool, creating it on first call.
161 func (d *sentinelFailover) Pool() *pool.ConnPool {
// poolOnce ensures the initialisation below runs a single time.
162 d.poolOnce.Do(func() {
// Install d.dial as the dialer, so new connections target the master
// address resolved through sentinels.
163 d.opt.Dialer = d.dial
164 d.pool = newConnPool(d.opt)
// dial resolves the master address with MasterAddr and opens a TCP
// connection to it, bounded by d.opt.DialTimeout.
169 func (d *sentinelFailover) dial() (net.Conn, error) {
170 addr, err := d.MasterAddr()
174 return net.DialTimeout("tcp", addr, d.opt.DialTimeout)
// MasterAddr looks up the master address with masterAddr and passes it to
// _switchMaster.
// NOTE(review): the underscore-prefixed helper suggests a lock is taken on
// lines omitted from this listing — confirm.
177 func (d *sentinelFailover) MasterAddr() (string, error) {
181 addr, err := d.masterAddr()
185 d._switchMaster(addr)
// masterAddr asks sentinels for the address of the master named d.masterName.
// It queries the last working sentinel first, then every known sentinel
// address. The final visible statement returns an "all sentinels are
// unreachable" error.
// NOTE(review): the error checks and returns between the visible statements
// are on lines omitted from this listing.
190 func (d *sentinelFailover) masterAddr() (string, error) {
191 // Try last working sentinel.
192 if d.sentinel != nil {
193 addr, err := d.sentinel.GetMasterAddrByName(d.masterName).Result()
// The reply is used as [host, port].
195 addr := net.JoinHostPort(addr[0], addr[1])
// NOTE(review): this message uses the key name=%q while the equivalent
// message in the loop below uses master=%q.
199 internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
// Build a new client for each known sentinel address, reusing the
// failover client's timeouts and pool settings.
204 for i, sentinelAddr := range d.sentinelAddrs {
205 sentinel := NewSentinelClient(&Options{
208 DialTimeout: d.opt.DialTimeout,
209 ReadTimeout: d.opt.ReadTimeout,
210 WriteTimeout: d.opt.WriteTimeout,
212 PoolSize: d.opt.PoolSize,
213 PoolTimeout: d.opt.PoolTimeout,
214 IdleTimeout: d.opt.IdleTimeout,
217 masterAddr, err := sentinel.GetMasterAddrByName(d.masterName).Result()
219 internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
225 // Push working sentinel to the top.
226 d.sentinelAddrs[0], d.sentinelAddrs[i] = d.sentinelAddrs[i], d.sentinelAddrs[0]
// Adopt this sentinel; setSentinel also discovers peers and starts listen.
227 d.setSentinel(sentinel)
229 addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
233 return "", errors.New("redis: all sentinels are unreachable")
// switchMaster forwards addr to _switchMaster.
// NOTE(review): presumably a lock is taken on the omitted line between these
// two — confirm. The receiver is named c here but d on most other
// sentinelFailover methods.
236 func (c *sentinelFailover) switchMaster(addr string) {
238 c._switchMaster(addr)
// _switchMaster handles a (possibly) new master address: addr is compared
// with the recorded _masterAddr, the change is logged, and the pool's
// connections are filtered by remote address.
242 func (c *sentinelFailover) _switchMaster(addr string) {
// NOTE(review): presumably returns early when the address is unchanged; the
// branch body is not visible.
243 if c._masterAddr == addr {
247 internal.Logf("sentinel: new master=%q addr=%q",
// The predicate is true for connections whose remote address differs from
// addr. The error returned by Filter is deliberately discarded.
// NOTE(review): whether Filter drops or keeps matching connections is not
// shown here — presumably it drops those not pointing at the new master;
// confirm against pool.ConnPool.Filter.
249 _ = c.Pool().Filter(func(cn *pool.Conn) bool {
250 return cn.RemoteAddr().String() != addr
// setSentinel adopts sentinel as the current sentinel: it asks it for peer
// sentinels, records it in d.sentinel, and starts listen in a new goroutine.
255 func (d *sentinelFailover) setSentinel(sentinel *SentinelClient) {
256 d.discoverSentinels(sentinel)
257 d.sentinel = sentinel
// NOTE(review): the goroutine's exit conditions live in listen, partly on
// omitted lines — confirm it ends when the sentinel is closed.
258 go d.listen(sentinel)
// resetSentinel calls _resetSentinel when a current sentinel is set and
// captures its error.
// NOTE(review): locking and the return statement are on lines omitted from
// this listing.
261 func (d *sentinelFailover) resetSentinel() error {
264 if d.sentinel != nil {
265 err = d._resetSentinel()
// _resetSentinel closes the current sentinel client. d.sentinel is
// dereferenced without a nil check here; resetSentinel checks it before
// calling.
271 func (d *sentinelFailover) _resetSentinel() error {
272 err := d.sentinel.Close()
// discoverSentinels asks sentinel for the other sentinels monitoring
// d.masterName and appends any address not already in d.sentinelAddrs.
277 func (d *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
278 sentinels, err := sentinel.Sentinels(d.masterName).Result()
280 internal.Logf("sentinel: Sentinels master=%q failed: %s", d.masterName, err)
// Each entry is read as a flat list of alternating key/value strings.
// NOTE(review): the type assertions below have no ok-check, and vals[i+1]
// assumes an even-length list; an unexpected reply shape would panic.
283 for _, sentinel := range sentinels {
284 vals := sentinel.([]interface{})
285 for i := 0; i < len(vals); i += 2 {
286 key := vals[i].(string)
// NOTE(review): the condition selecting which key holds the address is on an
// omitted line.
288 sentinelAddr := vals[i+1].(string)
289 if !contains(d.sentinelAddrs, sentinelAddr) {
291 "sentinel: discovered new sentinel=%q for master=%q",
292 sentinelAddr, d.masterName,
294 d.sentinelAddrs = append(d.sentinelAddrs, sentinelAddr)
// listen subscribes to the sentinel's "+switch-master" channel and processes
// messages that concern d.masterName. setSentinel starts it in its own
// goroutine.
301 func (d *sentinelFailover) listen(sentinel *SentinelClient) {
302 pubsub := sentinel.PubSub()
305 err := pubsub.Subscribe("+switch-master")
307 internal.Logf("sentinel: Subscribe failed: %s", err)
313 msg, err := pubsub.ReceiveMessage()
// pool.ErrClosed is tested separately from other receive errors, which are
// logged below.
315 if err == pool.ErrClosed {
319 internal.Logf("sentinel: ReceiveMessage failed: %s", err)
324 case "+switch-master":
// The payload is split on single spaces. Field 0 is compared with the master
// name; fields 3 and 4 are joined as host:port.
// NOTE(review): presumably the payload is
// "<master> <old-ip> <old-port> <new-ip> <new-port>" per the Redis Sentinel
// docs — confirm. parts[3] and parts[4] are indexed with no visible length
// check, so a shorter payload would panic.
325 parts := strings.Split(msg.Payload, " ")
326 if parts[0] != d.masterName {
327 internal.Logf("sentinel: ignore addr for master=%q", parts[0])
330 addr := net.JoinHostPort(parts[3], parts[4])
336 func contains(slice []string, str string) bool {
337 for _, s := range slice {