From f14095b12c7a49cc35242f94efc6bc3a99a977bb Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 17 Oct 2025 17:35:29 +0300 Subject: [PATCH] only async reauth --- .../conn_reauth_credentials_listener.go | 24 ++++--------------- internal/auth/streaming/pool_hook.go | 14 ++++++++--- pubsub.go | 1 - redis.go | 2 ++ 4 files changed, 18 insertions(+), 23 deletions(-) diff --git a/internal/auth/streaming/conn_reauth_credentials_listener.go b/internal/auth/streaming/conn_reauth_credentials_listener.go index 8bda93af..d0ac8a84 100644 --- a/internal/auth/streaming/conn_reauth_credentials_listener.go +++ b/internal/auth/streaming/conn_reauth_credentials_listener.go @@ -35,25 +35,11 @@ func (c *ConnReAuthCredentialsListener) OnNext(credentials auth.Credentials) { return } - // this connection is not in use, so we can re-authenticate it - if c.conn.Used.CompareAndSwap(false, true) { - // try to acquire the connection for background operation - if c.conn.Usable.CompareAndSwap(true, false) { - err := c.reAuth(c.conn, credentials) - if err != nil { - c.OnError(err) - } - c.conn.Usable.Store(true) - c.conn.Used.Store(false) - return - } - c.conn.Used.Store(false) - } - // else if the connection is in use, mark it for re-authentication - // and connection pool hook will re-authenticate it when it is returned to the pool - // or in case the connection WAS in the pool, but handoff is in progress, the pool hook - // will re-authenticate it when the handoff is complete - // and the connection is acquired from the pool + // Always use async reauth to avoid complex pool semaphore issues + // The synchronous path can cause deadlocks in the pool's semaphore mechanism + // when called from the Subscribe goroutine, especially with small pool sizes. + // The connection pool hook will re-authenticate the connection when it is + // returned to the pool in a clean, idle state. c.manager.MarkForReAuth(c.conn, func(err error) { if err != nil { c.OnError(err) diff --git a/internal/auth/streaming/pool_hook.go b/internal/auth/streaming/pool_hook.go index 4bffb2c4..c1d01eee 100644 --- a/internal/auth/streaming/pool_hook.go +++ b/internal/auth/streaming/pool_hook.go @@ -92,8 +92,11 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, var err error timeout := time.After(r.reAuthTimeout) - // Try to acquire the connection (set Usable to false) - for !conn.Usable.CompareAndSwap(true, false) { + // Try to acquire the connection + // We need to ensure the connection is both Usable and not Used + // to prevent data races with concurrent operations + acquired := false + for !acquired { select { case <-timeout: // Timeout occurred, cannot acquire connection @@ -101,7 +104,12 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, reAuthFn(err) return default: - time.Sleep(time.Millisecond) + // Try to acquire: set Usable=false only if Used=false + if !conn.Used.Load() && conn.Usable.CompareAndSwap(true, false) { + acquired = true + } else { + time.Sleep(time.Millisecond) + } } } diff --git a/pubsub.go b/pubsub.go index 5e02b0bd..959a5c45 100644 --- a/pubsub.go +++ b/pubsub.go @@ -465,7 +465,6 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int } // Don't hold the lock to allow subscriptions and pings. - cn, err := c.connWithLock(ctx) if err != nil { return nil, err diff --git a/redis.go b/redis.go index ba421af1..7d4c903c 100644 --- a/redis.go +++ b/redis.go @@ -305,6 +305,8 @@ func (c *baseClient) reAuthConnection() func(poolCn *pool.Conn, credentials auth return func(poolCn *pool.Conn, credentials auth.Credentials) error { var err error username, password := credentials.BasicAuth() + + // Use background context - timeout is handled by ReadTimeout in WithReader/WithWriter ctx := context.Background() connPool := pool.NewSingleConnPool(c.connPool, poolCn)