1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-28 06:42:00 +03:00

Add streams group related commands

This commit is contained in:
Kassian Sun
2018-08-02 14:48:46 +03:00
committed by Vladimir Mihailenco
parent 7e6413d467
commit 34916092ba
4 changed files with 1018 additions and 619 deletions

View File

@ -172,16 +172,26 @@ type Cmdable interface {
SRem(key string, members ...interface{}) *IntCmd
SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd
XAdd(stream, id string, els map[string]interface{}) *StringCmd
XAddExt(opt *XAddExt) *StringCmd
XLen(key string) *IntCmd
XAdd(a *XAddArgs) *StringCmd
XLen(stream string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
XRevRange(stream string, start, stop string) *XMessageSliceCmd
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
XRead(streams ...string) *XStreamSliceCmd
XReadN(count int64, streams ...string) *XStreamSliceCmd
XReadExt(opt *XReadExt) *XStreamSliceCmd
XRead(a *XReadArgs) *XStreamSliceCmd
XReadStreams(streams ...string) *XStreamSliceCmd
XGroupCreate(stream, group, start string) *StatusCmd
XGroupSetID(stream, group, start string) *StatusCmd
XGroupDestroy(stream, group string) *IntCmd
XGroupDelConsumer(stream, group, consumer string) *IntCmd
XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd
XAck(stream, group string, ids ...string) *IntCmd
XPending(stream, group string) *XPendingCmd
XPendingExt(a *XPendingExtArgs) *XPendingExtCmd
XClaim(a *XClaimArgs) *XMessageSliceCmd
XClaimJustID(a *XClaimArgs) *StringSliceCmd
XTrim(key string, maxLen int64) *IntCmd
XTrimApprox(key string, maxLen int64) *IntCmd
ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
@ -1300,7 +1310,7 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
//------------------------------------------------------------------------------
type XAddExt struct {
type XAddArgs struct {
Stream string
MaxLen int64 // MAXLEN N
MaxLenApprox int64 // MAXLEN ~ N
@ -1308,40 +1318,32 @@ type XAddExt struct {
Values map[string]interface{}
}
func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
a := make([]interface{}, 0, 6+len(opt.Values)*2)
a = append(a, "xadd")
a = append(a, opt.Stream)
if opt.MaxLen > 0 {
a = append(a, "maxlen", opt.MaxLen)
} else if opt.MaxLenApprox > 0 {
a = append(a, "maxlen", "~", opt.MaxLenApprox)
func (c *cmdable) XAdd(a *XAddArgs) *StringCmd {
args := make([]interface{}, 0, 6+len(a.Values)*2)
args = append(args, "xadd")
args = append(args, a.Stream)
if a.MaxLen > 0 {
args = append(args, "maxlen", a.MaxLen)
} else if a.MaxLenApprox > 0 {
args = append(args, "maxlen", "~", a.MaxLenApprox)
}
if opt.ID != "" {
a = append(a, opt.ID)
if a.ID != "" {
args = append(args, a.ID)
} else {
a = append(a, "*")
args = append(args, "*")
}
for k, v := range opt.Values {
a = append(a, k)
a = append(a, v)
for k, v := range a.Values {
args = append(args, k)
args = append(args, v)
}
cmd := NewStringCmd(a...)
cmd := NewStringCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
return c.XAddExt(&XAddExt{
Stream: stream,
ID: id,
Values: values,
})
}
func (c *cmdable) XLen(key string) *IntCmd {
cmd := NewIntCmd("xlen", key)
func (c *cmdable) XLen(stream string) *IntCmd {
cmd := NewIntCmd("xlen", stream)
c.process(cmd)
return cmd
}
@ -1370,55 +1372,173 @@ func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageS
return cmd
}
type XReadExt struct {
type XReadArgs struct {
Streams []string
Count int64
Block time.Duration
}
func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
a := make([]interface{}, 0, 5+len(opt.Streams))
a = append(a, "xread")
if opt != nil {
if opt.Count > 0 {
a = append(a, "count")
a = append(a, opt.Count)
}
if opt.Block >= 0 {
a = append(a, "block")
a = append(a, int64(opt.Block/time.Millisecond))
}
func (c *cmdable) XRead(a *XReadArgs) *XStreamSliceCmd {
args := make([]interface{}, 0, 5+len(a.Streams))
args = append(args, "xread")
if a.Count > 0 {
args = append(args, "count")
args = append(args, a.Count)
}
a = append(a, "streams")
for _, s := range opt.Streams {
a = append(a, s)
if a.Block >= 0 {
args = append(args, "block")
args = append(args, int64(a.Block/time.Millisecond))
}
args = append(args, "streams")
for _, s := range a.Streams {
args = append(args, s)
}
cmd := NewXStreamSliceCmd(a...)
cmd := NewXStreamSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
return c.XReadExt(&XReadExt{
func (c *cmdable) XReadStreams(streams ...string) *XStreamSliceCmd {
return c.XRead(&XReadArgs{
Streams: streams,
Block: -1,
})
}
func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
return c.XReadExt(&XReadExt{
Streams: streams,
Count: count,
Block: -1,
})
func (c *cmdable) XGroupCreate(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "create", stream, group, start)
c.process(cmd)
return cmd
}
func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
return c.XReadExt(&XReadExt{
Streams: streams,
Block: block,
})
func (c *cmdable) XGroupSetID(stream, group, start string) *StatusCmd {
cmd := NewStatusCmd("xgroup", "setid", stream, group, start)
c.process(cmd)
return cmd
}
func (c *cmdable) XGroupDestroy(stream, group string) *IntCmd {
cmd := NewIntCmd("xgroup", "destroy", stream, group)
c.process(cmd)
return cmd
}
func (c *cmdable) XGroupDelConsumer(stream, group, consumer string) *IntCmd {
cmd := NewIntCmd("xgroup", "delconsumer", stream, group, consumer)
c.process(cmd)
return cmd
}
type XReadGroupArgs struct {
Group string
Consumer string
Streams []string
Count int64
Block time.Duration
}
func (c *cmdable) XReadGroup(a *XReadGroupArgs) *XStreamSliceCmd {
args := make([]interface{}, 0, 8+len(a.Streams))
args = append(args, "xreadgroup", "group", a.Group, a.Consumer)
if a.Count > 0 {
args = append(args, "count", a.Count)
}
if a.Block >= 0 {
args = append(args, "block", int64(a.Block/time.Millisecond))
}
args = append(args, "streams")
for _, s := range a.Streams {
args = append(args, s)
}
cmd := NewXStreamSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XAck(stream, group string, ids ...string) *IntCmd {
args := []interface{}{"xack", stream, group}
for _, id := range ids {
args = append(args, id)
}
cmd := NewIntCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XPending(stream, group string) *XPendingCmd {
cmd := NewXPendingCmd("xpending", stream, group)
c.process(cmd)
return cmd
}
type XPendingExtArgs struct {
Stream string
Group string
Start string
End string
Count int64
Consumer string
}
func (c *cmdable) XPendingExt(a *XPendingExtArgs) *XPendingExtCmd {
args := make([]interface{}, 0, 7)
args = append(args, "xpending", a.Stream, a.Group, a.Start, a.End, a.Count)
if a.Consumer != "" {
args = append(args, a.Consumer)
}
cmd := NewXPendingExtCmd(args...)
c.process(cmd)
return cmd
}
type XClaimArgs struct {
Stream string
Group string
Consumer string
MinIdle time.Duration
Messages []string
}
func (c *cmdable) XClaim(a *XClaimArgs) *XMessageSliceCmd {
args := xClaimArgs(a)
cmd := NewXMessageSliceCmd(args...)
c.process(cmd)
return cmd
}
func (c *cmdable) XClaimJustID(a *XClaimArgs) *StringSliceCmd {
args := xClaimArgs(a)
args = append(args, "justid")
cmd := NewStringSliceCmd(args...)
c.process(cmd)
return cmd
}
func xClaimArgs(a *XClaimArgs) []interface{} {
args := make([]interface{}, 0, 4+len(a.Messages))
args = append(args,
"xclaim",
a.Stream,
a.Group, a.Consumer,
int64(a.MinIdle/time.Millisecond))
for _, id := range a.Messages {
args = append(args, id)
}
return args
}
func (c *cmdable) XTrim(key string, maxLen int64) *IntCmd {
cmd := NewIntCmd("xtrim", key, "maxlen", maxLen)
c.process(cmd)
return cmd
}
func (c *cmdable) XTrimApprox(key string, maxLen int64) *IntCmd {
cmd := NewIntCmd("xtrim", key, "maxlen", "~", maxLen)
c.process(cmd)
return cmd
}
//------------------------------------------------------------------------------