mirror of
https://github.com/redis/go-redis.git
synced 2025-12-02 06:22:31 +03:00
try to detect the deadlock x2
This commit is contained in:
@@ -91,6 +91,7 @@ func (m *mockPooler) CloseConn(*pool.Conn) error { return n
|
|||||||
func (m *mockPooler) Get(ctx context.Context) (*pool.Conn, error) { return nil, nil }
|
func (m *mockPooler) Get(ctx context.Context) (*pool.Conn, error) { return nil, nil }
|
||||||
func (m *mockPooler) Put(ctx context.Context, conn *pool.Conn) {}
|
func (m *mockPooler) Put(ctx context.Context, conn *pool.Conn) {}
|
||||||
func (m *mockPooler) Remove(ctx context.Context, conn *pool.Conn, reason error) {}
|
func (m *mockPooler) Remove(ctx context.Context, conn *pool.Conn, reason error) {}
|
||||||
|
func (m *mockPooler) RemoveWithoutTurn(ctx context.Context, conn *pool.Conn, reason error) {}
|
||||||
func (m *mockPooler) Len() int { return 0 }
|
func (m *mockPooler) Len() int { return 0 }
|
||||||
func (m *mockPooler) IdleLen() int { return 0 }
|
func (m *mockPooler) IdleLen() int { return 0 }
|
||||||
func (m *mockPooler) Stats() *pool.Stats { return &pool.Stats{} }
|
func (m *mockPooler) Stats() *pool.Stats { return &pool.Stats{} }
|
||||||
|
|||||||
@@ -77,6 +77,12 @@ type Pooler interface {
|
|||||||
Put(context.Context, *Conn)
|
Put(context.Context, *Conn)
|
||||||
Remove(context.Context, *Conn, error)
|
Remove(context.Context, *Conn, error)
|
||||||
|
|
||||||
|
// RemoveWithoutTurn removes a connection from the pool without freeing a turn.
|
||||||
|
// This should be used when removing a connection from a context that didn't acquire
|
||||||
|
// a turn via Get() (e.g., background workers, cleanup tasks).
|
||||||
|
// For normal removal after Get(), use Remove() instead.
|
||||||
|
RemoveWithoutTurn(context.Context, *Conn, error)
|
||||||
|
|
||||||
Len() int
|
Len() int
|
||||||
IdleLen() int
|
IdleLen() int
|
||||||
Stats() *Stats
|
Stats() *Stats
|
||||||
@@ -479,7 +485,9 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
|||||||
}
|
}
|
||||||
if !acceptConn {
|
if !acceptConn {
|
||||||
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
|
internal.Logger.Printf(ctx, "redis: connection pool: conn[%d] rejected by hook, returning to pool", cn.GetID())
|
||||||
p.Put(ctx, cn)
|
// Return connection to pool without freeing the turn that this Get() call holds.
|
||||||
|
// We use putConnWithoutTurn() to run all the Put hooks and logic without freeing a turn.
|
||||||
|
p.putConnWithoutTurn(ctx, cn)
|
||||||
cn = nil
|
cn = nil
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -615,6 +623,18 @@ func (p *ConnPool) popIdle() (*Conn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
||||||
|
p.putConn(ctx, cn, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// putConnWithoutTurn is an internal method that puts a connection back to the pool
|
||||||
|
// without freeing a turn. This is used when returning a rejected connection from
|
||||||
|
// within Get(), where the turn is still held by the Get() call.
|
||||||
|
func (p *ConnPool) putConnWithoutTurn(ctx context.Context, cn *Conn) {
|
||||||
|
p.putConn(ctx, cn, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// putConn is the internal implementation of Put that optionally frees a turn.
|
||||||
|
func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
|
||||||
// Process connection using the hooks system
|
// Process connection using the hooks system
|
||||||
shouldPool := true
|
shouldPool := true
|
||||||
shouldRemove := false
|
shouldRemove := false
|
||||||
@@ -625,7 +645,8 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
|||||||
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
|
if replyType, err := cn.PeekReplyTypeSafe(); err != nil || replyType != proto.RespPush {
|
||||||
// Not a push notification or error peeking, remove connection
|
// Not a push notification or error peeking, remove connection
|
||||||
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
|
internal.Logger.Printf(ctx, "Conn has unread data (not push notification), removing it")
|
||||||
p.Remove(ctx, cn, err)
|
p.removeConnInternal(ctx, cn, err, freeTurn)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
// It's a push notification, allow pooling (client will handle it)
|
// It's a push notification, allow pooling (client will handle it)
|
||||||
}
|
}
|
||||||
@@ -638,25 +659,25 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
|||||||
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
|
shouldPool, shouldRemove, err = hookManager.ProcessOnPut(ctx, cn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
|
internal.Logger.Printf(ctx, "Connection hook error: %v", err)
|
||||||
p.Remove(ctx, cn, err)
|
p.removeConnInternal(ctx, cn, err, freeTurn)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// If hooks say to remove the connection, do so
|
// If hooks say to remove the connection, do so
|
||||||
if shouldRemove {
|
if shouldRemove {
|
||||||
p.Remove(ctx, cn, errors.New("hook requested removal"))
|
p.removeConnInternal(ctx, cn, errors.New("hook requested removal"), freeTurn)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// If processor says not to pool the connection, remove it
|
// If processor says not to pool the connection, remove it
|
||||||
if !shouldPool {
|
if !shouldPool {
|
||||||
p.Remove(ctx, cn, errors.New("hook requested no pooling"))
|
p.removeConnInternal(ctx, cn, errors.New("hook requested no pooling"), freeTurn)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cn.pooled {
|
if !cn.pooled {
|
||||||
p.Remove(ctx, cn, errors.New("connection not pooled"))
|
p.removeConnInternal(ctx, cn, errors.New("connection not pooled"), freeTurn)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -698,7 +719,9 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
|||||||
shouldCloseConn = true
|
shouldCloseConn = true
|
||||||
}
|
}
|
||||||
|
|
||||||
p.freeTurn()
|
if freeTurn {
|
||||||
|
p.freeTurn()
|
||||||
|
}
|
||||||
|
|
||||||
if shouldCloseConn {
|
if shouldCloseConn {
|
||||||
_ = p.closeConn(cn)
|
_ = p.closeConn(cn)
|
||||||
@@ -706,6 +729,19 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
||||||
|
p.removeConnInternal(ctx, cn, reason, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RemoveWithoutTurn removes a connection from the pool without freeing a turn.
|
||||||
|
// This should be used when removing a connection from a context that didn't acquire
|
||||||
|
// a turn via Get() (e.g., background workers, cleanup tasks).
|
||||||
|
// For normal removal after Get(), use Remove() instead.
|
||||||
|
func (p *ConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error) {
|
||||||
|
p.removeConnInternal(ctx, cn, reason, false)
|
||||||
|
}
|
||||||
|
|
||||||
|
// removeConnInternal is the internal implementation of Remove that optionally frees a turn.
|
||||||
|
func (p *ConnPool) removeConnInternal(ctx context.Context, cn *Conn, reason error, freeTurn bool) {
|
||||||
p.hookManagerMu.RLock()
|
p.hookManagerMu.RLock()
|
||||||
hookManager := p.hookManager
|
hookManager := p.hookManager
|
||||||
p.hookManagerMu.RUnlock()
|
p.hookManagerMu.RUnlock()
|
||||||
@@ -716,7 +752,9 @@ func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
|||||||
|
|
||||||
p.removeConnWithLock(cn)
|
p.removeConnWithLock(cn)
|
||||||
|
|
||||||
p.freeTurn()
|
if freeTurn {
|
||||||
|
p.freeTurn()
|
||||||
|
}
|
||||||
|
|
||||||
_ = p.closeConn(cn)
|
_ = p.closeConn(cn)
|
||||||
|
|
||||||
|
|||||||
@@ -72,6 +72,12 @@ func (p *SingleConnPool) Remove(_ context.Context, cn *Conn, reason error) {
|
|||||||
p.stickyErr = reason
|
p.stickyErr = reason
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveWithoutTurn has the same behavior as Remove for SingleConnPool
|
||||||
|
// since SingleConnPool doesn't use a turn-based queue system.
|
||||||
|
func (p *SingleConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error) {
|
||||||
|
p.Remove(ctx, cn, reason)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *SingleConnPool) Close() error {
|
func (p *SingleConnPool) Close() error {
|
||||||
p.cn = nil
|
p.cn = nil
|
||||||
p.stickyErr = ErrClosed
|
p.stickyErr = ErrClosed
|
||||||
|
|||||||
@@ -123,6 +123,12 @@ func (p *StickyConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
|||||||
p.ch <- cn
|
p.ch <- cn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RemoveWithoutTurn has the same behavior as Remove for StickyConnPool
|
||||||
|
// since StickyConnPool doesn't use a turn-based queue system.
|
||||||
|
func (p *StickyConnPool) RemoveWithoutTurn(ctx context.Context, cn *Conn, reason error) {
|
||||||
|
p.Remove(ctx, cn, reason)
|
||||||
|
}
|
||||||
|
|
||||||
func (p *StickyConnPool) Close() error {
|
func (p *StickyConnPool) Close() error {
|
||||||
if shared := atomic.AddInt32(&p.shared, -1); shared > 0 {
|
if shared := atomic.AddInt32(&p.shared, -1); shared > 0 {
|
||||||
return nil
|
return nil
|
||||||
|
|||||||
@@ -481,7 +481,11 @@ func (hwm *handoffWorkerManager) closeConnFromRequest(ctx context.Context, reque
|
|||||||
conn.ClearHandoffState()
|
conn.ClearHandoffState()
|
||||||
|
|
||||||
if pooler != nil {
|
if pooler != nil {
|
||||||
pooler.Remove(ctx, conn, err)
|
// Use RemoveWithoutTurn instead of Remove to avoid freeing a turn that we don't have.
|
||||||
|
// The handoff worker doesn't call Get(), so it doesn't have a turn to free.
|
||||||
|
// Remove() is meant to be called after Get() and frees a turn.
|
||||||
|
// RemoveWithoutTurn() removes and closes the connection without affecting the queue.
|
||||||
|
pooler.RemoveWithoutTurn(ctx, conn, err)
|
||||||
if internal.LogLevel.WarnOrAbove() {
|
if internal.LogLevel.WarnOrAbove() {
|
||||||
internal.Logger.Printf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err))
|
internal.Logger.Printf(ctx, logs.RemovingConnectionFromPool(conn.GetID(), err))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -75,6 +75,11 @@ func (mp *mockPool) Remove(ctx context.Context, conn *pool.Conn, reason error) {
|
|||||||
mp.removedConnections[conn.GetID()] = true
|
mp.removedConnections[conn.GetID()] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mp *mockPool) RemoveWithoutTurn(ctx context.Context, conn *pool.Conn, reason error) {
|
||||||
|
// For mock pool, same behavior as Remove since we don't have a turn-based queue
|
||||||
|
mp.Remove(ctx, conn, reason)
|
||||||
|
}
|
||||||
|
|
||||||
// WasRemoved safely checks if a connection was removed from the pool
|
// WasRemoved safely checks if a connection was removed from the pool
|
||||||
func (mp *mockPool) WasRemoved(connID uint64) bool {
|
func (mp *mockPool) WasRemoved(connID uint64) bool {
|
||||||
mp.mu.Lock()
|
mp.mu.Lock()
|
||||||
|
|||||||
Reference in New Issue
Block a user