mirror of
https://github.com/redis/go-redis.git
synced 2025-06-12 14:21:52 +03:00
support "XINFO CONSUMERS" (#1649)
* support "XINFO CONSUMERS" * add "xinfo" test
This commit is contained in:
105
commands_test.go
105
commands_test.go
@ -3891,6 +3891,111 @@ var _ = Describe("Commands", func() {
|
||||
Expect(n).To(Equal(int64(2)))
|
||||
})
|
||||
})
|
||||
|
||||
Describe("xinfo", func() {
|
||||
BeforeEach(func() {
|
||||
err := client.XGroupCreate(ctx, "stream", "group1", "0").Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
res, err := client.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: "group1",
|
||||
Consumer: "consumer1",
|
||||
Streams: []string{"stream", ">"},
|
||||
Count: 2,
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XStream{
|
||||
{
|
||||
Stream: "stream",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
res, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: "group1",
|
||||
Consumer: "consumer2",
|
||||
Streams: []string{"stream", ">"},
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XStream{
|
||||
{
|
||||
Stream: "stream",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
err = client.XGroupCreate(ctx, "stream", "group2", "1-0").Err()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
|
||||
res, err = client.XReadGroup(ctx, &redis.XReadGroupArgs{
|
||||
Group: "group2",
|
||||
Consumer: "consumer1",
|
||||
Streams: []string{"stream", ">"},
|
||||
}).Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XStream{
|
||||
{
|
||||
Stream: "stream",
|
||||
Messages: []redis.XMessage{
|
||||
{ID: "2-0", Values: map[string]interface{}{"dos": "deux"}},
|
||||
{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
},
|
||||
},
|
||||
}))
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
n, err := client.XGroupDestroy(ctx, "stream", "group1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(n).To(Equal(int64(1)))
|
||||
n, err = client.XGroupDestroy(ctx, "stream", "group2").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(n).To(Equal(int64(1)))
|
||||
})
|
||||
|
||||
It("should XINFO STREAM", func() {
|
||||
res, err := client.XInfoStream(ctx, "stream").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
res.RadixTreeKeys = 0
|
||||
res.RadixTreeNodes = 0
|
||||
|
||||
Expect(res).To(Equal(&redis.XInfoStream{
|
||||
Length: 3,
|
||||
RadixTreeKeys: 0,
|
||||
RadixTreeNodes: 0,
|
||||
Groups: 2,
|
||||
LastGeneratedID: "3-0",
|
||||
FirstEntry: redis.XMessage{ID: "1-0", Values: map[string]interface{}{"uno": "un"}},
|
||||
LastEntry: redis.XMessage{ID: "3-0", Values: map[string]interface{}{"tres": "troix"}},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should XINFO GROUPS", func() {
|
||||
res, err := client.XInfoGroups(ctx, "stream").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(res).To(Equal([]redis.XInfoGroup{
|
||||
{Name: "group1", Consumers: 2, Pending: 3, LastDeliveredID: "3-0"},
|
||||
{Name: "group2", Consumers: 1, Pending: 2, LastDeliveredID: "3-0"},
|
||||
}))
|
||||
})
|
||||
|
||||
It("should XINFO CONSUMERS", func() {
|
||||
res, err := client.XInfoConsumers(ctx, "stream", "group1").Result()
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
for i := range res {
|
||||
res[i].Idle = 0
|
||||
}
|
||||
Expect(res).To(Equal([]redis.XInfoConsumer{
|
||||
{Name: "consumer1", Pending: 2, Idle: 0},
|
||||
{Name: "consumer2", Pending: 1, Idle: 0},
|
||||
}))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Geo add and radius search", func() {
|
||||
|
Reference in New Issue
Block a user