1
0
mirror of https://github.com/redis/go-redis.git synced 2025-10-20 09:52:25 +03:00

fix: pipeline repeatedly sets the error (#3525)

* fix: pipeline repeatedly sets the error

Signed-off-by: Xiaolong Chen <fukua95@gmail.com>

* add test

Signed-off-by: Xiaolong Chen <fukua95@gmail.com>

* CI

Signed-off-by: Xiaolong Chen <fukua95@gmail.com>

---------

Signed-off-by: Xiaolong Chen <fukua95@gmail.com>
Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
This commit is contained in:
cxljs
2025-09-17 22:32:24 +08:00
committed by GitHub
parent 286735bef1
commit 113a18ae75
2 changed files with 38 additions and 2 deletions

View File

@@ -60,6 +60,39 @@ var _ = Describe("pipelining", func() {
Expect(cmds).To(BeEmpty())
})
It("pipeline: basic exec", func() {
p := client.Pipeline()
p.Get(ctx, "key")
p.Set(ctx, "key", "value", 0)
p.Get(ctx, "key")
cmds, err := p.Exec(ctx)
Expect(err).To(Equal(redis.Nil))
Expect(cmds).To(HaveLen(3))
Expect(cmds[0].Err()).To(Equal(redis.Nil))
Expect(cmds[1].(*redis.StatusCmd).Val()).To(Equal("OK"))
Expect(cmds[1].Err()).NotTo(HaveOccurred())
Expect(cmds[2].(*redis.StringCmd).Val()).To(Equal("value"))
Expect(cmds[2].Err()).NotTo(HaveOccurred())
})
It("pipeline: exec pipeline when get conn failed", func() {
p := client.Pipeline()
p.Get(ctx, "key")
p.Set(ctx, "key", "value", 0)
p.Get(ctx, "key")
client.Close()
cmds, err := p.Exec(ctx)
Expect(err).To(Equal(redis.ErrClosed))
Expect(cmds).To(HaveLen(3))
for _, cmd := range cmds {
Expect(cmd.Err()).To(Equal(redis.ErrClosed))
}
client = redis.NewClient(redisOptions())
})
assertPipeline := func() {
It("returns no errors when there are no commands", func() {
_, err := pipe.Exec(ctx)

View File

@@ -768,7 +768,10 @@ func (c *baseClient) generalProcessPipeline(
return err
})
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
setCmdsErr(cmds, lastErr)
// The error should be set here only when failing to obtain the conn.
if !isRedisError(lastErr) {
setCmdsErr(cmds, lastErr)
}
return lastErr
}
}
@@ -864,7 +867,7 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd
}
// Parse +QUEUED.
for _, cmd := range cmds {
for _, cmd := range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)