mirror of
https://github.com/redis/go-redis.git
synced 2025-09-05 20:24:00 +03:00
Added new stream commands (#3450)
* added new stream commands * updated docker image * fixed command return type * fixed tests * addressed PR comments --------- Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
This commit is contained in:
committed by
ofekshenawa
parent
2067991a47
commit
e6ea0056fe
@@ -6169,6 +6169,34 @@ var _ = Describe("Commands", func() {
|
|||||||
Expect(n).To(Equal(int64(3)))
|
Expect(n).To(Equal(int64(3)))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should XTrimMaxLenMode", func() {
|
||||||
|
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
|
||||||
|
n, err := client.XTrimMaxLenMode(ctx, "stream", 0, "KEEPREF").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(BeNumerically(">=", 0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XTrimMaxLenApproxMode", func() {
|
||||||
|
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
|
||||||
|
n, err := client.XTrimMaxLenApproxMode(ctx, "stream", 0, 0, "KEEPREF").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(BeNumerically(">=", 0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XTrimMinIDMode", func() {
|
||||||
|
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
|
||||||
|
n, err := client.XTrimMinIDMode(ctx, "stream", "4-0", "KEEPREF").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(BeNumerically(">=", 0))
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XTrimMinIDApproxMode", func() {
|
||||||
|
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
|
||||||
|
n, err := client.XTrimMinIDApproxMode(ctx, "stream", "4-0", 0, "KEEPREF").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(BeNumerically(">=", 0))
|
||||||
|
})
|
||||||
|
|
||||||
It("should XAdd", func() {
|
It("should XAdd", func() {
|
||||||
id, err := client.XAdd(ctx, &redis.XAddArgs{
|
id, err := client.XAdd(ctx, &redis.XAddArgs{
|
||||||
Stream: "stream",
|
Stream: "stream",
|
||||||
@@ -6222,6 +6250,37 @@ var _ = Describe("Commands", func() {
|
|||||||
Expect(n).To(Equal(int64(3)))
|
Expect(n).To(Equal(int64(3)))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should XAckDel", func() {
|
||||||
|
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
|
||||||
|
// First, create a consumer group
|
||||||
|
err := client.XGroupCreate(ctx, "stream", "testgroup", "0").Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Read messages to create pending entries
|
||||||
|
_, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||||
|
Group: "testgroup",
|
||||||
|
Consumer: "testconsumer",
|
||||||
|
Streams: []string{"stream", ">"},
|
||||||
|
}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
// Test XAckDel with KEEPREF mode
|
||||||
|
n, err := client.XAckDel(ctx, "stream", "testgroup", "KEEPREF", "1-0", "2-0").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(HaveLen(2))
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
client.XGroupDestroy(ctx, "stream", "testgroup")
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should XDelEx", func() {
|
||||||
|
SkipBeforeRedisVersion(8.2, "doesn't work with older redis stack images")
|
||||||
|
// Test XDelEx with KEEPREF mode
|
||||||
|
n, err := client.XDelEx(ctx, "stream", "KEEPREF", "1-0", "2-0").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(n).To(HaveLen(2))
|
||||||
|
})
|
||||||
|
|
||||||
It("should XLen", func() {
|
It("should XLen", func() {
|
||||||
n, err := client.XLen(ctx, "stream").Result()
|
n, err := client.XLen(ctx, "stream").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
@@ -7,7 +7,9 @@ import (
|
|||||||
|
|
||||||
type StreamCmdable interface {
|
type StreamCmdable interface {
|
||||||
XAdd(ctx context.Context, a *XAddArgs) *StringCmd
|
XAdd(ctx context.Context, a *XAddArgs) *StringCmd
|
||||||
|
XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd
|
||||||
XDel(ctx context.Context, stream string, ids ...string) *IntCmd
|
XDel(ctx context.Context, stream string, ids ...string) *IntCmd
|
||||||
|
XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd
|
||||||
XLen(ctx context.Context, stream string) *IntCmd
|
XLen(ctx context.Context, stream string) *IntCmd
|
||||||
XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
|
XRange(ctx context.Context, stream, start, stop string) *XMessageSliceCmd
|
||||||
XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
|
XRangeN(ctx context.Context, stream, start, stop string, count int64) *XMessageSliceCmd
|
||||||
@@ -31,8 +33,12 @@ type StreamCmdable interface {
|
|||||||
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
|
XAutoClaimJustID(ctx context.Context, a *XAutoClaimArgs) *XAutoClaimJustIDCmd
|
||||||
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
|
XTrimMaxLen(ctx context.Context, key string, maxLen int64) *IntCmd
|
||||||
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
|
XTrimMaxLenApprox(ctx context.Context, key string, maxLen, limit int64) *IntCmd
|
||||||
|
XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd
|
||||||
|
XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd
|
||||||
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
|
XTrimMinID(ctx context.Context, key string, minID string) *IntCmd
|
||||||
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
|
XTrimMinIDApprox(ctx context.Context, key string, minID string, limit int64) *IntCmd
|
||||||
|
XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd
|
||||||
|
XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd
|
||||||
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
|
XInfoGroups(ctx context.Context, key string) *XInfoGroupsCmd
|
||||||
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
|
XInfoStream(ctx context.Context, key string) *XInfoStreamCmd
|
||||||
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
|
XInfoStreamFull(ctx context.Context, key string, count int) *XInfoStreamFullCmd
|
||||||
@@ -54,6 +60,7 @@ type XAddArgs struct {
|
|||||||
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
|
// Approx causes MaxLen and MinID to use "~" matcher (instead of "=").
|
||||||
Approx bool
|
Approx bool
|
||||||
Limit int64
|
Limit int64
|
||||||
|
Mode string
|
||||||
ID string
|
ID string
|
||||||
Values interface{}
|
Values interface{}
|
||||||
}
|
}
|
||||||
@@ -81,6 +88,11 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
|
|||||||
if a.Limit > 0 {
|
if a.Limit > 0 {
|
||||||
args = append(args, "limit", a.Limit)
|
args = append(args, "limit", a.Limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if a.Mode != "" {
|
||||||
|
args = append(args, a.Mode)
|
||||||
|
}
|
||||||
|
|
||||||
if a.ID != "" {
|
if a.ID != "" {
|
||||||
args = append(args, a.ID)
|
args = append(args, a.ID)
|
||||||
} else {
|
} else {
|
||||||
@@ -93,6 +105,16 @@ func (c cmdable) XAdd(ctx context.Context, a *XAddArgs) *StringCmd {
|
|||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XAckDel(ctx context.Context, stream string, group string, mode string, ids ...string) *SliceCmd {
|
||||||
|
args := []interface{}{"xackdel", stream, group, mode, "ids", len(ids)}
|
||||||
|
for _, id := range ids {
|
||||||
|
args = append(args, id)
|
||||||
|
}
|
||||||
|
cmd := NewSliceCmd(ctx, args...)
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
|
func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd {
|
||||||
args := []interface{}{"xdel", stream}
|
args := []interface{}{"xdel", stream}
|
||||||
for _, id := range ids {
|
for _, id := range ids {
|
||||||
@@ -103,6 +125,16 @@ func (c cmdable) XDel(ctx context.Context, stream string, ids ...string) *IntCmd
|
|||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XDelEx(ctx context.Context, stream string, mode string, ids ...string) *SliceCmd {
|
||||||
|
args := []interface{}{"xdelex", stream, mode, "ids", len(ids)}
|
||||||
|
for _, id := range ids {
|
||||||
|
args = append(args, id)
|
||||||
|
}
|
||||||
|
cmd := NewSliceCmd(ctx, args...)
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
|
func (c cmdable) XLen(ctx context.Context, stream string) *IntCmd {
|
||||||
cmd := NewIntCmd(ctx, "xlen", stream)
|
cmd := NewIntCmd(ctx, "xlen", stream)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
@@ -375,6 +407,8 @@ func xClaimArgs(a *XClaimArgs) []interface{} {
|
|||||||
return args
|
return args
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: refactor xTrim, xTrimMode and the wrappers over the functions
|
||||||
|
|
||||||
// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
|
// xTrim If approx is true, add the "~" parameter, otherwise it is the default "=" (redis default).
|
||||||
// example:
|
// example:
|
||||||
//
|
//
|
||||||
@@ -418,6 +452,42 @@ func (c cmdable) XTrimMinIDApprox(ctx context.Context, key string, minID string,
|
|||||||
return c.xTrim(ctx, key, "minid", true, minID, limit)
|
return c.xTrim(ctx, key, "minid", true, minID, limit)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c cmdable) xTrimMode(
|
||||||
|
ctx context.Context, key, strategy string,
|
||||||
|
approx bool, threshold interface{}, limit int64,
|
||||||
|
mode string,
|
||||||
|
) *IntCmd {
|
||||||
|
args := make([]interface{}, 0, 7)
|
||||||
|
args = append(args, "xtrim", key, strategy)
|
||||||
|
if approx {
|
||||||
|
args = append(args, "~")
|
||||||
|
}
|
||||||
|
args = append(args, threshold)
|
||||||
|
if limit > 0 {
|
||||||
|
args = append(args, "limit", limit)
|
||||||
|
}
|
||||||
|
args = append(args, mode)
|
||||||
|
cmd := NewIntCmd(ctx, args...)
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XTrimMaxLenMode(ctx context.Context, key string, maxLen int64, mode string) *IntCmd {
|
||||||
|
return c.xTrimMode(ctx, key, "maxlen", false, maxLen, 0, mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XTrimMaxLenApproxMode(ctx context.Context, key string, maxLen, limit int64, mode string) *IntCmd {
|
||||||
|
return c.xTrimMode(ctx, key, "maxlen", true, maxLen, limit, mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XTrimMinIDMode(ctx context.Context, key string, minID string, mode string) *IntCmd {
|
||||||
|
return c.xTrimMode(ctx, key, "minid", false, minID, 0, mode)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c cmdable) XTrimMinIDApproxMode(ctx context.Context, key string, minID string, limit int64, mode string) *IntCmd {
|
||||||
|
return c.xTrimMode(ctx, key, "minid", true, minID, limit, mode)
|
||||||
|
}
|
||||||
|
|
||||||
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
|
func (c cmdable) XInfoConsumers(ctx context.Context, key string, group string) *XInfoConsumersCmd {
|
||||||
cmd := NewXInfoConsumersCmd(ctx, key, group)
|
cmd := NewXInfoConsumersCmd(ctx, key, group)
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
Reference in New Issue
Block a user