mirror of
https://github.com/redis/go-redis.git
synced 2025-07-29 17:41:15 +03:00
fix: Ring.Pipelined return dial timeout error (#3403)
* [ISSUE-3402]: Ring.Pipelined return dial timeout error * review fixes --------- Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
This commit is contained in:
16
ring.go
16
ring.go
@ -798,6 +798,8 @@ func (c *Ring) generalProcessPipeline(
|
|||||||
}
|
}
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
errs := make(chan error, len(cmdsMap))
|
||||||
|
|
||||||
for hash, cmds := range cmdsMap {
|
for hash, cmds := range cmdsMap {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(hash string, cmds []Cmder) {
|
go func(hash string, cmds []Cmder) {
|
||||||
@ -810,16 +812,24 @@ func (c *Ring) generalProcessPipeline(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
hook := shard.Client.processPipelineHook
|
||||||
if tx {
|
if tx {
|
||||||
cmds = wrapMultiExec(ctx, cmds)
|
cmds = wrapMultiExec(ctx, cmds)
|
||||||
_ = shard.Client.processTxPipelineHook(ctx, cmds)
|
hook = shard.Client.processTxPipelineHook
|
||||||
} else {
|
}
|
||||||
_ = shard.Client.processPipelineHook(ctx, cmds)
|
|
||||||
|
if err = hook(ctx, cmds); err != nil {
|
||||||
|
errs <- err
|
||||||
}
|
}
|
||||||
}(hash, cmds)
|
}(hash, cmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
close(errs)
|
||||||
|
|
||||||
|
if err := <-errs; err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return cmdsFirstErr(cmds)
|
return cmdsFirstErr(cmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
15
ring_test.go
15
ring_test.go
@ -277,6 +277,21 @@ var _ = Describe("Redis Ring", func() {
|
|||||||
Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys="))
|
Expect(ringShard1.Info(ctx).Val()).ToNot(ContainSubstring("keys="))
|
||||||
Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100"))
|
Expect(ringShard2.Info(ctx).Val()).To(ContainSubstring("keys=100"))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("return dial timeout error", func() {
|
||||||
|
opt := redisRingOptions()
|
||||||
|
opt.DialTimeout = 250 * time.Millisecond
|
||||||
|
opt.Addrs = map[string]string{"ringShardNotExist": ":1997"}
|
||||||
|
ring = redis.NewRing(opt)
|
||||||
|
|
||||||
|
_, err := ring.Pipelined(ctx, func(pipe redis.Pipeliner) error {
|
||||||
|
pipe.HSet(ctx, "key", "value")
|
||||||
|
pipe.Expire(ctx, "key", time.Minute)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
|
||||||
|
Expect(err).To(HaveOccurred())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
Describe("new client callback", func() {
|
Describe("new client callback", func() {
|
||||||
|
Reference in New Issue
Block a user