9 "github.com/go-redis/redis/internal"
10 "github.com/go-redis/redis/internal/pool"
11 "github.com/go-redis/redis/internal/proto"
// errPingTimeout is the value assigned to pingErr when a ping timeout is
// recorded; pingErr is then handed to _reconnect as the reason.
14 var errPingTimeout = errors.New("redis: ping timeout")
16 // PubSub implements Pub/Sub commands as described in
17 // http://redis.io/topics/pubsub. Message receiving is NOT safe
18 // for concurrent use by multiple goroutines.
20 // PubSub automatically reconnects to Redis Server and resubscribes
21 // to the channels in case of network errors.
// newConn is invoked by _conn with a list of channel names and yields the
// connection to use.
25 newConn func([]string) (*pool.Conn, error)
// closeConn is invoked by _closeTheCn to close c.cn.
26 closeConn func(*pool.Conn) error
// channels is the set of channel names recorded by Subscribe; entries are
// removed by Unsubscribe and replayed by resubscribe.
30 channels map[string]struct{}
// patterns is the set of patterns recorded by PSubscribe; entries are
// removed by PUnsubscribe and replayed by resubscribe.
31 patterns map[string]struct{}
// init allocates the unbuffered exit channel.
42 func (c *PubSub) init() {
43 c.exit = make(chan struct{})
// conn obtains a connection from _conn, passing nil for the additional
// channel names.
46 func (c *PubSub) conn() (*pool.Conn, error) {
48 cn, err := c._conn(nil)
// _conn returns a connection for this PubSub. It can fail with pool.ErrClosed;
// otherwise it creates a connection through newConn and resubscribes it.
53 func (c *PubSub) _conn(newChannels []string) (*pool.Conn, error) {
// NOTE(review): presumably returned when the PubSub has been closed — confirm
// against the guarding condition.
55 return nil, pool.ErrClosed
// newConn receives the already recorded channels followed by newChannels.
61 channels := mapKeys(c.channels)
62 channels = append(channels, newChannels...)
64 cn, err := c.newConn(channels)
// Replay the recorded channel and pattern subscriptions on the new connection.
69 if err := c.resubscribe(cn); err != nil {
// writeCmd writes cmd to cn inside cn.WithWriter, passing c.opt.WriteTimeout
// to WithWriter, and returns WithWriter's error.
78 func (c *PubSub) writeCmd(cn *pool.Conn, cmd Cmder) error {
79 return cn.WithWriter(c.opt.WriteTimeout, func(wr *proto.Writer) error {
// The package-level writeCmd performs the actual encoding onto wr.
80 return writeCmd(wr, cmd)
// resubscribe re-issues "subscribe" for the recorded channels and
// "psubscribe" for the recorded patterns on cn. Each command is sent only
// when the corresponding set is non-empty.
84 func (c *PubSub) resubscribe(cn *pool.Conn) error {
87 if len(c.channels) > 0 {
88 err := c._subscribe(cn, "subscribe", mapKeys(c.channels))
// Only the first error encountered is kept in firstErr.
89 if err != nil && firstErr == nil {
94 if len(c.patterns) > 0 {
95 err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
96 if err != nil && firstErr == nil {
// mapKeys produces a []string for the set m. Callers use the result as the
// list of channel or pattern names stored in the set.
104 func mapKeys(m map[string]struct{}) []string {
// One slot per map entry is allocated up front.
105 s := make([]string, len(m))
// _subscribe sends redisCmd followed by the given channels to cn as a single
// command and returns the error from writeCmd. It only writes the command.
114 func (c *PubSub) _subscribe(
115 cn *pool.Conn, redisCmd string, channels []string,
// args holds the command name plus one element per channel, so the capacity
// is known in advance.
117 args := make([]interface{}, 0, 1+len(channels))
118 args = append(args, redisCmd)
119 for _, channel := range channels {
120 args = append(args, channel)
122 cmd := NewSliceCmd(args...)
123 return c.writeCmd(cn, cmd)
// releaseConn forwards cn, err and allowTimeout unchanged to _releaseConn.
126 func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
128 c._releaseConn(cn, err, allowTimeout)
// _releaseConn uses internal.IsBadConn(err, allowTimeout) to decide whether
// err marks the connection as bad.
132 func (c *PubSub) _releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
// NOTE(review): presumably the bad connection is dropped in this branch —
// confirm against the branch body.
136 if internal.IsBadConn(err, allowTimeout) {
// _reconnect closes the current connection through _closeTheCn, passing
// reason along.
141 func (c *PubSub) _reconnect(reason error) {
// The error from closing is deliberately discarded.
142 _ = c._closeTheCn(reason)
// _closeTheCn closes c.cn through the closeConn hook; reason is reported in
// the log message.
146 func (c *PubSub) _closeTheCn(reason error) error {
// Record why the connection is being discarded.
151 internal.Logf("redis: discarding bad PubSub connection: %s", reason)
153 err := c.closeConn(c.cn)
// Close shuts the PubSub down. It can return pool.ErrClosed, and it closes
// the underlying connection through _closeTheCn with pool.ErrClosed as the
// reason.
158 func (c *PubSub) Close() error {
// NOTE(review): presumably returned when Close was already called — confirm
// against the guarding condition.
163 return pool.ErrClosed
168 err := c._closeTheCn(pool.ErrClosed)
172 // Subscribe the client to the specified channels. It returns
173 // empty subscription if there are no channels.
174 func (c *PubSub) Subscribe(channels ...string) error {
// Send the "subscribe" command first. The channels are recorded below
// regardless of err, so resubscribe can replay them later.
178 err := c.subscribe("subscribe", channels...)
// The set is created lazily on first use.
179 if c.channels == nil {
180 c.channels = make(map[string]struct{})
182 for _, s := range channels {
183 c.channels[s] = struct{}{}
188 // PSubscribe the client to the given patterns. It returns
189 // empty subscription if there are no patterns.
190 func (c *PubSub) PSubscribe(patterns ...string) error {
// Send the "psubscribe" command first. The patterns are recorded below
// regardless of err, so resubscribe can replay them later.
194 err := c.subscribe("psubscribe", patterns...)
// The set is created lazily on first use.
195 if c.patterns == nil {
196 c.patterns = make(map[string]struct{})
198 for _, s := range patterns {
199 c.patterns[s] = struct{}{}
204 // Unsubscribe the client from the given channels, or from all of
205 // them if none is given.
206 func (c *PubSub) Unsubscribe(channels ...string) error {
// Forget the named channels locally so resubscribe no longer replays them.
// NOTE(review): with no arguments this loop deletes nothing, so c.channels
// appears to keep its entries even though the doc comment says all channels
// are unsubscribed — confirm whether that is intended.
210 for _, channel := range channels {
211 delete(c.channels, channel)
// Send the "unsubscribe" command for the same channels.
213 err := c.subscribe("unsubscribe", channels...)
217 // PUnsubscribe the client from the given patterns, or from all of
218 // them if none is given.
219 func (c *PubSub) PUnsubscribe(patterns ...string) error {
// Forget the named patterns locally so resubscribe no longer replays them.
// NOTE(review): with no arguments this loop deletes nothing, so c.patterns
// appears to keep its entries even though the doc comment says all patterns
// are unsubscribed — confirm whether that is intended.
223 for _, pattern := range patterns {
224 delete(c.patterns, pattern)
// Send the "punsubscribe" command for the same patterns.
226 err := c.subscribe("punsubscribe", patterns...)
// subscribe obtains a connection from _conn (passing channels as the new
// channel names), writes redisCmd for those channels and then releases the
// connection together with the write error.
230 func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
231 cn, err := c._conn(channels)
236 err = c._subscribe(cn, redisCmd, channels)
// allowTimeout is false for the IsBadConn check in _releaseConn.
237 c._releaseConn(cn, err, false)
// Ping writes a "ping" command to the connection. The payload is optional.
241 func (c *PubSub) Ping(payload ...string) error {
242 args := []interface{}{"ping"}
// The payload is appended only when exactly one value is given; any other
// number of values leaves the command without a payload.
243 if len(payload) == 1 {
244 args = append(args, payload[0])
246 cmd := NewCmd(args...)
253 err = c.writeCmd(cn, cmd)
// allowTimeout is false when the connection is released.
254 c.releaseConn(cn, err, false)
258 // Subscription received after a successful subscription to channel.
// newMessage builds it for "subscribe", "unsubscribe", "psubscribe" and
// "punsubscribe" replies.
259 type Subscription struct {
260 // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
262 // Channel name we have subscribed to.
264 // Number of channels we are currently subscribed to.
// String returns the subscription formatted as "<Kind>: <Channel>".
268 func (m *Subscription) String() string {
269 return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
272 // Message received as result of a PUBLISH command issued by another client.
// Its String method renders the Channel and Payload fields.
273 type Message struct {
// String returns the message formatted as "Message<channel: payload>".
279 func (m *Message) String() string {
280 return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
283 // Pong received as result of a PING command issued by this client (see Ping).
// String returns a human-readable form of the pong.
288 func (p *Pong) String() string {
// Render the payload inside the Pong<...> wrapper.
290 return fmt.Sprintf("Pong<%s>", p.Payload)
// newMessage converts a raw reply into a typed PubSub message. It switches on
// the dynamic type of reply and then on the kind string found in reply[0].
// Unsupported kinds and unsupported reply types produce an error.
//
// NOTE: the element type assertions below are unchecked, so a reply element
// of an unexpected type panics instead of returning an error.
295 func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
296 switch reply := reply.(type) {
302 switch kind := reply[0].(string); kind {
303 case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
304 return &Subscription{
306 Channel: reply[1].(string),
307 Count: int(reply[2].(int64)),
// Channel in reply[1], payload in reply[2].
311 Channel: reply[1].(string),
312 Payload: reply[2].(string),
// Pattern in reply[1], channel in reply[2], payload in reply[3].
316 Pattern: reply[1].(string),
317 Channel: reply[2].(string),
318 Payload: reply[3].(string),
// Payload only, in reply[1].
322 Payload: reply[1].(string),
// The kind string was not matched by any case.
325 return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
// The reply has a dynamic type that is not handled at all.
328 return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
332 // ReceiveTimeout acts like Receive but returns an error if message
333 // is not received in time. This is low-level API and in most cases
334 // Channel should be used instead.
335 func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
// Read one reply into c.cmd, passing timeout to WithReader.
345 err = cn.WithReader(timeout, func(rd *proto.Reader) error {
346 return c.cmd.readReply(rd)
// allowTimeout is true only for a positive timeout.
349 c.releaseConn(cn, err, timeout > 0)
// Convert the raw reply value into a typed message.
354 return c.newMessage(c.cmd.Val())
357 // Receive returns a message as a Subscription, Message, Pong or error.
358 // See PubSub example for details. This is low-level API and in most cases
359 // Channel should be used instead.
360 func (c *PubSub) Receive() (interface{}, error) {
// Delegates to ReceiveTimeout with a timeout of 0.
361 return c.ReceiveTimeout(0)
364 // ReceiveMessage returns a Message or error ignoring Subscription and Pong
365 // messages. This is low-level API and in most cases Channel should be used
367 func (c *PubSub) ReceiveMessage() (*Message, error) {
369 msg, err := c.Receive()
// Dispatch on the concrete message type returned by Receive.
374 switch msg := msg.(type) {
// A type without a dedicated case is turned into an "unknown message" error
// that names the type.
382 err := fmt.Errorf("redis: unknown message: %T", msg)
388 // Channel returns a Go channel for concurrently receiving messages.
389 // It periodically sends Ping messages to test connection health.
390 // The channel is closed with PubSub. Receive* APIs can not be used
391 // after channel is created.
392 func (c *PubSub) Channel() <-chan *Message {
// Setup is delegated to initChannel through chOnce.Do — presumably a
// sync.Once, so it runs a single time; confirm against the struct definition.
393 c.chOnce.Do(c.initChannel)
// initChannel allocates the message channel (buffer of 100) and the ping
// signal channel (buffer of 10) and sets up message reception via Receive.
397 func (c *PubSub) initChannel() {
398 c.ch = make(chan *Message, 100)
399 c.ping = make(chan struct{}, 10)
404 msg, err := c.Receive()
// pool.ErrClosed is singled out from other receive errors.
406 if err == pool.ErrClosed {
// Back off before the next attempt; the delay depends on errCount.
411 time.Sleep(c.retryBackoff(errCount))
418 // Any message is as good as a ping.
420 case c.ping <- struct{}{}:
424 switch msg := msg.(type) {
// Message types without a dedicated case are only logged.
432 internal.Logf("redis: unknown message: %T", msg)
// timeout is the duration given to the timer below.
438 const timeout = 5 * time.Second
440 timer := time.NewTimer(timeout)
458 pingErr = errPingTimeout
// Drop the connection, passing pingErr as the reason.
461 c._reconnect(pingErr)
// retryBackoff returns the delay for the given attempt, as computed by
// internal.RetryBackoff from c.opt.MinRetryBackoff and c.opt.MaxRetryBackoff.
471 func (c *PubSub) retryBackoff(attempt int) time.Duration {
472 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)