mirror of
https://github.com/redis/go-redis.git
synced 2025-10-18 22:08:50 +03:00
add Used to clarify the state of the conn
This commit is contained in:
@@ -48,22 +48,37 @@ func (c *ConnReAuthCredentialsListener) OnNext(credentials Credentials) {
|
||||
// this is important because the connection pool may be in the process of reconnecting the connection
|
||||
// and we don't want to interfere with that process
|
||||
// but we also don't want to block for too long, so incorporate a timeout
|
||||
compandswap:
|
||||
for !c.conn.Usable.CompareAndSwap(true, false) {
|
||||
for err == nil && !c.conn.Usable.CompareAndSwap(true, false) {
|
||||
select {
|
||||
case <-timeout:
|
||||
err = pool.ErrConnUnusableTimeout
|
||||
break compandswap
|
||||
default:
|
||||
runtime.Gosched()
|
||||
}
|
||||
}
|
||||
if err == nil {
|
||||
defer c.conn.SetUsable(true)
|
||||
}
|
||||
|
||||
for err == nil && !c.conn.Used.CompareAndSwap(false, true) {
|
||||
select {
|
||||
case <-timeout:
|
||||
err = pool.ErrConnUnusableTimeout
|
||||
default:
|
||||
runtime.Gosched()
|
||||
}
|
||||
}
|
||||
|
||||
// we timed out waiting for the connection to be usable
|
||||
// do not try to re-authenticate, instead call the onErr function
|
||||
// which will handle the error and close the connection if needed
|
||||
if err != nil {
|
||||
c.OnError(err)
|
||||
return
|
||||
}
|
||||
|
||||
defer c.conn.Used.Store(false)
|
||||
// we set the usable flag, so restore it back to usable after we're done
|
||||
defer c.conn.SetUsable(true)
|
||||
if err = c.reAuth(c.conn, credentials); err != nil {
|
||||
c.OnError(err)
|
||||
}
|
||||
|
@@ -57,7 +57,32 @@ type Conn struct {
|
||||
// Only used for the brief period during SetNetConn and HasBufferedData/PeekReplyTypeSafe
|
||||
readerMu sync.RWMutex
|
||||
|
||||
// Design note:
|
||||
// Why have both Usable and Used?
|
||||
// _Usable_ is used to mark a connection as safe for use by clients, the connection can still
|
||||
// be in the pool but not Usable at the moment (e.g. handoff in progress).
|
||||
// _Used_ is used to mark a connection as used when a command is going to be processed on that connection.
|
||||
// this is going to happen once the connection is picked from the pool.
|
||||
//
|
||||
// If a background operation needs to use the connection, it will mark it as Not Usable and only use it when it
|
||||
// is not in use. That way, the connection won't be used to send multiple commands at the same time and
|
||||
// potentially corrupt the command stream.
|
||||
|
||||
// Usable flag to mark connection as safe for use
|
||||
// It is false before initialization and after a handoff is marked
|
||||
// It will be false during other background operations like re-authentication
|
||||
Usable atomic.Bool
|
||||
|
||||
// Used flag to mark connection as used when a command is going to be
|
||||
// processed on that connection. This is used to prevent a race condition with
|
||||
// background operations that may execute commands, like re-authentication.
|
||||
Used atomic.Bool
|
||||
|
||||
// Inited flag to mark connection as initialized, this is almost the same as usable
|
||||
// but it is used to make sure we don't initialize a network connection twice
|
||||
// On handoff, the network connection is replaced, but the Conn struct is reused
|
||||
// this flag will be set to false when the network connection is replaced and
|
||||
// set to true after the new network connection is initialized
|
||||
Inited atomic.Bool
|
||||
|
||||
pooled bool
|
||||
@@ -456,7 +481,7 @@ func (cn *Conn) MarkQueuedForHandoff() error {
|
||||
const maxRetries = 50
|
||||
const baseDelay = time.Microsecond
|
||||
|
||||
connAquired := false
|
||||
connAcquired := false
|
||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||
// If CAS failed, add exponential backoff to reduce contention
|
||||
// the delay will be 1, 2, 4... up to 512 microseconds
|
||||
@@ -468,10 +493,10 @@ func (cn *Conn) MarkQueuedForHandoff() error {
|
||||
|
||||
// first we need to mark the connection as not usable
|
||||
// to prevent the pool from returning it to the caller
|
||||
if !connAquired && !cn.Usable.CompareAndSwap(true, false) {
|
||||
if !connAcquired && !cn.Usable.CompareAndSwap(true, false) {
|
||||
continue
|
||||
}
|
||||
connAquired = true
|
||||
connAcquired = true
|
||||
|
||||
currentState := cn.getHandoffState()
|
||||
// Check if marked for handoff
|
||||
@@ -539,7 +564,8 @@ func (cn *Conn) ClearHandoffState() {
|
||||
// Atomically set clean state
|
||||
cn.setHandoffState(cleanState)
|
||||
cn.setHandoffRetries(0)
|
||||
cn.setUsable(true) // Connection is safe to use again after handoff completes
|
||||
// Clearing handoff state also means the connection is usable again
|
||||
cn.setUsable(true)
|
||||
}
|
||||
|
||||
// IncrementAndGetHandoffRetries atomically increments and returns handoff retries (lock-free).
|
||||
|
@@ -239,6 +239,7 @@ func (p *ConnPool) addIdleConn() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Mark connection as usable after successful creation
|
||||
// This is essential for normal pool operations
|
||||
cn.SetUsable(true)
|
||||
@@ -280,6 +281,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Mark connection as usable after successful creation
|
||||
// This is essential for normal pool operations
|
||||
cn.SetUsable(true)
|
||||
@@ -571,9 +573,11 @@ func (p *ConnPool) popIdle() (*Conn, error) {
|
||||
attempts++
|
||||
|
||||
if cn.IsUsable() {
|
||||
if cn.Used.CompareAndSwap(false, true) {
|
||||
p.idleConnsLen.Add(-1)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Connection is not usable, put it back in the pool
|
||||
if p.cfg.PoolFIFO {
|
||||
@@ -667,6 +671,12 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
||||
shouldCloseConn = true
|
||||
}
|
||||
|
||||
// Mark connection as not used only
|
||||
// if it's not being closed
|
||||
if !shouldCloseConn {
|
||||
cn.Used.Store(false)
|
||||
}
|
||||
|
||||
p.freeTurn()
|
||||
|
||||
if shouldCloseConn {
|
||||
|
@@ -2,6 +2,7 @@ package pool
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SingleConnPool struct {
|
||||
@@ -31,12 +32,26 @@ func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) {
|
||||
if p.stickyErr != nil {
|
||||
return nil, p.stickyErr
|
||||
}
|
||||
if p.cn == nil {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
p.cn.Used.Store(true)
|
||||
p.cn.SetUsedAt(time.Now())
|
||||
return p.cn, nil
|
||||
}
|
||||
|
||||
func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) {}
|
||||
func (p *SingleConnPool) Put(ctx context.Context, cn *Conn) {
|
||||
if p.cn == nil {
|
||||
return
|
||||
}
|
||||
if p.cn != cn {
|
||||
return
|
||||
}
|
||||
p.cn.Used.Store(false)
|
||||
}
|
||||
|
||||
func (p *SingleConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
||||
cn.Used.Store(false)
|
||||
p.cn = nil
|
||||
p.stickyErr = reason
|
||||
}
|
||||
|
@@ -378,8 +378,12 @@ func (hwm *handoffWorkerManager) performConnectionHandoff(ctx context.Context, c
|
||||
}
|
||||
|
||||
// performHandoffInternal performs the actual handoff logic (extracted for circuit breaker integration)
|
||||
func (hwm *handoffWorkerManager) performHandoffInternal(ctx context.Context, conn *pool.Conn, newEndpoint string, connID uint64) (shouldRetry bool, err error) {
|
||||
|
||||
func (hwm *handoffWorkerManager) performHandoffInternal(
|
||||
ctx context.Context,
|
||||
conn *pool.Conn,
|
||||
newEndpoint string,
|
||||
connID uint64,
|
||||
) (shouldRetry bool, err error) {
|
||||
retries := conn.IncrementAndGetHandoffRetries(1)
|
||||
internal.Logger.Printf(ctx, logs.HandoffRetryAttempt(connID, retries, newEndpoint, conn.RemoteAddr().String()))
|
||||
maxRetries := 3 // Default fallback
|
||||
@@ -438,9 +442,14 @@ func (hwm *handoffWorkerManager) performHandoffInternal(ctx context.Context, con
|
||||
}
|
||||
}()
|
||||
|
||||
// Clear handoff state will:
|
||||
// - set the connection as usable again
|
||||
// - clear the handoff state (shouldHandoff, endpoint, seqID)
|
||||
// - reset the handoff retries to 0
|
||||
conn.ClearHandoffState()
|
||||
internal.Logger.Printf(ctx, logs.HandoffSucceeded(connID, newEndpoint))
|
||||
|
||||
// successfully completed the handoff, no retry needed and no error
|
||||
return false, nil
|
||||
}
|
||||
|
||||
|
1
redis.go
1
redis.go
@@ -549,6 +549,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||
// mark the connection as usable and inited
|
||||
// once returned to the pool as idle, this connection can be used by other clients
|
||||
cn.SetUsable(true)
|
||||
cn.Used.Store(false)
|
||||
cn.Inited.Store(true)
|
||||
|
||||
// Set the connection initialization function for potential reconnections
|
||||
|
Reference in New Issue
Block a user