barometer: update DMA's vendoring packages
[barometer.git] / src / dma / vendor / github.com / go-redis / redis / commands.go
index dddf8ac..653e4ab 100644 (file)
@@ -8,13 +8,6 @@ import (
        "github.com/go-redis/redis/internal"
 )
 
-func readTimeout(timeout time.Duration) time.Duration {
-       if timeout == 0 {
-               return 0
-       }
-       return timeout + 10*time.Second
-}
-
 func usePrecise(dur time.Duration) bool {
        return dur < time.Second || dur%time.Second != 0
 }
@@ -172,16 +165,30 @@ type Cmdable interface {
        SRem(key string, members ...interface{}) *IntCmd
        SUnion(keys ...string) *StringSliceCmd
        SUnionStore(destination string, keys ...string) *IntCmd
-       XAdd(stream, id string, els map[string]interface{}) *StringCmd
-       XAddExt(opt *XAddExt) *StringCmd
-       XLen(key string) *IntCmd
+       XAdd(a *XAddArgs) *StringCmd
+       XDel(stream string, ids ...string) *IntCmd
+       XLen(stream string) *IntCmd
        XRange(stream, start, stop string) *XMessageSliceCmd
        XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
        XRevRange(stream string, start, stop string) *XMessageSliceCmd
        XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
-       XRead(streams ...string) *XStreamSliceCmd
-       XReadN(count int64, streams ...string) *XStreamSliceCmd
-       XReadExt(opt *XReadExt) *XStreamSliceCmd
+       XRead(a *XReadArgs) *XStreamSliceCmd
+       XReadStreams(streams ...string) *XStreamSliceCmd
+       XGroupCreate(stream, group, start string) *StatusCmd
+       XGroupCreateMkStream(stream, group, start string) *StatusCmd
+       XGroupSetID(stream, group, start string) *StatusCmd
+       XGroupDestroy(stream, group string) *IntCmd
+       XGroupDelConsumer(stream, group, consumer string) *IntCmd
+       XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd
+       XAck(stream, group string, ids ...string) *IntCmd
+       XPending(stream, group string) *XPendingCmd
+       XPendingExt(a *XPendingExtArgs) *XPendingExtCmd
+       XClaim(a *XClaimArgs) *XMessageSliceCmd
+       XClaimJustID(a *XClaimArgs) *StringSliceCmd
+       XTrim(key string, maxLen int64) *IntCmd
+       XTrimApprox(key string, maxLen int64) *IntCmd
+       BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd
+       BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd
        ZAdd(key string, members ...Z) *IntCmd
        ZAddNX(key string, members ...Z) *IntCmd
        ZAddXX(key string, members ...Z) *IntCmd
@@ -196,6 +203,8 @@ type Cmdable interface {
        ZLexCount(key, min, max string) *IntCmd
        ZIncrBy(key string, increment float64, member string) *FloatCmd
        ZInterStore(destination string, store ZStore, keys ...string) *IntCmd
+       ZPopMax(key string, count ...int64) *ZSliceCmd
+       ZPopMin(key string, count ...int64) *ZSliceCmd
        ZRange(key string, start, stop int64) *StringSliceCmd
        ZRangeWithScores(key string, start, stop int64) *ZSliceCmd
        ZRangeByScore(key string, opt ZRangeBy) *StringSliceCmd
@@ -223,6 +232,7 @@ type Cmdable interface {
        ClientKillByFilter(keys ...string) *IntCmd
        ClientList() *StringCmd
        ClientPause(dur time.Duration) *BoolCmd
+       ClientID() *IntCmd
        ConfigGet(parameter string) *SliceCmd
        ConfigResetStat() *StatusCmd
        ConfigSet(parameter, value string) *StatusCmd
@@ -260,6 +270,7 @@ type Cmdable interface {
        ClusterResetHard() *StatusCmd
        ClusterInfo() *StringCmd
        ClusterKeySlot(key string) *IntCmd
+       ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd
        ClusterCountFailureReports(nodeID string) *IntCmd
        ClusterCountKeysInSlot(slot int) *IntCmd
        ClusterDelSlots(slots ...int) *StatusCmd
@@ -1300,7 +1311,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
 
 //------------------------------------------------------------------------------
 
-type XAddExt struct {
+type XAddArgs struct {
        Stream       string
        MaxLen       int64 // MAXLEN N
        MaxLenApprox int64 // MAXLEN ~ N
@@ -1308,40 +1319,42 @@ type XAddExt struct {
        Values       map[string]interface{}
 }
 
-func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
-       a := make([]interface{}, 0, 6+len(opt.Values)*2)
-       a = append(a, "xadd")
-       a = append(a, opt.Stream)
-       if opt.MaxLen > 0 {
-               a = append(a, "maxlen", opt.MaxLen)
-       } else if opt.MaxLenApprox > 0 {
-               a = append(a, "maxlen", "~", opt.MaxLenApprox)
+func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
+       args := make([]interface{}, 0, 6+len(a.Values)*2)
+       args = append(args, "xadd")
+       args = append(args, a.Stream)
+       if a.MaxLen > 0 {
+               args = append(args, "maxlen", a.MaxLen)
+       } else if a.MaxLenApprox > 0 {
+               args = append(args, "maxlen", "~", a.MaxLenApprox)
        }
-       if opt.ID != "" {
-               a = append(a, opt.ID)
+       if a.ID != "" {
+               args = append(args, a.ID)
        } else {
-               a = append(a, "*")
+               args = append(args, "*")
        }
-       for k, v := range opt.Values {
-               a = append(a, k)
-               a = append(a, v)
+       for k, v := range a.Values {
+               args = append(args, k)
+               args = append(args, v)
        }
 
-       cmd := NewStringCmd(a...)
+       cmd := NewStringCmd(args...)
        c.process(cmd)
        return cmd
 }
 
-func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
-       return c.XAddExt(&XAddExt{
-               Stream: stream,
-               ID:     id,
-               Values: values,
-       })
+func (c *cmdable) XDel(stream string, ids ...string) *IntCmd {
+       args := []interface{}{"xdel", stream}
+       for _, id := range ids {
+               args = append(args, id)
+       }
+       cmd := NewIntCmd(args...)
+       c.process(cmd)
+       return cmd
 }
 
-func (c *cmdable) XLen(key string) *IntCmd {
-       cmd := NewIntCmd("xlen", key)
+func (c *cmdable) XLen(stream string) *IntCmd {
+       cmd := NewIntCmd("xlen", stream)
        c.process(cmd)
        return cmd
 }
@@ -1370,55 +1383,190 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS
        return cmd
 }
 
-type XReadExt struct {
+type XReadArgs struct {
        Streams []string
        Count   int64
        Block   time.Duration
 }
 
-func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
-       a := make([]interface{}, 0, 5+len(opt.Streams))
-       a = append(a, "xread")
-       if opt != nil {
-               if opt.Count > 0 {
-                       a = append(a, "count")
-                       a = append(a, opt.Count)
-               }
-               if opt.Block >= 0 {
-                       a = append(a, "block")
-                       a = append(a, int64(opt.Block/time.Millisecond))
-               }
+func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
+       args := make([]interface{}, 0, 5+len(a.Streams))
+       args = append(args, "xread")
+       if a.Count > 0 {
+               args = append(args, "count")
+               args = append(args, a.Count)
+       }
+       if a.Block >= 0 {
+               args = append(args, "block")
+               args = append(args, int64(a.Block/time.Millisecond))
        }
-       a = append(a, "streams")
-       for _, s := range opt.Streams {
-               a = append(a, s)
+       args = append(args, "streams")
+       for _, s := range a.Streams {
+               args = append(args, s)
        }
 
-       cmd := NewXStreamSliceCmd(a...)
+       cmd := NewXStreamSliceCmd(args...)
+       if a.Block >= 0 {
+               cmd.setReadTimeout(a.Block)
+       }
        c.process(cmd)
        return cmd
 }
 
-func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
-       return c.XReadExt(&XReadExt{
+func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd {
+       return c.XRead(&XReadArgs{
                Streams: streams,
                Block:   -1,
        })
 }
 
-func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
-       return c.XReadExt(&XReadExt{
-               Streams: streams,
-               Count:   count,
-               Block:   -1,
-       })
+func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
+       cmd := NewStatusCmd("xgroup", "create", stream, group, start)
+       c.process(cmd)
+       return cmd
 }
 
-func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
-       return c.XReadExt(&XReadExt{
-               Streams: streams,
-               Block:   block,
-       })
+func (c *cmdable) XGroupCreateMkStream(stream, group, start string) *StatusCmd {
+       cmd := NewStatusCmd("xgroup", "create", stream, group, start, "mkstream")
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
+       cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd {
+       cmd := NewIntCmd("xgroup", "destroy", stream, group)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
+       cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer)
+       c.process(cmd)
+       return cmd
+}
+
+type XReadGroupArgs struct {
+       Group    string
+       Consumer string
+       // List of streams and ids.
+       Streams []string
+       Count   int64
+       Block   time.Duration
+       NoAck   bool
+}
+
+func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
+       args := make([]interface{}, 0, 8+len(a.Streams))
+       args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
+       if a.Count > 0 {
+               args = append(args, "count", a.Count)
+       }
+       if a.Block >= 0 {
+               args = append(args, "block", int64(a.Block/time.Millisecond))
+       }
+       if a.NoAck {
+               args = append(args, "noack")
+       }
+       args = append(args, "streams")
+       for _, s := range a.Streams {
+               args = append(args, s)
+       }
+
+       cmd := NewXStreamSliceCmd(args...)
+       if a.Block >= 0 {
+               cmd.setReadTimeout(a.Block)
+       }
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd {
+       args := []interface{}{"xack", stream, group}
+       for _, id := range ids {
+               args = append(args, id)
+       }
+       cmd := NewIntCmd(args...)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XPending(stream, group string) *XPendingCmd {
+       cmd := NewXPendingCmd("xpending", stream, group)
+       c.process(cmd)
+       return cmd
+}
+
+type XPendingExtArgs struct {
+       Stream   string
+       Group    string
+       Start    string
+       End      string
+       Count    int64
+       Consumer string
+}
+
+func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd {
+       args := make([]interface{}, 0, 7)
+       args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count)
+       if a.Consumer != "" {
+               args = append(args, a.Consumer)
+       }
+       cmd := NewXPendingExtCmd(args...)
+       c.process(cmd)
+       return cmd
+}
+
+type XClaimArgs struct {
+       Stream   string
+       Group    string
+       Consumer string
+       MinIdle  time.Duration
+       Messages []string
+}
+
+func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd {
+       args := xClaimArgs(a)
+       cmd := NewXMessageSliceCmd(args...)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd {
+       args := xClaimArgs(a)
+       args = append(args, "justid")
+       cmd := NewStringSliceCmd(args...)
+       c.process(cmd)
+       return cmd
+}
+
+func xClaimArgs(a *XClaimArgs) []interface{} {
+       args := make([]interface{}, 0, 4+len(a.Messages))
+       args = append(args,
+               "xclaim",
+               a.Stream,
+               a.Group, a.Consumer,
+               int64(a.MinIdle/time.Millisecond))
+       for _, id := range a.Messages {
+               args = append(args, id)
+       }
+       return args
+}
+
+func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd {
+       cmd := NewIntCmd("xtrim", key, "maxlen", maxLen)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd {
+       cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen)
+       c.process(cmd)
+       return cmd
 }
 
 //------------------------------------------------------------------------------
@@ -1429,6 +1577,12 @@ type Z struct {
        Member interface{}
 }
 
+// ZWithKey represents sorted set member including the name of the key where it was popped.
+type ZWithKey struct {
+       Z
+       Key string
+}
+
 // ZStore is used as an arg to ZInterStore and ZUnionStore.
 type ZStore struct {
        Weights []float64
@@ -1436,6 +1590,34 @@ type ZStore struct {
        Aggregate string
 }
 
+// Redis `BZPOPMAX key [key ...] timeout` command.
+func (c *cmdable) BZPopMax(timeout time.Duration, keys ...string) *ZWithKeyCmd {
+       args := make([]interface{}, 1+len(keys)+1)
+       args[0] = "bzpopmax"
+       for i, key := range keys {
+               args[1+i] = key
+       }
+       args[len(args)-1] = formatSec(timeout)
+       cmd := NewZWithKeyCmd(args...)
+       cmd.setReadTimeout(timeout)
+       c.process(cmd)
+       return cmd
+}
+
+// Redis `BZPOPMIN key [key ...] timeout` command.
+func (c *cmdable) BZPopMin(timeout time.Duration, keys ...string) *ZWithKeyCmd {
+       args := make([]interface{}, 1+len(keys)+1)
+       args[0] = "bzpopmin"
+       for i, key := range keys {
+               args[1+i] = key
+       }
+       args[len(args)-1] = formatSec(timeout)
+       cmd := NewZWithKeyCmd(args...)
+       cmd.setReadTimeout(timeout)
+       c.process(cmd)
+       return cmd
+}
+
 func (c *cmdable) zAdd(a []interface{}, n int, members ...Z) *IntCmd {
        for i, m := range members {
                a[n+2*i] = m.Score
@@ -1574,6 +1756,46 @@ func (c *cmdable) ZInterStore(destination string, store ZStore, keys ...string)
        return cmd
 }
 
+func (c *cmdable) ZPopMax(key string, count ...int64) *ZSliceCmd {
+       args := []interface{}{
+               "zpopmax",
+               key,
+       }
+
+       switch len(count) {
+       case 0:
+               break
+       case 1:
+               args = append(args, count[0])
+       default:
+               panic("too many arguments")
+       }
+
+       cmd := NewZSliceCmd(args...)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) ZPopMin(key string, count ...int64) *ZSliceCmd {
+       args := []interface{}{
+               "zpopmin",
+               key,
+       }
+
+       switch len(count) {
+       case 0:
+               break
+       case 1:
+               args = append(args, count[0])
+       default:
+               panic("too many arguments")
+       }
+
+       cmd := NewZSliceCmd(args...)
+       c.process(cmd)
+       return cmd
+}
+
 func (c *cmdable) zRange(key string, start, stop int64, withScores bool) *StringSliceCmd {
        args := []interface{}{
                "zrange",
@@ -1849,6 +2071,24 @@ func (c *cmdable) ClientPause(dur time.Duration) *BoolCmd {
        return cmd
 }
 
+func (c *cmdable) ClientID() *IntCmd {
+       cmd := NewIntCmd("client", "id")
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) ClientUnblock(id int64) *IntCmd {
+       cmd := NewIntCmd("client", "unblock", id)
+       c.process(cmd)
+       return cmd
+}
+
+func (c *cmdable) ClientUnblockWithError(id int64) *IntCmd {
+       cmd := NewIntCmd("client", "unblock", id, "error")
+       c.process(cmd)
+       return cmd
+}
+
 // ClientSetName assigns a name to the connection.
 func (c *statefulCmdable) ClientSetName(name string) *BoolCmd {
        cmd := NewBoolCmd("client", "setname", name)
@@ -2164,6 +2404,12 @@ func (c *cmdable) ClusterKeySlot(key string) *IntCmd {
        return cmd
 }
 
+func (c *cmdable) ClusterGetKeysInSlot(slot int, count int) *StringSliceCmd {
+       cmd := NewStringSliceCmd("cluster", "getkeysinslot", slot, count)
+       c.process(cmd)
+       return cmd
+}
+
 func (c *cmdable) ClusterCountFailureReports(nodeID string) *IntCmd {
        cmd := NewIntCmd("cluster", "count-failure-reports", nodeID)
        c.process(cmd)