diff --git a/internal/auth/streaming/manager_test.go b/internal/auth/streaming/manager_test.go index e4ff813e..83748142 100644 --- a/internal/auth/streaming/manager_test.go +++ b/internal/auth/streaming/manager_test.go @@ -91,6 +91,7 @@ func (m *mockPooler) CloseConn(*pool.Conn) error { return n func (m *mockPooler) Get(ctx context.Context) (*pool.Conn, error) { return nil, nil } func (m *mockPooler) Put(ctx context.Context, conn *pool.Conn) {} func (m *mockPooler) Remove(ctx context.Context, conn *pool.Conn, reason error) {} +func (m *mockPooler) RemoveWithoutTurn(ctx context.Context, conn *pool.Conn, reason error) {} func (m *mockPooler) Len() int { return 0 } func (m *mockPooler) IdleLen() int { return 0 } func (m *mockPooler) Stats() *pool.Stats { return &pool.Stats{} } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 3fe8bfa5..59b8e194 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -77,6 +77,12 @@ type Pooler interface { Put(context.Context, *Conn) Remove(context.Context, *Conn, error) + // RemoveWithoutTurn removes a connection from the pool without freeing a turn. + // This should be used when removing a connection from a context that didn't acquire + // a turn via Get() (e.g., background workers, cleanup tasks). + // For normal removal after Get(), use Remove() instead. + RemoveWithoutTurn(context.Context, *Conn, error) + Len() int IdleLen() int Stats() *Stats @@ -479,7 +485,9 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { } if !acceptConn { internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID()) - p.Put(ctx, cn) + // Return connection to pool without freeing the turn that this Get() call holds. + // We use putConnWithoutTurn() to run all the Put hooks and logic without freeing a turn. + p.putConnWithoutTurn(ctx, cn) cn = nil continue } @@ -615,6 +623,18 @@ func (p *ConnPool) popIdle() (*Conn, error) { } func (p *ConnPool) Put(ctx context.Context, cn *Conn) { + p.putConn(ctx, cn, true) +} + +// putConnWithoutTurn is an internal method that puts a connection back to the pool +// without freeing a turn. This is used when returning a rejected connection from +// within Get(), where the turn is still held by the Get() call. +func (p *ConnPool) putConnWithoutTurn(ctx context.Context, cn *Conn) { + p.putConn(ctx, cn, false) +} + +// putConn is the internal implementation of Put that optionally frees a turn. +func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { // Process connection using the hooks system shouldPool := true shouldRemove := false @@ -625,7 +645,8 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush { // Not a push notification or error peeking, remove connection internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it") - p.Remove(ctx, cn, err) + p.removeConnInternal(ctx, cn, err, freeTurn) + return } // It's a push notification, allow pooling (client will handle it) } @@ -638,25 +659,25 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn) if err != nil { internal.Logger.Printf(ctx, "Connection hook error: %v", err) - p.Remove(ctx, cn, err) + p.removeConnInternal(ctx, cn, err, freeTurn) return } } // If hooks say to remove the connection, do so if shouldRemove { - p.Remove(ctx, cn, errors.New("hook requested removal")) + p.removeConnInternal(ctx, cn, errors.New("hook requested removal"), freeTurn) return } // If processor says not to pool the connection, remove it if !shouldPool { - p.Remove(ctx, cn, errors.New("hook requested no pooling")) + p.removeConnInternal(ctx, cn, errors.New("hook requested no pooling"), freeTurn) return } if !cn.pooled { - p.Remove(ctx, cn, errors.New("connection not pooled")) + p.removeConnInternal(ctx, cn, errors.New("connection not pooled"), freeTurn) return } @@ -698,7 +719,9 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { shouldCloseConn = true } - p.freeTurn() + if freeTurn { + p.freeTurn() + } if shouldCloseConn { _ = p.closeConn(cn) @@ -706,6 +729,19 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { } func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { + p.removeConnInternal(ctx, cn, reason, true) +} + +// RemoveWithoutTurn removes a connection from the pool without freeing a turn. +// This should be used when removing a connection from a context that didn't acquire +// a turn via Get() (e.g., background workers, cleanup tasks). +// For normal removal after Get(), use Remove() instead. +func (p *ConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error) { + p.removeConnInternal(ctx, cn, reason, false) +} + +// removeConnInternal is the internal implementation of Remove that optionally frees a turn. +func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason error, freeTurn bool) { p.hookManagerMu.RLock() hookManager := p.hookManager p.hookManagerMu.RUnlock() @@ -716,7 +752,9 @@ func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { p.removeConnWithLock(cn) - p.freeTurn() + if freeTurn { + p.freeTurn() + } _ = p.closeConn(cn) diff --git a/internal/pool/pool_single.go b/internal/pool/pool_single.go index 648e5ae4..365219a5 100644 --- a/internal/pool/pool_single.go +++ b/internal/pool/pool_single.go @@ -72,6 +72,12 @@ func (p *SingleConnPool) Remove(_ context.Context, cn *Conn, reason error) { p.stickyErr = reason } +// RemoveWithoutTurn has the same behavior as Remove for SingleConnPool +// since SingleConnPool doesn't use a turn-based queue system. +func (p *SingleConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error) { + p.Remove(ctx, cn, reason) +} + func (p *SingleConnPool) Close() error { p.cn = nil p.stickyErr = ErrClosed diff --git a/internal/pool/pool_sticky.go b/internal/pool/pool_sticky.go index 22e5a941..be869b56 100644 --- a/internal/pool/pool_sticky.go +++ b/internal/pool/pool_sticky.go @@ -123,6 +123,12 @@ func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error) { p.ch <- cn } +// RemoveWithoutTurn has the same behavior as Remove for StickyConnPool +// since StickyConnPool doesn't use a turn-based queue system. +func (p *StickyConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error) { + p.Remove(ctx, cn, reason) +} + func (p *StickyConnPool) Close() error { if shared := atomic.AddInt32(&p.shared, -1); shared > 0 { return nil diff --git a/maintnotifications/handoff_worker.go b/maintnotifications/handoff_worker.go index e042e4c6..c4c68186 100644 --- a/maintnotifications/handoff_worker.go +++ b/maintnotifications/handoff_worker.go @@ -481,7 +481,11 @@ func (hwm *handoffWorkerManager) closeConnFromRequest(ctx context.Context, reque conn.ClearHandoffState() if pooler != nil { - pooler.Remove(ctx, conn, err) + // Use RemoveWithoutTurn instead of Remove to avoid freeing a turn that we don't have. + // The handoff worker doesn't call Get(), so it doesn't have a turn to free. + // Remove() is meant to be called after Get() and frees a turn. + // RemoveWithoutTurn() removes and closes the connection without affecting the queue. + pooler.RemoveWithoutTurn(ctx, conn, err) if internal.LogLevel.WarnOrAbove() { internal.Logger.Printf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err)) } diff --git a/maintnotifications/pool_hook_test.go b/maintnotifications/pool_hook_test.go index 963780aa..41120af2 100644 --- a/maintnotifications/pool_hook_test.go +++ b/maintnotifications/pool_hook_test.go @@ -75,6 +75,11 @@ func (mp *mockPool) Remove(ctx context.Context, conn *pool.Conn, reason error) { mp.removedConnections[conn.GetID()] = true } +func (mp *mockPool) RemoveWithoutTurn(ctx context.Context, conn *pool.Conn, reason error) { + // For mock pool, same behavior as Remove since we don't have a turn-based queue + mp.Remove(ctx, conn, reason) +} + // WasRemoved safely checks if a connection was removed from the pool func (mp *mockPool) WasRemoved(connID uint64) bool { mp.mu.Lock()