1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

fix(init): conn init should be thread safe

This commit is contained in:
Nedyalko Dyakov
2025-10-22 16:34:49 +03:00
parent 70dfa383fe
commit dfda8eb64e
2 changed files with 54 additions and 13 deletions

View File

@@ -83,13 +83,14 @@ type Conn struct {
// On handoff, the network connection is replaced, but the Conn struct is reused // On handoff, the network connection is replaced, but the Conn struct is reused
// this flag will be set to false when the network connection is replaced and // this flag will be set to false when the network connection is replaced and
// set to true after the new network connection is initialized // set to true after the new network connection is initialized
Inited atomic.Bool inited atomic.Bool
pooled bool initializing atomic.Bool
pubsub bool pooled bool
closed atomic.Bool pubsub bool
createdAt time.Time closed atomic.Bool
expiresAt time.Time createdAt time.Time
expiresAt time.Time
// maintenanceNotifications upgrade support: relaxed timeouts during migrations/failovers // maintenanceNotifications upgrade support: relaxed timeouts during migrations/failovers
// Using atomic operations for lock-free access to avoid mutex contention // Using atomic operations for lock-free access to avoid mutex contention
@@ -306,7 +307,27 @@ func (cn *Conn) IsPubSub() bool {
} }
func (cn *Conn) IsInited() bool { func (cn *Conn) IsInited() bool {
return cn.Inited.Load() return cn.inited.Load()
}
func (cn *Conn) SetInited(inited bool) {
cn.inited.Store(inited)
}
func (cn *Conn) CompareAndSwapInited(old, new bool) bool {
return cn.inited.CompareAndSwap(old, new)
}
func (cn *Conn) IsInitializing() bool {
return cn.initializing.Load()
}
func (cn *Conn) SetInitializing(initializing bool) {
cn.initializing.Store(initializing)
}
func (cn *Conn) CompareAndSwapInitializing(old, new bool) bool {
return cn.initializing.CompareAndSwap(old, new)
} }
// SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades. // SetRelaxedTimeout sets relaxed timeouts for this connection during maintenanceNotifications upgrades.
@@ -478,8 +499,17 @@ func (cn *Conn) GetNetConn() net.Conn {
// SetNetConnAndInitConn replaces the underlying connection and executes the initialization. // SetNetConnAndInitConn replaces the underlying connection and executes the initialization.
func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error { func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error {
// max retries of 100ms * 20 = 2 second
maxRetries := 20
for cn.IsInitializing() || cn.IsUsed() {
time.Sleep(100 * time.Millisecond)
maxRetries--
if maxRetries <= 0 {
return fmt.Errorf("failed to set net conn after %d attempts due to high contention", maxRetries)
}
}
// New connection is not initialized yet // New connection is not initialized yet
cn.Inited.Store(false) cn.SetInited(false)
// Replace the underlying connection // Replace the underlying connection
cn.SetNetConn(netConn) cn.SetNetConn(netConn)
return cn.ExecuteInitConn(ctx) return cn.ExecuteInitConn(ctx)

View File

@@ -366,9 +366,20 @@ func (c *baseClient) wrappedOnClose(newOnClose func() error) func() error {
} }
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
if !cn.Inited.CompareAndSwap(false, true) { if !cn.CompareAndSwapInited(false, true) {
return nil return nil
} }
defer func() {
// if the initialization did not complete successfully
// we need to mark the connection as not initialized
if cn.CompareAndSwapInitializing(true, false) {
internal.Logger.Printf(ctx, "redis: failed to initialize connection conn[%d]", cn.GetID())
cn.SetInited(false)
}
}()
cn.SetInitializing(true)
var err error var err error
connPool := pool.NewSingleConnPool(c.connPool, cn) connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(c.opt, connPool, &c.hooksMixin) conn := newConn(c.opt, connPool, &c.hooksMixin)
@@ -510,14 +521,14 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
} }
} }
// Set the connection initialization function for potential reconnections
cn.SetInitConnFunc(c.createInitConnFunc())
// mark the connection as usable and inited // mark the connection as usable and inited
// once returned to the pool as idle, this connection can be used by other clients // once returned to the pool as idle, this connection can be used by other clients
cn.SetUsable(true) cn.SetUsable(true)
cn.SetUsed(false) cn.SetUsed(false)
cn.Inited.Store(true) cn.SetInitializing(false)
// Set the connection initialization function for potential reconnections
cn.SetInitConnFunc(c.createInitConnFunc())
if c.opt.OnConnect != nil { if c.opt.OnConnect != nil {
return c.opt.OnConnect(ctx, conn) return c.opt.OnConnect(ctx, conn)