From 71bb3cae4fcc63b08881c507b6ac86594f0041b6 Mon Sep 17 00:00:00 2001 From: ofekshenawa <104765379+ofekshenawa@users.noreply.github.com> Date: Mon, 3 Nov 2025 18:58:14 +0200 Subject: [PATCH] Add support for XReadGroup CLAIM argument (#3578) * Add support for XReadGroup CLAIM argument * modify tutorial tests --------- Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> --- command.go | 32 ++++- commands_test.go | 236 +++++++++++++++++++++++++++++++ doctests/stream_tutorial_test.go | 32 ++--- stream_commands.go | 5 + 4 files changed, 286 insertions(+), 19 deletions(-) diff --git a/command.go b/command.go index d3fb231b..c3ed6e1d 100644 --- a/command.go +++ b/command.go @@ -1585,6 +1585,12 @@ func (cmd *StringStructMapCmd) readReply(rd *proto.Reader) error { type XMessage struct { ID string Values map[string]interface{} + // MillisElapsedFromDelivery is the number of milliseconds since the entry was last delivered. + // Only populated when using XREADGROUP with CLAIM argument for claimed entries. + MillisElapsedFromDelivery int64 + // DeliveredCount is the number of times the entry was delivered. + // Only populated when using XREADGROUP with CLAIM argument for claimed entries. + DeliveredCount int64 } type XMessageSliceCmd struct { @@ -1641,10 +1647,16 @@ func readXMessageSlice(rd *proto.Reader) ([]XMessage, error) { } func readXMessage(rd *proto.Reader) (XMessage, error) { - if err := rd.ReadFixedArrayLen(2); err != nil { + // Read array length can be 2 or 4 (with CLAIM metadata) + n, err := rd.ReadArrayLen() + if err != nil { return XMessage{}, err } + if n != 2 && n != 4 { + return XMessage{}, fmt.Errorf("redis: got %d elements in the XMessage array, expected 2 or 4", n) + } + id, err := rd.ReadString() if err != nil { return XMessage{}, err @@ -1657,10 +1669,24 @@ func readXMessage(rd *proto.Reader) (XMessage, error) { } } - return XMessage{ + msg := XMessage{ ID: id, Values: v, - }, nil + } + + if n == 4 { + msg.MillisElapsedFromDelivery, err = rd.ReadInt() + if err != nil { + return XMessage{}, err + } + + msg.DeliveredCount, err = rd.ReadInt() + if err != nil { + return XMessage{}, err + } + } + + return msg, nil } func stringInterfaceMapParser(rd *proto.Reader) (map[string]interface{}, error) { diff --git a/commands_test.go b/commands_test.go index 17b4dd03..5a5664cc 100644 --- a/commands_test.go +++ b/commands_test.go @@ -6749,6 +6749,242 @@ var _ = Describe("Commands", func() { Expect(err).NotTo(HaveOccurred()) Expect(n).To(Equal(int64(2))) }) + + It("should XReadGroup with CLAIM argument", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer2", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0].Stream).To(Equal("stream")) + + messages := res[0].Messages + Expect(len(messages)).To(BeNumerically(">=", 1)) + + for _, msg := range messages { + if msg.MillisElapsedFromDelivery > 0 { + Expect(msg.MillisElapsedFromDelivery).To(BeNumerically(">=", 50)) + Expect(msg.DeliveredCount).To(BeNumerically(">=", 1)) + } + } + }) + + It("should XReadGroup with CLAIM and COUNT", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer3", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + Count: 2, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 && len(res[0].Messages) > 0 { + Expect(len(res[0].Messages)).To(BeNumerically("<=", 2)) + } + }) + + It("should XReadGroup with CLAIM and NOACK", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer4", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + NoAck: true, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 { + Expect(res[0].Stream).To(Equal("stream")) + } + }) + + It("should XReadGroup CLAIM empties PEL after acknowledgment", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer5", + Streams: []string{"stream", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 && len(res[0].Messages) > 0 { + ids := make([]string, len(res[0].Messages)) + for i, msg := range res[0].Messages { + ids[i] = msg.ID + } + + n, err := client.XAck(ctx, "stream", "group", ids...).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(n).To(BeNumerically(">=", 1)) + + pending, err := client.XPending(ctx, "stream", "group").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(pending.Count).To(BeNumerically("<", 3)) + } + }) + + It("should XReadGroup backward compatibility without CLAIM", func() { + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group", + Consumer: "consumer_compat", + Streams: []string{"stream", "0"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(res).To(HaveLen(1)) + Expect(res[0].Stream).To(Equal("stream")) + + for _, msg := range res[0].Messages { + Expect(msg.MillisElapsedFromDelivery).To(Equal(int64(0))) + Expect(msg.DeliveredCount).To(Equal(int64(0))) + } + }) + + It("should XReadGroup CLAIM with multiple streams", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + id, err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream2", + ID: "1-0", + Values: map[string]interface{}{"field1": "value1"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("1-0")) + + id, err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: "stream2", + ID: "2-0", + Values: map[string]interface{}{"field2": "value2"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(id).To(Equal("2-0")) + + err = client.XGroupCreate(ctx, "stream2", "group2", "0").Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer1", + Streams: []string{"stream2", ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(100 * time.Millisecond) + + res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: "group2", + Consumer: "consumer2", + Streams: []string{"stream2", ">"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + if len(res) > 0 { + Expect(res[0].Stream).To(Equal("stream2")) + if len(res[0].Messages) > 0 { + for _, msg := range res[0].Messages { + if msg.MillisElapsedFromDelivery > 0 { + Expect(msg.DeliveredCount).To(BeNumerically(">=", 1)) + } + } + } + } + }) + + It("should XReadGroup CLAIM work consistently on RESP2 and RESP3", func() { + SkipBeforeRedisVersion(8.3, "XREADGROUP CLAIM requires Redis 8.3+") + + streamName := "stream-resp-test" + err := client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{"field1": "value1"}, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + err = client.XAdd(ctx, &redis.XAddArgs{ + Stream: streamName, + Values: map[string]interface{}{"field2": "value2"}, + }).Err() + Expect(err).NotTo(HaveOccurred()) + + groupName := "resp-test-group" + err = client.XGroupCreate(ctx, streamName, groupName, "0").Err() + Expect(err).NotTo(HaveOccurred()) + + _, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer1", + Streams: []string{streamName, ">"}, + }).Result() + Expect(err).NotTo(HaveOccurred()) + + time.Sleep(100 * time.Millisecond) + + // Test with RESP2 (protocol 2) + resp2Client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Protocol: 2, + }) + defer resp2Client.Close() + + resp2Result, err := resp2Client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer2", + Streams: []string{streamName, "0"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resp2Result).To(HaveLen(1)) + + // Test with RESP3 (protocol 3) + resp3Client := redis.NewClient(&redis.Options{ + Addr: redisAddr, + Protocol: 3, + }) + defer resp3Client.Close() + + resp3Result, err := resp3Client.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: groupName, + Consumer: "consumer3", + Streams: []string{streamName, "0"}, + Claim: 50 * time.Millisecond, + }).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(resp3Result).To(HaveLen(1)) + + Expect(len(resp2Result[0].Messages)).To(Equal(len(resp3Result[0].Messages))) + + for i := range resp2Result[0].Messages { + msg2 := resp2Result[0].Messages[i] + msg3 := resp3Result[0].Messages[i] + + Expect(msg2.ID).To(Equal(msg3.ID)) + + if msg2.MillisElapsedFromDelivery > 0 { + Expect(msg3.MillisElapsedFromDelivery).To(BeNumerically(">", 0)) + Expect(msg2.DeliveredCount).To(Equal(msg3.DeliveredCount)) + } + } + }) }) Describe("xinfo", func() { diff --git a/doctests/stream_tutorial_test.go b/doctests/stream_tutorial_test.go index e39919ea..655021fa 100644 --- a/doctests/stream_tutorial_test.go +++ b/doctests/stream_tutorial_test.go @@ -216,8 +216,8 @@ func ExampleClient_racefrance1() { // REMOVE_END // Output: - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}] - // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]}]}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}] + // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0}]}] // 4 } @@ -467,13 +467,13 @@ func ExampleClient_racefrance2() { // STEP_END // Output: - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]}] - // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}] - // [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7]} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0} {1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0}] + // [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}] + // [{1692632102976-0 map[location_id:1 position:2 rider:Prickett speed:29.7] 0 0} {1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] // [] - // [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9]}] - // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2]} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8]}]}] + // [{1692632147973-0 map[location_id:2 position:1 rider:Castilla speed:29.9] 0 0}] + // [{race:france [{1692632086370-0 map[location_id:1 position:1 rider:Castilla speed:30.2] 0 0} {1692632094485-0 map[location_id:1 position:3 rider:Norem speed:28.8] 0 0}]}] } func ExampleClient_xgroupcreate() { @@ -999,18 +999,18 @@ func ExampleClient_raceitaly() { // REMOVE_END // Output: - // [{race:italy [{1692632639151-0 map[rider:Castilla]}]}] + // [{race:italy [{1692632639151-0 map[rider:Castilla] 0 0}]}] // 1 // [{race:italy []}] - // [{race:italy [{1692632647899-0 map[rider:Royce]} {1692632662819-0 map[rider:Sam-Bodden]}]}] + // [{race:italy [{1692632647899-0 map[rider:Royce] 0 0} {1692632662819-0 map[rider:Sam-Bodden] 0 0}]}] // &{2 1692632647899-0 1692632662819-0 map[Bob:2]} - // [{1692632647899-0 map[rider:Royce]}] - // [{1692632647899-0 map[rider:Royce]}] - // [{1692632647899-0 map[rider:Royce]}] + // [{1692632647899-0 map[rider:Royce] 0 0}] + // [{1692632647899-0 map[rider:Royce] 0 0}] + // [{1692632647899-0 map[rider:Royce] 0 0}] // 1692632662819-0 // [] // 0-0 - // &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla]} {1692632678249-0 map[rider:Norem]} 1692632639151-0} + // &{5 1 2 1 1692632678249-0 0-0 5 {1692632639151-0 map[rider:Castilla] 0 0} {1692632678249-0 map[rider:Norem] 0 0} 1692632639151-0} // [{italy_riders 3 2 1692632662819-0 3 2}] // 2 // 0 @@ -1085,7 +1085,7 @@ func ExampleClient_xdel() { // STEP_END // Output: - // [{1692633198206-0 map[rider:Wood]} {1692633208557-0 map[rider:Henshaw]}] + // [{1692633198206-0 map[rider:Wood] 0 0} {1692633208557-0 map[rider:Henshaw] 0 0}] // 1 - // [{1692633198206-0 map[rider:Wood]}] + // [{1692633198206-0 map[rider:Wood] 0 0}] } diff --git a/stream_commands.go b/stream_commands.go index 4b84e00f..5573e48b 100644 --- a/stream_commands.go +++ b/stream_commands.go @@ -263,6 +263,7 @@ type XReadGroupArgs struct { Count int64 Block time.Duration NoAck bool + Claim time.Duration // Claim idle pending entries older than this duration } func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSliceCmd { @@ -282,6 +283,10 @@ func (c cmdable) XReadGroup(ctx context.Context, a *XReadGroupArgs) *XStreamSlic args = append(args, "noack") keyPos++ } + if a.Claim > 0 { + args = append(args, "claim", int64(a.Claim/time.Millisecond)) + keyPos += 2 + } args = append(args, "streams") keyPos++ for _, s := range a.Streams {