barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / command.go
index 11472be..cb4f94b 100644 (file)
@@ -1,16 +1,14 @@
 package redis
 
 import (
-       "bytes"
        "fmt"
+       "net"
        "strconv"
        "strings"
        "time"
 
        "github.com/go-redis/redis/internal"
-       "github.com/go-redis/redis/internal/pool"
        "github.com/go-redis/redis/internal/proto"
-       "github.com/go-redis/redis/internal/util"
 )
 
 type Cmder interface {
@@ -18,13 +16,12 @@ type Cmder interface {
        Args() []interface{}
        stringArg(int) string
 
-       readReply(*pool.Conn) error
+       readReply(rd *proto.Reader) error
        setErr(error)
 
        readTimeout() *time.Duration
 
        Err() error
-       fmt.Stringer
 }
 
 func setCmdsErr(cmds []Cmder, e error) {
@@ -35,7 +32,7 @@ func setCmdsErr(cmds []Cmder, e error) {
        }
 }
 
-func firstCmdsErr(cmds []Cmder) error {
+func cmdsFirstErr(cmds []Cmder) error {
        for _, cmd := range cmds {
                if err := cmd.Err(); err != nil {
                        return err
@@ -44,16 +41,14 @@ func firstCmdsErr(cmds []Cmder) error {
        return nil
 }
 
-func writeCmd(cn *pool.Conn, cmds ...Cmder) error {
-       cn.Wb.Reset()
+func writeCmd(wr *proto.Writer, cmds ...Cmder) error {
        for _, cmd := range cmds {
-               if err := cn.Wb.Append(cmd.Args()); err != nil {
+               err := wr.WriteArgs(cmd.Args())
+               if err != nil {
                        return err
                }
        }
-
-       _, err := cn.Write(cn.Wb.Bytes())
-       return err
+       return nil
 }
 
 func cmdString(cmd Cmder, val interface{}) string {
@@ -165,20 +160,124 @@ func (cmd *Cmd) Result() (interface{}, error) {
        return cmd.val, cmd.err
 }
 
-func (cmd *Cmd) String() string {
-       return cmdString(cmd, cmd.val)
+func (cmd *Cmd) String() (string, error) {
+       if cmd.err != nil {
+               return "", cmd.err
+       }
+       switch val := cmd.val.(type) {
+       case string:
+               return val, nil
+       default:
+               err := fmt.Errorf("redis: unexpected type=%T for String", val)
+               return "", err
+       }
 }
 
-func (cmd *Cmd) readReply(cn *pool.Conn) error {
-       cmd.val, cmd.err = cn.Rd.ReadReply(sliceParser)
+func (cmd *Cmd) Int() (int, error) {
        if cmd.err != nil {
-               return cmd.err
+               return 0, cmd.err
        }
-       if b, ok := cmd.val.([]byte); ok {
-               // Bytes must be copied, because underlying memory is reused.
-               cmd.val = string(b)
+       switch val := cmd.val.(type) {
+       case int64:
+               return int(val), nil
+       case string:
+               return strconv.Atoi(val)
+       default:
+               err := fmt.Errorf("redis: unexpected type=%T for Int", val)
+               return 0, err
        }
-       return nil
+}
+
+func (cmd *Cmd) Int64() (int64, error) {
+       if cmd.err != nil {
+               return 0, cmd.err
+       }
+       switch val := cmd.val.(type) {
+       case int64:
+               return val, nil
+       case string:
+               return strconv.ParseInt(val, 10, 64)
+       default:
+               err := fmt.Errorf("redis: unexpected type=%T for Int64", val)
+               return 0, err
+       }
+}
+
+func (cmd *Cmd) Uint64() (uint64, error) {
+       if cmd.err != nil {
+               return 0, cmd.err
+       }
+       switch val := cmd.val.(type) {
+       case int64:
+               return uint64(val), nil
+       case string:
+               return strconv.ParseUint(val, 10, 64)
+       default:
+               err := fmt.Errorf("redis: unexpected type=%T for Uint64", val)
+               return 0, err
+       }
+}
+
+func (cmd *Cmd) Float64() (float64, error) {
+       if cmd.err != nil {
+               return 0, cmd.err
+       }
+       switch val := cmd.val.(type) {
+       case int64:
+               return float64(val), nil
+       case string:
+               return strconv.ParseFloat(val, 64)
+       default:
+               err := fmt.Errorf("redis: unexpected type=%T for Float64", val)
+               return 0, err
+       }
+}
+
+func (cmd *Cmd) Bool() (bool, error) {
+       if cmd.err != nil {
+               return false, cmd.err
+       }
+       switch val := cmd.val.(type) {
+       case int64:
+               return val != 0, nil
+       case string:
+               return strconv.ParseBool(val)
+       default:
+               err := fmt.Errorf("redis: unexpected type=%T for Bool", val)
+               return false, err
+       }
+}
+
+func (cmd *Cmd) readReply(rd *proto.Reader) error {
+       cmd.val, cmd.err = rd.ReadReply(sliceParser)
+       return cmd.err
+}
+
+// Implements proto.MultiBulkParse
+func sliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       vals := make([]interface{}, 0, n)
+       for i := int64(0); i < n; i++ {
+               v, err := rd.ReadReply(sliceParser)
+               if err != nil {
+                       if err == Nil {
+                               vals = append(vals, nil)
+                               continue
+                       }
+                       if err, ok := err.(proto.RedisError); ok {
+                               vals = append(vals, err)
+                               continue
+                       }
+                       return nil, err
+               }
+
+               switch v := v.(type) {
+               case string:
+                       vals = append(vals, v)
+               default:
+                       vals = append(vals, v)
+               }
+       }
+       return vals, nil
 }
 
 //------------------------------------------------------------------------------
@@ -209,9 +308,9 @@ func (cmd *SliceCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *SliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *SliceCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(sliceParser)
+       v, cmd.err = rd.ReadArrayReply(sliceParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -247,8 +346,8 @@ func (cmd *StatusCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *StatusCmd) readReply(cn *pool.Conn) error {
-       cmd.val, cmd.err = cn.Rd.ReadStringReply()
+func (cmd *StatusCmd) readReply(rd *proto.Reader) error {
+       cmd.val, cmd.err = rd.ReadString()
        return cmd.err
 }
 
@@ -280,8 +379,8 @@ func (cmd *IntCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *IntCmd) readReply(cn *pool.Conn) error {
-       cmd.val, cmd.err = cn.Rd.ReadIntReply()
+func (cmd *IntCmd) readReply(rd *proto.Reader) error {
+       cmd.val, cmd.err = rd.ReadIntReply()
        return cmd.err
 }
 
@@ -315,9 +414,9 @@ func (cmd *DurationCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *DurationCmd) readReply(cn *pool.Conn) error {
+func (cmd *DurationCmd) readReply(rd *proto.Reader) error {
        var n int64
-       n, cmd.err = cn.Rd.ReadIntReply()
+       n, cmd.err = rd.ReadIntReply()
        if cmd.err != nil {
                return cmd.err
        }
@@ -353,9 +452,9 @@ func (cmd *TimeCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
+func (cmd *TimeCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(timeParser)
+       v, cmd.err = rd.ReadArrayReply(timeParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -363,6 +462,25 @@ func (cmd *TimeCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func timeParser(rd *proto.Reader, n int64) (interface{}, error) {
+       if n != 2 {
+               return nil, fmt.Errorf("got %d elements, expected 2", n)
+       }
+
+       sec, err := rd.ReadInt()
+       if err != nil {
+               return nil, err
+       }
+
+       microsec, err := rd.ReadInt()
+       if err != nil {
+               return nil, err
+       }
+
+       return time.Unix(sec, microsec*1000), nil
+}
+
 //------------------------------------------------------------------------------
 
 type BoolCmd struct {
@@ -391,11 +509,9 @@ func (cmd *BoolCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-var ok = []byte("OK")
-
-func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadReply(nil)
+       v, cmd.err = rd.ReadReply(nil)
        // `SET key value NX` returns nil when key already exists. But
        // `SETNX key value` returns bool (0/1). So convert nil to bool.
        // TODO: is this okay?
@@ -411,8 +527,8 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
        case int64:
                cmd.val = v == 1
                return nil
-       case []byte:
-               cmd.val = bytes.Equal(v, ok)
+       case string:
+               cmd.val = v == "OK"
                return nil
        default:
                cmd.err = fmt.Errorf("got %T, wanted int64 or string", v)
@@ -425,7 +541,7 @@ func (cmd *BoolCmd) readReply(cn *pool.Conn) error {
 type StringCmd struct {
        baseCmd
 
-       val []byte
+       val string
 }
 
 var _ Cmder = (*StringCmd)(nil)
@@ -437,7 +553,7 @@ func NewStringCmd(args ...interface{}) *StringCmd {
 }
 
 func (cmd *StringCmd) Val() string {
-       return util.BytesToString(cmd.val)
+       return cmd.val
 }
 
 func (cmd *StringCmd) Result() (string, error) {
@@ -445,7 +561,14 @@ func (cmd *StringCmd) Result() (string, error) {
 }
 
 func (cmd *StringCmd) Bytes() ([]byte, error) {
-       return cmd.val, cmd.err
+       return []byte(cmd.val), cmd.err
+}
+
+func (cmd *StringCmd) Int() (int, error) {
+       if cmd.err != nil {
+               return 0, cmd.err
+       }
+       return strconv.Atoi(cmd.Val())
 }
 
 func (cmd *StringCmd) Int64() (int64, error) {
@@ -473,15 +596,15 @@ func (cmd *StringCmd) Scan(val interface{}) error {
        if cmd.err != nil {
                return cmd.err
        }
-       return proto.Scan(cmd.val, val)
+       return proto.Scan([]byte(cmd.val), val)
 }
 
 func (cmd *StringCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *StringCmd) readReply(cn *pool.Conn) error {
-       cmd.val, cmd.err = cn.Rd.ReadBytesReply()
+func (cmd *StringCmd) readReply(rd *proto.Reader) error {
+       cmd.val, cmd.err = rd.ReadString()
        return cmd.err
 }
 
@@ -513,8 +636,8 @@ func (cmd *FloatCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *FloatCmd) readReply(cn *pool.Conn) error {
-       cmd.val, cmd.err = cn.Rd.ReadFloatReply()
+func (cmd *FloatCmd) readReply(rd *proto.Reader) error {
+       cmd.val, cmd.err = rd.ReadFloatReply()
        return cmd.err
 }
 
@@ -550,9 +673,9 @@ func (cmd *StringSliceCmd) ScanSlice(container interface{}) error {
        return proto.ScanSlice(cmd.Val(), container)
 }
 
-func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringSliceCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(stringSliceParser)
+       v, cmd.err = rd.ReadArrayReply(stringSliceParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -560,6 +683,22 @@ func (cmd *StringSliceCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func stringSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       ss := make([]string, 0, n)
+       for i := int64(0); i < n; i++ {
+               s, err := rd.ReadString()
+               if err == Nil {
+                       ss = append(ss, "")
+               } else if err != nil {
+                       return nil, err
+               } else {
+                       ss = append(ss, s)
+               }
+       }
+       return ss, nil
+}
+
 //------------------------------------------------------------------------------
 
 type BoolSliceCmd struct {
@@ -588,9 +727,9 @@ func (cmd *BoolSliceCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *BoolSliceCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(boolSliceParser)
+       v, cmd.err = rd.ReadArrayReply(boolSliceParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -598,6 +737,19 @@ func (cmd *BoolSliceCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func boolSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       bools := make([]bool, 0, n)
+       for i := int64(0); i < n; i++ {
+               n, err := rd.ReadIntReply()
+               if err != nil {
+                       return nil, err
+               }
+               bools = append(bools, n == 1)
+       }
+       return bools, nil
+}
+
 //------------------------------------------------------------------------------
 
 type StringStringMapCmd struct {
@@ -626,9 +778,9 @@ func (cmd *StringStringMapCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStringMapCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(stringStringMapParser)
+       v, cmd.err = rd.ReadArrayReply(stringStringMapParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -636,6 +788,25 @@ func (cmd *StringStringMapCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func stringStringMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+       m := make(map[string]string, n/2)
+       for i := int64(0); i < n; i += 2 {
+               key, err := rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
+
+               value, err := rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
+
+               m[key] = value
+       }
+       return m, nil
+}
+
 //------------------------------------------------------------------------------
 
 type StringIntMapCmd struct {
@@ -664,9 +835,9 @@ func (cmd *StringIntMapCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringIntMapCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(stringIntMapParser)
+       v, cmd.err = rd.ReadArrayReply(stringIntMapParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -674,6 +845,25 @@ func (cmd *StringIntMapCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func stringIntMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+       m := make(map[string]int64, n/2)
+       for i := int64(0); i < n; i += 2 {
+               key, err := rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
+
+               n, err := rd.ReadIntReply()
+               if err != nil {
+                       return nil, err
+               }
+
+               m[key] = n
+       }
+       return m, nil
+}
+
 //------------------------------------------------------------------------------
 
 type StringStructMapCmd struct {
@@ -702,9 +892,9 @@ func (cmd *StringStructMapCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
+func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(stringStructMapParser)
+       v, cmd.err = rd.ReadArrayReply(stringStructMapParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -712,24 +902,121 @@ func (cmd *StringStructMapCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
-//------------------------------------------------------------------------------
+// Implements proto.MultiBulkParse
+func stringStructMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+       m := make(map[string]struct{}, n)
+       for i := int64(0); i < n; i++ {
+               key, err := rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
 
-type XStream struct {
-       Stream   string
-       Messages []*XMessage
+               m[key] = struct{}{}
+       }
+       return m, nil
 }
 
+//------------------------------------------------------------------------------
+
 type XMessage struct {
        ID     string
        Values map[string]interface{}
 }
 
+type XMessageSliceCmd struct {
+       baseCmd
+
+       val []XMessage
+}
+
+var _ Cmder = (*XMessageSliceCmd)(nil)
+
+func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
+       return &XMessageSliceCmd{
+               baseCmd: baseCmd{_args: args},
+       }
+}
+
+func (cmd *XMessageSliceCmd) Val() []XMessage {
+       return cmd.val
+}
+
+func (cmd *XMessageSliceCmd) Result() ([]XMessage, error) {
+       return cmd.val, cmd.err
+}
+
+func (cmd *XMessageSliceCmd) String() string {
+       return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XMessageSliceCmd) readReply(rd *proto.Reader) error {
+       var v interface{}
+       v, cmd.err = rd.ReadArrayReply(xMessageSliceParser)
+       if cmd.err != nil {
+               return cmd.err
+       }
+       cmd.val = v.([]XMessage)
+       return nil
+}
+
+// Implements proto.MultiBulkParse
+func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       msgs := make([]XMessage, 0, n)
+       for i := int64(0); i < n; i++ {
+               _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+                       id, err := rd.ReadString()
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       v, err := rd.ReadArrayReply(stringInterfaceMapParser)
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       msgs = append(msgs, XMessage{
+                               ID:     id,
+                               Values: v.(map[string]interface{}),
+                       })
+                       return nil, nil
+               })
+               if err != nil {
+                       return nil, err
+               }
+       }
+       return msgs, nil
+}
+
+// Implements proto.MultiBulkParse
+func stringInterfaceMapParser(rd *proto.Reader, n int64) (interface{}, error) {
+       m := make(map[string]interface{}, n/2)
+       for i := int64(0); i < n; i += 2 {
+               key, err := rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
+
+               value, err := rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
+
+               m[key] = value
+       }
+       return m, nil
+}
+
 //------------------------------------------------------------------------------
 
+type XStream struct {
+       Stream   string
+       Messages []XMessage
+}
+
 type XStreamSliceCmd struct {
        baseCmd
 
-       val []*XStream
+       val []XStream
 }
 
 var _ Cmder = (*XStreamSliceCmd)(nil)
@@ -740,11 +1027,11 @@ func NewXStreamSliceCmd(args ...interface{}) *XStreamSliceCmd {
        }
 }
 
-func (cmd *XStreamSliceCmd) Val() []*XStream {
+func (cmd *XStreamSliceCmd) Val() []XStream {
        return cmd.val
 }
 
-func (cmd *XStreamSliceCmd) Result() ([]*XStream, error) {
+func (cmd *XStreamSliceCmd) Result() ([]XStream, error) {
        return cmd.val, cmd.err
 }
 
@@ -752,177 +1039,364 @@ func (cmd *XStreamSliceCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *XStreamSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *XStreamSliceCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(xStreamSliceParser)
+       v, cmd.err = rd.ReadArrayReply(xStreamSliceParser)
        if cmd.err != nil {
                return cmd.err
        }
-       cmd.val = v.([]*XStream)
+       cmd.val = v.([]XStream)
        return nil
 }
 
 // Implements proto.MultiBulkParse
 func xStreamSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
-       xx := make([]*XStream, n)
+       ret := make([]XStream, 0, n)
        for i := int64(0); i < n; i++ {
-               v, err := rd.ReadArrayReply(xStreamParser)
+               _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+                       if n != 2 {
+                               return nil, fmt.Errorf("got %d, wanted 2", n)
+                       }
+
+                       stream, err := rd.ReadString()
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       v, err := rd.ReadArrayReply(xMessageSliceParser)
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       ret = append(ret, XStream{
+                               Stream:   stream,
+                               Messages: v.([]XMessage),
+                       })
+                       return nil, nil
+               })
                if err != nil {
                        return nil, err
                }
-               xx[i] = v.(*XStream)
        }
-       return xx, nil
+       return ret, nil
 }
 
-// Implements proto.MultiBulkParse
-func xStreamParser(rd *proto.Reader, n int64) (interface{}, error) {
-       if n != 2 {
-               return nil, fmt.Errorf("got %d, wanted 2", n)
+//------------------------------------------------------------------------------
+
+type XPending struct {
+       Count     int64
+       Lower     string
+       Higher    string
+       Consumers map[string]int64
+}
+
+type XPendingCmd struct {
+       baseCmd
+       val *XPending
+}
+
+var _ Cmder = (*XPendingCmd)(nil)
+
+func NewXPendingCmd(args ...interface{}) *XPendingCmd {
+       return &XPendingCmd{
+               baseCmd: baseCmd{_args: args},
        }
+}
+
+func (cmd *XPendingCmd) Val() *XPending {
+       return cmd.val
+}
+
+func (cmd *XPendingCmd) Result() (*XPending, error) {
+       return cmd.val, cmd.err
+}
+
+func (cmd *XPendingCmd) String() string {
+       return cmdString(cmd, cmd.val)
+}
+
+func (cmd *XPendingCmd) readReply(rd *proto.Reader) error {
+       var info interface{}
+       info, cmd.err = rd.ReadArrayReply(xPendingParser)
+       if cmd.err != nil {
+               return cmd.err
+       }
+       cmd.val = info.(*XPending)
+       return nil
+}
 
-       stream, err := rd.ReadStringReply()
+func xPendingParser(rd *proto.Reader, n int64) (interface{}, error) {
+       if n != 4 {
+               return nil, fmt.Errorf("got %d, wanted 4", n)
+       }
+
+       count, err := rd.ReadIntReply()
        if err != nil {
                return nil, err
        }
 
-       v, err := rd.ReadArrayReply(xMessageSliceParser)
-       if err != nil {
+       lower, err := rd.ReadString()
+       if err != nil && err != Nil {
                return nil, err
        }
 
-       return &XStream{
-               Stream:   stream,
-               Messages: v.([]*XMessage),
-       }, nil
+       higher, err := rd.ReadString()
+       if err != nil && err != Nil {
+               return nil, err
+       }
+
+       pending := &XPending{
+               Count:  count,
+               Lower:  lower,
+               Higher: higher,
+       }
+       _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+               for i := int64(0); i < n; i++ {
+                       _, err = rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+                               if n != 2 {
+                                       return nil, fmt.Errorf("got %d, wanted 2", n)
+                               }
+
+                               consumerName, err := rd.ReadString()
+                               if err != nil {
+                                       return nil, err
+                               }
+
+                               consumerPending, err := rd.ReadInt()
+                               if err != nil {
+                                       return nil, err
+                               }
+
+                               if pending.Consumers == nil {
+                                       pending.Consumers = make(map[string]int64)
+                               }
+                               pending.Consumers[consumerName] = consumerPending
+
+                               return nil, nil
+                       })
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+               return nil, nil
+       })
+       if err != nil && err != Nil {
+               return nil, err
+       }
+
+       return pending, nil
 }
 
 //------------------------------------------------------------------------------
 
-type XMessageSliceCmd struct {
-       baseCmd
+type XPendingExt struct {
+       Id         string
+       Consumer   string
+       Idle       time.Duration
+       RetryCount int64
+}
 
-       val []*XMessage
+type XPendingExtCmd struct {
+       baseCmd
+       val []XPendingExt
 }
 
-var _ Cmder = (*XMessageSliceCmd)(nil)
+var _ Cmder = (*XPendingExtCmd)(nil)
 
-func NewXMessageSliceCmd(args ...interface{}) *XMessageSliceCmd {
-       return &XMessageSliceCmd{
+func NewXPendingExtCmd(args ...interface{}) *XPendingExtCmd {
+       return &XPendingExtCmd{
                baseCmd: baseCmd{_args: args},
        }
 }
 
-func (cmd *XMessageSliceCmd) Val() []*XMessage {
+func (cmd *XPendingExtCmd) Val() []XPendingExt {
        return cmd.val
 }
 
-func (cmd *XMessageSliceCmd) Result() ([]*XMessage, error) {
+func (cmd *XPendingExtCmd) Result() ([]XPendingExt, error) {
        return cmd.val, cmd.err
 }
 
-func (cmd *XMessageSliceCmd) String() string {
+func (cmd *XPendingExtCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *XMessageSliceCmd) readReply(cn *pool.Conn) error {
-       var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(xMessageSliceParser)
+func (cmd *XPendingExtCmd) readReply(rd *proto.Reader) error {
+       var info interface{}
+       info, cmd.err = rd.ReadArrayReply(xPendingExtSliceParser)
        if cmd.err != nil {
                return cmd.err
        }
-       cmd.val = v.([]*XMessage)
+       cmd.val = info.([]XPendingExt)
        return nil
 }
 
-// Implements proto.MultiBulkParse
-func xMessageSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
-       msgs := make([]*XMessage, n)
+func xPendingExtSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       ret := make([]XPendingExt, 0, n)
        for i := int64(0); i < n; i++ {
-               v, err := rd.ReadArrayReply(xMessageParser)
+               _, err := rd.ReadArrayReply(func(rd *proto.Reader, n int64) (interface{}, error) {
+                       if n != 4 {
+                               return nil, fmt.Errorf("got %d, wanted 4", n)
+                       }
+
+                       id, err := rd.ReadString()
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       consumer, err := rd.ReadString()
+                       if err != nil && err != Nil {
+                               return nil, err
+                       }
+
+                       idle, err := rd.ReadIntReply()
+                       if err != nil && err != Nil {
+                               return nil, err
+                       }
+
+                       retryCount, err := rd.ReadIntReply()
+                       if err != nil && err != Nil {
+                               return nil, err
+                       }
+
+                       ret = append(ret, XPendingExt{
+                               Id:         id,
+                               Consumer:   consumer,
+                               Idle:       time.Duration(idle) * time.Millisecond,
+                               RetryCount: retryCount,
+                       })
+                       return nil, nil
+               })
                if err != nil {
                        return nil, err
                }
-               msgs[i] = v.(*XMessage)
        }
-       return msgs, nil
+       return ret, nil
 }
 
-// Implements proto.MultiBulkParse
-func xMessageParser(rd *proto.Reader, n int64) (interface{}, error) {
-       id, err := rd.ReadStringReply()
-       if err != nil {
-               return nil, err
-       }
+//------------------------------------------------------------------------------
 
-       v, err := rd.ReadArrayReply(xKeyValueParser)
-       if err != nil {
-               return nil, err
+//------------------------------------------------------------------------------
+
+type ZSliceCmd struct {
+       baseCmd
+
+       val []Z
+}
+
+var _ Cmder = (*ZSliceCmd)(nil)
+
+func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
+       return &ZSliceCmd{
+               baseCmd: baseCmd{_args: args},
        }
+}
+
+func (cmd *ZSliceCmd) Val() []Z {
+       return cmd.val
+}
 
-       return &XMessage{
-               ID:     id,
-               Values: v.(map[string]interface{}),
-       }, nil
+func (cmd *ZSliceCmd) Result() ([]Z, error) {
+       return cmd.val, cmd.err
+}
+
+func (cmd *ZSliceCmd) String() string {
+       return cmdString(cmd, cmd.val)
+}
+
+func (cmd *ZSliceCmd) readReply(rd *proto.Reader) error {
+       var v interface{}
+       v, cmd.err = rd.ReadArrayReply(zSliceParser)
+       if cmd.err != nil {
+               return cmd.err
+       }
+       cmd.val = v.([]Z)
+       return nil
 }
 
 // Implements proto.MultiBulkParse
-func xKeyValueParser(rd *proto.Reader, n int64) (interface{}, error) {
-       values := make(map[string]interface{}, n)
+func zSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       zz := make([]Z, n/2)
        for i := int64(0); i < n; i += 2 {
-               key, err := rd.ReadStringReply()
+               var err error
+
+               z := &zz[i/2]
+
+               z.Member, err = rd.ReadString()
                if err != nil {
                        return nil, err
                }
 
-               value, err := rd.ReadStringReply()
+               z.Score, err = rd.ReadFloatReply()
                if err != nil {
                        return nil, err
                }
-
-               values[key] = value
        }
-       return values, nil
+       return zz, nil
 }
 
 //------------------------------------------------------------------------------
 
-type ZSliceCmd struct {
+type ZWithKeyCmd struct {
        baseCmd
 
-       val []Z
+       val ZWithKey
 }
 
-var _ Cmder = (*ZSliceCmd)(nil)
+var _ Cmder = (*ZWithKeyCmd)(nil)
 
-func NewZSliceCmd(args ...interface{}) *ZSliceCmd {
-       return &ZSliceCmd{
+func NewZWithKeyCmd(args ...interface{}) *ZWithKeyCmd {
+       return &ZWithKeyCmd{
                baseCmd: baseCmd{_args: args},
        }
 }
 
-func (cmd *ZSliceCmd) Val() []Z {
+func (cmd *ZWithKeyCmd) Val() ZWithKey {
        return cmd.val
 }
 
-func (cmd *ZSliceCmd) Result() ([]Z, error) {
-       return cmd.val, cmd.err
+func (cmd *ZWithKeyCmd) Result() (ZWithKey, error) {
+       return cmd.Val(), cmd.Err()
 }
 
-func (cmd *ZSliceCmd) String() string {
+func (cmd *ZWithKeyCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *ZSliceCmd) readReply(cn *pool.Conn) error {
+func (cmd *ZWithKeyCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(zSliceParser)
+       v, cmd.err = rd.ReadArrayReply(zWithKeyParser)
        if cmd.err != nil {
                return cmd.err
        }
-       cmd.val = v.([]Z)
+       cmd.val = v.(ZWithKey)
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func zWithKeyParser(rd *proto.Reader, n int64) (interface{}, error) {
+       if n != 3 {
+               return nil, fmt.Errorf("got %d elements, expected 3", n)
+       }
+
+       var z ZWithKey
+       var err error
+
+       z.Key, err = rd.ReadString()
+       if err != nil {
+               return nil, err
+       }
+       z.Member, err = rd.ReadString()
+       if err != nil {
+               return nil, err
+       }
+       z.Score, err = rd.ReadFloatReply()
+       if err != nil {
+               return nil, err
+       }
+       return z, nil
+}
+
 //------------------------------------------------------------------------------
 
 type ScanCmd struct {
@@ -955,8 +1429,8 @@ func (cmd *ScanCmd) String() string {
        return cmdString(cmd, cmd.page)
 }
 
-func (cmd *ScanCmd) readReply(cn *pool.Conn) error {
-       cmd.page, cmd.cursor, cmd.err = cn.Rd.ReadScanReply()
+func (cmd *ScanCmd) readReply(rd *proto.Reader) error {
+       cmd.page, cmd.cursor, cmd.err = rd.ReadScanReply()
        return cmd.err
 }
 
@@ -1006,9 +1480,9 @@ func (cmd *ClusterSlotsCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
+func (cmd *ClusterSlotsCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(clusterSlotsParser)
+       v, cmd.err = rd.ReadArrayReply(clusterSlotsParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -1016,6 +1490,70 @@ func (cmd *ClusterSlotsCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func clusterSlotsParser(rd *proto.Reader, n int64) (interface{}, error) {
+       slots := make([]ClusterSlot, n)
+       for i := 0; i < len(slots); i++ {
+               n, err := rd.ReadArrayLen()
+               if err != nil {
+                       return nil, err
+               }
+               if n < 2 {
+                       err := fmt.Errorf("redis: got %d elements in cluster info, expected at least 2", n)
+                       return nil, err
+               }
+
+               start, err := rd.ReadIntReply()
+               if err != nil {
+                       return nil, err
+               }
+
+               end, err := rd.ReadIntReply()
+               if err != nil {
+                       return nil, err
+               }
+
+               nodes := make([]ClusterNode, n-2)
+               for j := 0; j < len(nodes); j++ {
+                       n, err := rd.ReadArrayLen()
+                       if err != nil {
+                               return nil, err
+                       }
+                       if n != 2 && n != 3 {
+                               err := fmt.Errorf("got %d elements in cluster info address, expected 2 or 3", n)
+                               return nil, err
+                       }
+
+                       ip, err := rd.ReadString()
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       port, err := rd.ReadString()
+                       if err != nil {
+                               return nil, err
+                       }
+
+                       nodes[j].Addr = net.JoinHostPort(ip, port)
+
+                       if n == 3 {
+                               id, err := rd.ReadString()
+                               if err != nil {
+                                       return nil, err
+                               }
+                               nodes[j].Id = id
+                       }
+               }
+
+               slots[i] = ClusterSlot{
+                       Start: int(start),
+                       End:   int(end),
+                       Nodes: nodes,
+               }
+       }
+       return slots, nil
+}
+
 //------------------------------------------------------------------------------
 
 // GeoLocation is used with GeoAdd to add geospatial location.
@@ -1097,9 +1635,9 @@ func (cmd *GeoLocationCmd) String() string {
        return cmdString(cmd, cmd.locations)
 }
 
-func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoLocationCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
+       v, cmd.err = rd.ReadArrayReply(newGeoLocationSliceParser(cmd.q))
        if cmd.err != nil {
                return cmd.err
        }
@@ -1107,6 +1645,73 @@ func (cmd *GeoLocationCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+func newGeoLocationParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+       return func(rd *proto.Reader, n int64) (interface{}, error) {
+               var loc GeoLocation
+               var err error
+
+               loc.Name, err = rd.ReadString()
+               if err != nil {
+                       return nil, err
+               }
+               if q.WithDist {
+                       loc.Dist, err = rd.ReadFloatReply()
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+               if q.WithGeoHash {
+                       loc.GeoHash, err = rd.ReadIntReply()
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+               if q.WithCoord {
+                       n, err := rd.ReadArrayLen()
+                       if err != nil {
+                               return nil, err
+                       }
+                       if n != 2 {
+                               return nil, fmt.Errorf("got %d coordinates, expected 2", n)
+                       }
+
+                       loc.Longitude, err = rd.ReadFloatReply()
+                       if err != nil {
+                               return nil, err
+                       }
+                       loc.Latitude, err = rd.ReadFloatReply()
+                       if err != nil {
+                               return nil, err
+                       }
+               }
+
+               return &loc, nil
+       }
+}
+
+func newGeoLocationSliceParser(q *GeoRadiusQuery) proto.MultiBulkParse {
+       return func(rd *proto.Reader, n int64) (interface{}, error) {
+               locs := make([]GeoLocation, 0, n)
+               for i := int64(0); i < n; i++ {
+                       v, err := rd.ReadReply(newGeoLocationParser(q))
+                       if err != nil {
+                               return nil, err
+                       }
+                       switch vv := v.(type) {
+                       case string:
+                               locs = append(locs, GeoLocation{
+                                       Name: vv,
+                               })
+                       case *GeoLocation:
+                               locs = append(locs, *vv)
+                       default:
+                               return nil, fmt.Errorf("got %T, expected string or *GeoLocation", v)
+                       }
+               }
+               return locs, nil
+       }
+}
+
 //------------------------------------------------------------------------------
 
 type GeoPos struct {
@@ -1139,9 +1744,9 @@ func (cmd *GeoPosCmd) String() string {
        return cmdString(cmd, cmd.positions)
 }
 
-func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
+func (cmd *GeoPosCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(geoPosSliceParser)
+       v, cmd.err = rd.ReadArrayReply(geoPosSliceParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -1149,6 +1754,44 @@ func (cmd *GeoPosCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+func geoPosSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       positions := make([]*GeoPos, 0, n)
+       for i := int64(0); i < n; i++ {
+               v, err := rd.ReadReply(geoPosParser)
+               if err != nil {
+                       if err == Nil {
+                               positions = append(positions, nil)
+                               continue
+                       }
+                       return nil, err
+               }
+               switch v := v.(type) {
+               case *GeoPos:
+                       positions = append(positions, v)
+               default:
+                       return nil, fmt.Errorf("got %T, expected *GeoPos", v)
+               }
+       }
+       return positions, nil
+}
+
+func geoPosParser(rd *proto.Reader, n int64) (interface{}, error) {
+       var pos GeoPos
+       var err error
+
+       pos.Longitude, err = rd.ReadFloatReply()
+       if err != nil {
+               return nil, err
+       }
+
+       pos.Latitude, err = rd.ReadFloatReply()
+       if err != nil {
+               return nil, err
+       }
+
+       return &pos, nil
+}
+
 //------------------------------------------------------------------------------
 
 type CommandInfo struct {
@@ -1187,9 +1830,9 @@ func (cmd *CommandsInfoCmd) String() string {
        return cmdString(cmd, cmd.val)
 }
 
-func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
+func (cmd *CommandsInfoCmd) readReply(rd *proto.Reader) error {
        var v interface{}
-       v, cmd.err = cn.Rd.ReadArrayReply(commandInfoSliceParser)
+       v, cmd.err = rd.ReadArrayReply(commandInfoSliceParser)
        if cmd.err != nil {
                return cmd.err
        }
@@ -1197,6 +1840,74 @@ func (cmd *CommandsInfoCmd) readReply(cn *pool.Conn) error {
        return nil
 }
 
+// Implements proto.MultiBulkParse
+func commandInfoSliceParser(rd *proto.Reader, n int64) (interface{}, error) {
+       m := make(map[string]*CommandInfo, n)
+       for i := int64(0); i < n; i++ {
+               v, err := rd.ReadReply(commandInfoParser)
+               if err != nil {
+                       return nil, err
+               }
+               vv := v.(*CommandInfo)
+               m[vv.Name] = vv
+
+       }
+       return m, nil
+}
+
+func commandInfoParser(rd *proto.Reader, n int64) (interface{}, error) {
+       var cmd CommandInfo
+       var err error
+
+       if n != 6 {
+               return nil, fmt.Errorf("redis: got %d elements in COMMAND reply, wanted 6", n)
+       }
+
+       cmd.Name, err = rd.ReadString()
+       if err != nil {
+               return nil, err
+       }
+
+       arity, err := rd.ReadIntReply()
+       if err != nil {
+               return nil, err
+       }
+       cmd.Arity = int8(arity)
+
+       flags, err := rd.ReadReply(stringSliceParser)
+       if err != nil {
+               return nil, err
+       }
+       cmd.Flags = flags.([]string)
+
+       firstKeyPos, err := rd.ReadIntReply()
+       if err != nil {
+               return nil, err
+       }
+       cmd.FirstKeyPos = int8(firstKeyPos)
+
+       lastKeyPos, err := rd.ReadIntReply()
+       if err != nil {
+               return nil, err
+       }
+       cmd.LastKeyPos = int8(lastKeyPos)
+
+       stepCount, err := rd.ReadIntReply()
+       if err != nil {
+               return nil, err
+       }
+       cmd.StepCount = int8(stepCount)
+
+       for _, flag := range cmd.Flags {
+               if flag == "readonly" {
+                       cmd.ReadOnly = true
+                       break
+               }
+       }
+
+       return &cmd, nil
+}
+
 //------------------------------------------------------------------------------
 
 type cmdsInfoCache struct {