diff --git a/auth/conn_reauth_credentials_listener.go b/auth/conn_reauth_credentials_listener.go index acbd67ac..5b9711d5 100644 --- a/auth/conn_reauth_credentials_listener.go +++ b/auth/conn_reauth_credentials_listener.go @@ -48,22 +48,37 @@ func (c *ConnReAuthCredentialsListener) OnNext(credentials Credentials) { // this is important because the connection pool may be in the process of reconnecting the connection // and we don't want to interfere with that process // but we also don't want to block for too long, so incorporate a timeout -compandswap: - for !c.conn.Usable.CompareAndSwap(true, false) { + for err == nil && !c.conn.Usable.CompareAndSwap(true, false) { select { case <-timeout: err = pool.ErrConnUnusableTimeout - break compandswap default: runtime.Gosched() } } + if err == nil { + defer c.conn.SetUsable(true) + } + + for err == nil && !c.conn.Used.CompareAndSwap(false, true) { + select { + case <-timeout: + err = pool.ErrConnUnusableTimeout + default: + runtime.Gosched() + } + } + + // we timed out waiting for the connection to be usable + // do not try to re-authenticate, instead call the onErr function + // which will handle the error and close the connection if needed if err != nil { c.OnError(err) return } + + defer c.conn.Used.Store(false) // we set the usable flag, so restore it back to usable after we're done - defer c.conn.SetUsable(true) if err = c.reAuth(c.conn, credentials); err != nil { c.OnError(err) } diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 636be9bc..e1bc4124 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -57,7 +57,32 @@ type Conn struct { // Only used for the brief period during SetNetConn and HasBufferedData/PeekReplyTypeSafe readerMu sync.RWMutex + // Design note: + // Why have both Usable and Used? + // _Usable_ is used to mark a connection as safe for use by clients, the connection can still + // be in the pool but not Usable at the moment (e.g. handoff in progress). + // _Used_ is used to mark a connection as used when a command is going to be processed on that connection. + // this is going to happen once the connection is picked from the pool. + // + // If a background operation needs to use the connection, it will mark it as Not Usable and only use it when it + // is not in use. That way, the connection won't be used to send multiple commands at the same time and + // potentially corrupt the command stream. + + // Usable flag to mark connection as safe for use + // It is false before initialization and after a handoff is marked + // It will be false during other background operations like re-authentication Usable atomic.Bool + + // Used flag to mark connection as used when a command is going to be + // processed on that connection. This is used to prevent a race condition with + // background operations that may execute commands, like re-authentication. + Used atomic.Bool + + // Inited flag to mark connection as initialized, this is almost the same as usable + // but it is used to make sure we don't initialize a network connection twice + // On handoff, the network connection is replaced, but the Conn struct is reused + // this flag will be set to false when the network connection is replaced and + // set to true after the new network connection is initialized Inited atomic.Bool pooled bool @@ -456,7 +481,7 @@ func (cn *Conn) MarkQueuedForHandoff() error { const maxRetries = 50 const baseDelay = time.Microsecond - connAquired := false + connAcquired := false for attempt := 0; attempt < maxRetries; attempt++ { // If CAS failed, add exponential backoff to reduce contention // the delay will be 1, 2, 4... up to 512 microseconds @@ -468,10 +493,10 @@ func (cn *Conn) MarkQueuedForHandoff() error { // first we need to mark the connection as not usable // to prevent the pool from returning it to the caller - if !connAquired && !cn.Usable.CompareAndSwap(true, false) { + if !connAcquired && !cn.Usable.CompareAndSwap(true, false) { continue } - connAquired = true + connAcquired = true currentState := cn.getHandoffState() // Check if marked for handoff @@ -539,7 +564,8 @@ func (cn *Conn) ClearHandoffState() { // Atomically set clean state cn.setHandoffState(cleanState) cn.setHandoffRetries(0) - cn.setUsable(true) // Connection is safe to use again after handoff completes + // Clearing handoff state also means the connection is usable again + cn.setUsable(true) } // IncrementAndGetHandoffRetries atomically increments and returns handoff retries (lock-free). diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 83acb6ff..96bbb6db 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -239,6 +239,7 @@ func (p *ConnPool) addIdleConn() error { if err != nil { return err } + // Mark connection as usable after successful creation // This is essential for normal pool operations cn.SetUsable(true) @@ -280,6 +281,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) { if err != nil { return nil, err } + // Mark connection as usable after successful creation // This is essential for normal pool operations cn.SetUsable(true) @@ -571,8 +573,10 @@ func (p *ConnPool) popIdle() (*Conn, error) { attempts++ if cn.IsUsable() { - p.idleConnsLen.Add(-1) - break + if cn.Used.CompareAndSwap(false, true) { + p.idleConnsLen.Add(-1) + break + } } // Connection is not usable, put it back in the pool @@ -667,6 +671,12 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { shouldCloseConn = true } + // Mark connection as not used only + // if it's not being closed + if !shouldCloseConn { + cn.Used.Store(false) + } + p.freeTurn() if shouldCloseConn { diff --git a/internal/pool/pool_single.go b/internal/pool/pool_single.go index 136d6f2d..6f73e539 100644 --- a/internal/pool/pool_single.go +++ b/internal/pool/pool_single.go @@ -2,6 +2,7 @@ package pool import ( "context" + "time" ) type SingleConnPool struct { @@ -31,12 +32,26 @@ func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) { if p.stickyErr != nil { return nil, p.stickyErr } + if p.cn == nil { + return nil, ErrClosed + } + p.cn.Used.Store(true) + p.cn.SetUsedAt(time.Now()) return p.cn, nil } -func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) {} +func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) { + if p.cn == nil { + return + } + if p.cn != cn { + return + } + p.cn.Used.Store(false) +} func (p *SingleConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + cn.Used.Store(false) p.cn = nil p.stickyErr = reason } diff --git a/maintnotifications/handoff_worker.go b/maintnotifications/handoff_worker.go index 61dc1e17..dce984c7 100644 --- a/maintnotifications/handoff_worker.go +++ b/maintnotifications/handoff_worker.go @@ -378,8 +378,12 @@ func (hwm *handoffWorkerManager) performConnectionHandoff(ctx context.Context, c } // performHandoffInternal performs the actual handoff logic (extracted for circuit breaker integration) -func (hwm *handoffWorkerManager) performHandoffInternal(ctx context.Context, conn *pool.Conn, newEndpoint string, connID uint64) (shouldRetry bool, err error) { - +func (hwm *handoffWorkerManager) performHandoffInternal( + ctx context.Context, + conn *pool.Conn, + newEndpoint string, + connID uint64, +) (shouldRetry bool, err error) { retries := conn.IncrementAndGetHandoffRetries(1) internal.Logger.Printf(ctx, logs.HandoffRetryAttempt(connID, retries, newEndpoint, conn.RemoteAddr().String())) maxRetries := 3 // Default fallback @@ -438,9 +442,14 @@ func (hwm *handoffWorkerManager) performHandoffInternal(ctx context.Context, con } }() + // Clear handoff state will: + // - set the connection as usable again + // - clear the handoff state (shouldHandoff, endpoint, seqID) + // - reset the handoff retries to 0 conn.ClearHandoffState() internal.Logger.Printf(ctx, logs.HandoffSucceeded(connID, newEndpoint)) + // successfully completed the handoff, no retry needed and no error return false, nil } diff --git a/redis.go b/redis.go index fa3b758b..03bbf239 100644 --- a/redis.go +++ b/redis.go @@ -549,6 +549,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { // mark the connection as usable and inited // once returned to the pool as idle, this connection can be used by other clients cn.SetUsable(true) + cn.Used.Store(false) cn.Inited.Store(true) // Set the connection initialization function for potential reconnections