From 9b71da1801eee13829308380d407dd8373cf298d Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 16 Jun 2025 18:46:01 +0300 Subject: [PATCH] fix(txpipeline): test normal tx pipeline behaviour --- osscluster_test.go | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/osscluster_test.go b/osscluster_test.go index 99341183..7b0a8b29 100644 --- a/osscluster_test.go +++ b/osscluster_test.go @@ -462,8 +462,7 @@ var _ = Describe("ClusterClient", func() { Describe("pipelining", func() { var pipe *redis.Pipeline - assertPipeline := func() { - keys := []string{"A", "B", "C", "D", "E", "F", "G"} + assertPipeline := func(keys []string) { It("follows redirects", func() { if !failover { @@ -482,13 +481,12 @@ var _ = Describe("ClusterClient", func() { Expect(err).NotTo(HaveOccurred()) Expect(cmds).To(HaveLen(14)) - _ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { - defer GinkgoRecover() - Eventually(func() int64 { - return node.DBSize(ctx).Val() - }, 30*time.Second).ShouldNot(BeZero()) - return nil - }) + // Check that all keys are set. + for _, key := range keys { + Eventually(func() string { + return client.Get(ctx, key).Val() + }, 30*time.Second).Should(Equal(key + "_value")) + } if !failover { for _, key := range keys { @@ -517,14 +515,14 @@ var _ = Describe("ClusterClient", func() { }) It("works with missing keys", func() { - pipe.Set(ctx, "A", "A_value", 0) - pipe.Set(ctx, "C", "C_value", 0) + pipe.Set(ctx, "A{s}", "A_value", 0) + pipe.Set(ctx, "C{s}", "C_value", 0) _, err := pipe.Exec(ctx) Expect(err).NotTo(HaveOccurred()) - a := pipe.Get(ctx, "A") - b := pipe.Get(ctx, "B") - c := pipe.Get(ctx, "C") + a := pipe.Get(ctx, "A{s}") + b := pipe.Get(ctx, "B{s}") + c := pipe.Get(ctx, "C{s}") cmds, err := pipe.Exec(ctx) Expect(err).To(Equal(redis.Nil)) Expect(cmds).To(HaveLen(3)) @@ -547,7 +545,8 @@ var _ = Describe("ClusterClient", func() { AfterEach(func() {}) - assertPipeline() + keys := []string{"A", "B", "C", "D", "E", "F", "G"} + assertPipeline(keys) It("doesn't fail node with context.Canceled error", func() { ctx, cancel := context.WithCancel(context.Background()) @@ -590,7 +589,10 @@ var _ = Describe("ClusterClient", func() { AfterEach(func() {}) - assertPipeline() + // TxPipeline doesn't support cross slot commands. + // Use hashtag to force all keys to the same slot. + keys := []string{"A{s}", "B{s}", "C{s}", "D{s}", "E{s}", "F{s}", "G{s}"} + assertPipeline(keys) }) })