From 7ac4021ae06b4c6ada18b13de5300e9d53e8e376 Mon Sep 17 00:00:00 2001 From: Nikita Semenov Date: Thu, 3 Jul 2025 14:48:06 +0700 Subject: [PATCH] fix: Ring.Pipelined return dial timeout error (#3403) * [ISSUE-3402]: Ring.Pipelined return dial timeout error * review fixes --------- Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> --- ring.go | 16 +++++++++++++--- ring_test.go | 15 +++++++++++++++ 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/ring.go b/ring.go index ba4f94ee..0c156601 100644 --- a/ring.go +++ b/ring.go @@ -798,6 +798,8 @@ func (c *Ring) generalProcessPipeline( } var wg sync.WaitGroup + errs := make(chan error, len(cmdsMap)) + for hash, cmds := range cmdsMap { wg.Add(1) go func(hash string, cmds []Cmder) { @@ -810,16 +812,24 @@ func (c *Ring) generalProcessPipeline( return } + hook := shard.Client.processPipelineHook if tx { cmds = wrapMultiExec(ctx, cmds) - _ = shard.Client.processTxPipelineHook(ctx, cmds) - } else { - _ = shard.Client.processPipelineHook(ctx, cmds) + hook = shard.Client.processTxPipelineHook + } + + if err = hook(ctx, cmds); err != nil { + errs <- err } }(hash, cmds) } wg.Wait() + close(errs) + + if err := <-errs; err != nil { + return err + } return cmdsFirstErr(cmds) } diff --git a/ring_test.go b/ring_test.go index 5fd7d982..d35c0c5e 100644 --- a/ring_test.go +++ b/ring_test.go @@ -277,6 +277,21 @@ var _ = Describe("Redis Ring", func() { Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys=")) Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100")) }) + + It("return dial timeout error", func() { + opt := redisRingOptions() + opt.DialTimeout = 250 * time.Millisecond + opt.Addrs = map[string]string{"ringShardNotExist": ":1997"} + ring = redis.NewRing(opt) + + _, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error { + pipe.HSet(ctx, "key", "value") + pipe.Expire(ctx, "key", time.Minute) + return nil + }) + + Expect(err).To(HaveOccurred()) + }) }) Describe("new client callback", func() {