1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-28 06:42:00 +03:00

Add basic redis streams support

This commit is contained in:
nicktylah
2017-11-24 18:06:13 -08:00
committed by Vladimir Mihailenco
parent 4b568cdf1a
commit 39bdfc3fa8
3 changed files with 497 additions and 0 deletions

View File

@ -171,6 +171,16 @@ type Cmdable interface {
SRem(key string, members ...interface{}) *IntCmd
SUnion(keys ...string) *StringSliceCmd
SUnionStore(destination string, keys ...string) *IntCmd
XAdd(stream, id string, els map[string]interface{}) *StringCmd
XAddExt(opt *XAddExt) *StringCmd
XLen(key string) *IntCmd
XRange(stream, start, stop string) *XMessageSliceCmd
XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd
XRevRange(stream string, start, stop string) *XMessageSliceCmd
XRevRangeN(stream string, start, stop string, count int64) *XMessageSliceCmd
XRead(streams ...string) *XStreamSliceCmd
XReadN(count int64, streams ...string) *XStreamSliceCmd
XReadExt(opt *XReadExt) *XStreamSliceCmd
ZAdd(key string, members ...Z) *IntCmd
ZAddNX(key string, members ...Z) *IntCmd
ZAddXX(key string, members ...Z) *IntCmd
@ -1282,6 +1292,127 @@ func (c *cmdable) SUnionStore(destination string, keys ...string) *IntCmd {
//------------------------------------------------------------------------------
type XAddExt struct {
Stream string
MaxLen int64 // MAXLEN N
MaxLenApprox int64 // MAXLEN ~ N
ID string
Values map[string]interface{}
}
func (c *cmdable) XAddExt(opt *XAddExt) *StringCmd {
a := make([]interface{}, 0, 6+len(opt.Values)*2)
a = append(a, "xadd")
a = append(a, opt.Stream)
if opt.MaxLen > 0 {
a = append(a, "maxlen", opt.MaxLen)
} else if opt.MaxLenApprox > 0 {
a = append(a, "maxlen", "~", opt.MaxLenApprox)
}
if opt.ID != "" {
a = append(a, opt.ID)
} else {
a = append(a, "*")
}
for k, v := range opt.Values {
a = append(a, k)
a = append(a, v)
}
cmd := NewStringCmd(a...)
c.process(cmd)
return cmd
}
func (c *cmdable) XAdd(stream, id string, values map[string]interface{}) *StringCmd {
return c.XAddExt(&XAddExt{
Stream: stream,
ID: id,
Values: values,
})
}
func (c *cmdable) XLen(key string) *IntCmd {
cmd := NewIntCmd("xlen", key)
c.process(cmd)
return cmd
}
func (c *cmdable) XRange(stream, start, stop string) *XMessageSliceCmd {
cmd := NewXMessageSliceCmd("xrange", stream, start, stop)
c.process(cmd)
return cmd
}
func (c *cmdable) XRangeN(stream, start, stop string, count int64) *XMessageSliceCmd {
cmd := NewXMessageSliceCmd("xrange", stream, start, stop, "count", count)
c.process(cmd)
return cmd
}
func (c *cmdable) XRevRange(stream, start, stop string) *XMessageSliceCmd {
cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop)
c.process(cmd)
return cmd
}
func (c *cmdable) XRevRangeN(stream, start, stop string, count int64) *XMessageSliceCmd {
cmd := NewXMessageSliceCmd("xrevrange", stream, start, stop, "count", count)
c.process(cmd)
return cmd
}
type XReadExt struct {
Streams []string
Count int64
Block time.Duration
}
func (c *cmdable) XReadExt(opt *XReadExt) *XStreamSliceCmd {
a := make([]interface{}, 0, 5+len(opt.Streams))
a = append(a, "xread")
if opt != nil {
if opt.Count > 0 {
a = append(a, "count")
a = append(a, opt.Count)
}
if opt.Block > 0 {
a = append(a, "block")
a = append(a, int64(opt.Block/time.Millisecond))
}
}
a = append(a, "streams")
for _, s := range opt.Streams {
a = append(a, s)
}
cmd := NewXStreamSliceCmd(a...)
c.process(cmd)
return cmd
}
func (c *cmdable) XRead(streams ...string) *XStreamSliceCmd {
return c.XReadExt(&XReadExt{
Streams: streams,
})
}
func (c *cmdable) XReadN(count int64, streams ...string) *XStreamSliceCmd {
return c.XReadExt(&XReadExt{
Streams: streams,
Count: count,
})
}
func (c *cmdable) XReadBlock(block time.Duration, streams ...string) *XStreamSliceCmd {
return c.XReadExt(&XReadExt{
Streams: streams,
Block: block,
})
}
//------------------------------------------------------------------------------
// Z represents sorted set member.
type Z struct {
Score float64