mirror of
https://github.com/redis/go-redis.git
synced 2025-06-06 17:40:59 +03:00
xinfo-groups: support nil lag in XINFO GROUPS (#3369)
* xinfo-groups: support nil lag in XINFO GROUPS * Add test * docs: clarify XInfoGroup.Lag field behavior with Nil values * docs: clarify XInfoGroup.Lag field behavior
This commit is contained in:
parent
03c2c0b088
commit
b67455e099
@ -2104,7 +2104,9 @@ type XInfoGroup struct {
|
|||||||
Pending int64
|
Pending int64
|
||||||
LastDeliveredID string
|
LastDeliveredID string
|
||||||
EntriesRead int64
|
EntriesRead int64
|
||||||
Lag int64
|
// Lag represents the number of pending messages in the stream not yet
|
||||||
|
// delivered to this consumer group. Returns -1 when the lag cannot be determined.
|
||||||
|
Lag int64
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ Cmder = (*XInfoGroupsCmd)(nil)
|
var _ Cmder = (*XInfoGroupsCmd)(nil)
|
||||||
@ -2187,8 +2189,11 @@ func (cmd *XInfoGroupsCmd) readReply(rd *proto.Reader) error {
|
|||||||
|
|
||||||
// lag: the number of entries in the stream that are still waiting to be delivered
|
// lag: the number of entries in the stream that are still waiting to be delivered
|
||||||
// to the group's consumers, or a NULL(Nil) when that number can't be determined.
|
// to the group's consumers, or a NULL(Nil) when that number can't be determined.
|
||||||
|
// In that case, we return -1.
|
||||||
if err != nil && err != Nil {
|
if err != nil && err != Nil {
|
||||||
return err
|
return err
|
||||||
|
} else if err == Nil {
|
||||||
|
group.Lag = -1
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
|
return fmt.Errorf("redis: unexpected key %q in XINFO GROUPS reply", key)
|
||||||
|
@ -6772,6 +6772,36 @@ var _ = Describe("Commands", func() {
|
|||||||
}))
|
}))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should return -1 for nil lag in XINFO GROUPS", func() {
|
||||||
|
_, err := client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-1", Values: []string{"foo", "1"}}).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-2", Values: []string{"foo", "2"}})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
client.XAdd(ctx, &redis.XAddArgs{Stream: "s", ID: "0-3", Values: []string{"foo", "3"}})
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
err = client.XGroupCreate(ctx, "s", "g", "0").Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
err = client.XReadGroup(ctx, &redis.XReadGroupArgs{Group: "g", Consumer: "c", Streams: []string{"s", ">"}, Count: 1, Block: -1, NoAck: false}).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
|
||||||
|
client.XDel(ctx, "s", "0-2")
|
||||||
|
|
||||||
|
res, err := client.XInfoGroups(ctx, "s").Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).To(Equal([]redis.XInfoGroup{
|
||||||
|
{
|
||||||
|
Name: "g",
|
||||||
|
Consumers: 1,
|
||||||
|
Pending: 1,
|
||||||
|
LastDeliveredID: "0-1",
|
||||||
|
EntriesRead: 1,
|
||||||
|
Lag: -1, // nil lag from Redis is reported as -1
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
It("should XINFO CONSUMERS", func() {
|
It("should XINFO CONSUMERS", func() {
|
||||||
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
|
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user