8 "github.com/go-redis/redis/internal"
9 "github.com/go-redis/redis/internal/pool"
12 // PubSub implements Pub/Sub commands as described in
13 // http://redis.io/topics/pubsub. Message receiving is NOT safe
14 // for concurrent use by multiple goroutines.
16 // PubSub automatically reconnects to Redis Server and resubscribes
17 // to the channels in case of network errors.
// newConn supplies the connection used by _conn, which passes it the
// channel names it was called with.
21 newConn func([]string) (*pool.Conn, error)
// closeConn is invoked by _closeTheCn on the current connection (c.cn).
22 closeConn func(*pool.Conn) error
// channels is the set of channel names added by Subscribe, removed by
// Unsubscribe and replayed by resubscribe.
26 channels map[string]struct{}
// patterns is the set of patterns added by PSubscribe, removed by
// PUnsubscribe and replayed by resubscribe.
27 patterns map[string]struct{}
// init allocates the unbuffered exit channel.
38 func (c *PubSub) init() {
39 c.exit = make(chan struct{})
// conn obtains the PubSub connection by delegating to _conn with no
// channel names.
42 func (c *PubSub) conn() (*pool.Conn, error) {
44 cn, err := c._conn(nil)
// _conn returns a connection for this PubSub. In the visible lines it
// can fail with pool.ErrClosed, obtains a connection from newConn and
// replays the tracked subscriptions on it via resubscribe.
// NOTE(review): the conditions guarding each step are not visible in this
// excerpt (presumably a closed check and reuse of an existing c.cn) — confirm.
49 func (c *PubSub) _conn(channels []string) (*pool.Conn, error) {
51 return nil, pool.ErrClosed
// channels is forwarded unchanged to the newConn callback.
58 cn, err := c.newConn(channels)
// A resubscribe failure on the fresh connection is handled here.
63 if err := c.resubscribe(cn); err != nil {
// resubscribe re-issues "subscribe" for every tracked channel and
// "psubscribe" for every tracked pattern on cn. Empty sets are skipped.
// Errors are tracked through firstErr, which is only considered while it
// is still nil (presumably so the first failure is the one reported — the
// assignment and return are outside the visible lines).
72 func (c *PubSub) resubscribe(cn *pool.Conn) error {
75 if len(c.channels) > 0 {
76 channels := mapKeys(c.channels)
77 err := c._subscribe(cn, "subscribe", channels...)
78 if err != nil && firstErr == nil {
83 if len(c.patterns) > 0 {
84 patterns := mapKeys(c.patterns)
85 err := c._subscribe(cn, "psubscribe", patterns...)
86 if err != nil && firstErr == nil {
// mapKeys returns a []string derived from m; presumably the keys of m,
// filled in by lines not visible here.
94 func mapKeys(m map[string]struct{}) []string {
// Allocated with len(m) elements (not just capacity) up front.
95 s := make([]string, len(m))
// _subscribe builds a command from redisCmd and channels and writes it to
// cn, returning the write error. The visible lines only write the command;
// they do not read a reply.
104 func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
// One extra slot in addition to the channels (presumably for redisCmd).
105 args := make([]interface{}, 1+len(channels))
107 for i, channel := range channels {
110 cmd := NewSliceCmd(args...)
// The write uses the timeout configured in c.opt.WriteTimeout.
112 cn.SetWriteTimeout(c.opt.WriteTimeout)
113 return writeCmd(cn, cmd)
// releaseConn forwards cn and err to _releaseConn.
116 func (c *PubSub) releaseConn(cn *pool.Conn, err error) {
118 c._releaseConn(cn, err)
// _releaseConn inspects err after an operation on cn. It asks
// internal.IsBadConn(err, true) whether the error marks the connection as
// bad; what is done with a bad connection is in lines not visible here.
122 func (c *PubSub) _releaseConn(cn *pool.Conn, err error) {
126 if internal.IsBadConn(err, true) {
// _closeTheCn passes the current connection c.cn to the closeConn callback
// and captures the error it returns.
131 func (c *PubSub) _closeTheCn() error {
134 err = c.closeConn(c.cn)
140 func (c *PubSub) reconnect() {
146 func (c *PubSub) _reconnect() {
// Close shuts the PubSub down. The visible lines return pool.ErrClosed on
// one path (presumably when Close was already called — confirm) and
// otherwise close the current connection through _closeTheCn.
151 func (c *PubSub) Close() error {
156 return pool.ErrClosed
161 err := c._closeTheCn()
165 // Subscribe subscribes the client to the specified channels and
166 // records them in c.channels so that resubscribe can replay them.
// NOTE(review): the previous wording said this "returns empty subscription"
// when no channels are given, but the signature only returns an error.
167 func (c *PubSub) Subscribe(channels ...string) error {
171 err := c.subscribe("subscribe", channels...)
// The channels are recorded without checking err first, so they are
// tracked even when the subscribe command could not be written.
// The set is created lazily on first use.
172 if c.channels == nil {
173 c.channels = make(map[string]struct{})
175 for _, channel := range channels {
176 c.channels[channel] = struct{}{}
181 // PSubscribe subscribes the client to the given patterns and
182 // records them in c.patterns so that resubscribe can replay them.
// NOTE(review): the previous wording said this "returns empty subscription"
// when no patterns are given, but the signature only returns an error.
183 func (c *PubSub) PSubscribe(patterns ...string) error {
187 err := c.subscribe("psubscribe", patterns...)
// The patterns are recorded without checking err first, so they are
// tracked even when the psubscribe command could not be written.
// The set is created lazily on first use.
188 if c.patterns == nil {
189 c.patterns = make(map[string]struct{})
191 for _, pattern := range patterns {
192 c.patterns[pattern] = struct{}{}
197 // Unsubscribe unsubscribes the client from the given channels, or from
198 // all of them if none is given.
// NOTE(review): with no arguments the loop below removes nothing from
// c.channels, so previously tracked channels stay eligible for replay by
// resubscribe — confirm whether the set should be cleared in that case.
199 func (c *PubSub) Unsubscribe(channels ...string) error {
203 err := c.subscribe("unsubscribe", channels...)
// Entries are dropped from the tracked set without checking err first.
204 for _, channel := range channels {
205 delete(c.channels, channel)
210 // PUnsubscribe unsubscribes the client from the given patterns, or from
211 // all of them if none is given.
// NOTE(review): with no arguments the loop below removes nothing from
// c.patterns, so previously tracked patterns stay eligible for replay by
// resubscribe — confirm whether the set should be cleared in that case.
212 func (c *PubSub) PUnsubscribe(patterns ...string) error {
216 err := c.subscribe("punsubscribe", patterns...)
// Entries are dropped from the tracked set without checking err first.
217 for _, pattern := range patterns {
218 delete(c.patterns, pattern)
// subscribe obtains a connection via _conn (passing channels along),
// writes redisCmd for those channels with _subscribe, and hands the
// connection together with the write error to _releaseConn.
223 func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
224 cn, err := c._conn(channels)
229 err = c._subscribe(cn, redisCmd, channels...)
230 c._releaseConn(cn, err)
// Ping writes a "ping" command on the PubSub connection. The payload is
// attached only when exactly one value is supplied; any other number of
// payload arguments is ignored. The visible lines write the command and
// release the connection; they do not read a reply.
234 func (c *PubSub) Ping(payload ...string) error {
235 args := []interface{}{"ping"}
236 if len(payload) == 1 {
237 args = append(args, payload[0])
239 cmd := NewCmd(args...)
// The write uses the timeout configured in c.opt.WriteTimeout.
246 cn.SetWriteTimeout(c.opt.WriteTimeout)
247 err = writeCmd(cn, cmd)
248 c.releaseConn(cn, err)
252 // Subscription is received after a successful subscription to, or unsubscription from, a channel or pattern.
253 type Subscription struct {
254 // Can be "subscribe", "unsubscribe", "psubscribe" or "punsubscribe".
256 // Channel name we have subscribed to.
258 // Number of channels we are currently subscribed to.
// String renders the subscription as "<Kind>: <Channel>".
262 func (m *Subscription) String() string {
263 return fmt.Sprintf("%s: %s", m.Kind, m.Channel)
266 // Message is received as the result of a PUBLISH command issued by another client.
267 type Message struct {
// String renders the message as "Message<channel: payload>".
273 func (m *Message) String() string {
274 return fmt.Sprintf("Message<%s: %s>", m.Channel, m.Payload)
277 // Pong is received as the result of a PING command issued by this client (see Ping).
// String renders the pong as "Pong<payload>" on the path shown here.
// NOTE(review): a line between these two is not visible; there may be a
// separate branch (e.g. for an empty payload) — confirm.
282 func (p *Pong) String() string {
284 return fmt.Sprintf("Pong<%s>", p.Payload)
// newMessage converts a raw reply into a typed pubsub value. It dispatches
// first on the dynamic type of reply and then, for slice replies, on the
// string in reply[0]. Replies it does not recognise produce a
// "redis: unsupported pubsub message" error.
// NOTE(review): every type assertion on reply[i] below is the unchecked
// single-value form and the indices are not length-checked in the visible
// lines, so a reply with an unexpected shape would panic instead of
// returning an error.
289 func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
290 switch reply := reply.(type) {
296 switch kind := reply[0].(string); kind {
// Subscribe/unsubscribe confirmations: reply[1] is the channel, reply[2] the count.
297 case "subscribe", "unsubscribe", "psubscribe", "punsubscribe":
298 return &Subscription{
300 Channel: reply[1].(string),
301 Count: int(reply[2].(int64)),
// Two-field form: channel and payload (case label not visible here).
305 Channel: reply[1].(string),
306 Payload: reply[2].(string),
// Three-field form: pattern, channel and payload (case label not visible here).
310 Pattern: reply[1].(string),
311 Channel: reply[2].(string),
312 Payload: reply[3].(string),
// Payload-only form (case label not visible here).
316 Payload: reply[1].(string),
// Unknown kind string.
319 return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
// Unknown reply type.
322 return nil, fmt.Errorf("redis: unsupported pubsub message: %#v", reply)
326 // ReceiveTimeout acts like Receive but returns an error if a message
327 // is not received in time. This is a low-level API and in most cases
328 // Channel should be used instead.
329 func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
// timeout is applied as the read timeout of the connection.
339 cn.SetReadTimeout(timeout)
// The reply is read into the shared c.cmd and the connection is released
// together with the read error.
340 err = c.cmd.readReply(cn)
341 c.releaseConn(cn, err)
// The raw reply value is converted into a typed message by newMessage.
346 return c.newMessage(c.cmd.Val())
349 // Receive returns a message as a Subscription, Message, Pong or error.
350 // See PubSub example for details. This is a low-level API and in most cases
351 // Channel should be used instead.
352 func (c *PubSub) Receive() (interface{}, error) {
// Delegates to ReceiveTimeout with a zero timeout (presumably meaning no
// read deadline — confirm against SetReadTimeout).
353 return c.ReceiveTimeout(0)
356 // ReceiveMessage returns a Message or error ignoring Subscription and Pong
357 // messages. This is low-level API and in most cases Channel should be used
359 func (c *PubSub) ReceiveMessage() (*Message, error) {
// Each value comes from Receive and is then dispatched on its dynamic type.
361 msg, err := c.Receive()
366 switch msg := msg.(type) {
// A value of an unexpected type produces an "unknown message" error.
374 err := fmt.Errorf("redis: unknown message: %T", msg)
380 // Channel returns a Go channel for concurrently receiving messages.
381 // It periodically sends Ping messages to test connection health.
382 // The channel is closed with PubSub. Receive* APIs cannot be used
383 // after the channel is created.
384 func (c *PubSub) Channel() <-chan *Message {
// initChannel is run through c.chOnce.Do (presumably a sync.Once, so the
// setup happens a single time across calls — confirm).
385 c.chOnce.Do(c.initChannel)
// initChannel creates the message channel and the ping signalling channel
// and sets up the receive and health-check logic around them.
// NOTE(review): the goroutine and loop boundaries are not visible in this
// excerpt; the comments below describe only the individual statements.
389 func (c *PubSub) initChannel() {
// Messages are buffered up to 100 entries; ping signals up to 10.
390 c.ch = make(chan *Message, 100)
391 c.ping = make(chan struct{}, 10)
396 msg, err := c.Receive()
// pool.ErrClosed is singled out from other receive errors.
398 if err == pool.ErrClosed {
// Sleeps for a backoff derived from errCount before continuing.
403 time.Sleep(c.retryBackoff(errCount))
410 // Any message is as good as a ping.
412 case c.ping <- struct{}{}:
416 switch msg := msg.(type) {
// Values of an unexpected type are logged.
424 internal.Logf("redis: unknown message: %T", msg)
// A 5-second timer is created (presumably the interval used for the
// health-check pings mentioned on Channel — confirm).
430 const timeout = 5 * time.Second
432 timer := time.NewTimer(timeout)
458 func (c *PubSub) retryBackoff(attempt int) time.Duration {
459 return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)