1
0
mirror of https://github.com/redis/go-redis.git synced 2025-04-16 09:23:06 +03:00

fix: Fixed issue with context cancelled error leading to connection spikes on Primary instances (#3190)

* fix: Fixed issue with context cancelled error leading to connection spikes on Master

* fix: Added tests

* fix: Updated tests

---------

Co-authored-by: Bhargav Dodla <bdodla@expediagroup.com>
Co-authored-by: Nedyalko Dyakov <nedyalko.dyakov@gmail.com>
This commit is contained in:
Bhargav Dodla 2025-03-05 12:08:27 -08:00 committed by GitHub
parent 8e4a2ee5c5
commit 162a15432b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 45 additions and 1 deletions

View File

@ -38,6 +38,15 @@ type Error interface {
var _ Error = proto.RedisError("")
func isContextError(err error) bool {
switch err {
case context.Canceled, context.DeadlineExceeded:
return true
default:
return false
}
}
func shouldRetry(err error, retryTimeout bool) bool {
switch err {
case io.EOF, io.ErrUnexpectedEOF:

View File

@ -1350,7 +1350,9 @@ func (c *ClusterClient) processPipelineNode(
_ = node.Client.withProcessPipelineHook(ctx, cmds, func(ctx context.Context, cmds []Cmder) error {
cn, err := node.Client.getConn(ctx)
if err != nil {
if !isContextError(err) {
node.MarkAsFailing()
}
_ = c.mapCmdsByNode(ctx, failedCmds, cmds)
setCmdsErr(cmds, err)
return err

View File

@ -539,6 +539,39 @@ var _ = Describe("ClusterClient", func() {
AfterEach(func() {})
assertPipeline()
It("doesn't fail node with context.Canceled error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
pipe.Set(ctx, "A", "A_value", 0)
_, err := pipe.Exec(ctx)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.Canceled)).To(BeTrue())
clientNodes, _ := client.Nodes(ctx, "A")
for _, node := range clientNodes {
Expect(node.Failing()).To(BeFalse())
}
})
It("doesn't fail node with context.DeadlineExceeded error", func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
pipe.Set(ctx, "A", "A_value", 0)
_, err := pipe.Exec(ctx)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
clientNodes, _ := client.Nodes(ctx, "A")
for _, node := range clientNodes {
Expect(node.Failing()).To(BeFalse())
}
})
})
Describe("with TxPipeline", func() {