1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

wip, used and unusable states

This commit is contained in:
Nedyalko Dyakov
2025-10-23 17:09:22 +03:00
parent 27591cd045
commit 606264ef7f
5 changed files with 581 additions and 115 deletions

View File

@@ -182,9 +182,9 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
var err error var err error
timeout := time.After(r.reAuthTimeout) timeout := time.After(r.reAuthTimeout)
// Try to acquire the connection // Try to acquire the connection for re-authentication
// We need to ensure the connection is both Usable and not Used // We need to ensure the connection is IDLE (not IN_USE) before transitioning to UNUSABLE
// to prevent data races with concurrent operations // This prevents re-authentication from interfering with active commands
const baseDelay = 10 * time.Microsecond const baseDelay = 10 * time.Microsecond
acquired := false acquired := false
attempt := 0 attempt := 0
@@ -196,15 +196,14 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
reAuthFn(err) reAuthFn(err)
return return
default: default:
// Try to acquire: set Usable=false, then check Used // Try to atomically transition from IDLE to UNUSABLE
if conn.CompareAndSwapUsable(true, false) { // This ensures we only acquire connections that are not actively in use
if !conn.IsUsed() { stateMachine := conn.GetStateMachine()
if stateMachine != nil {
err := stateMachine.TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
// Successfully acquired: connection was IDLE, now UNUSABLE
acquired = true acquired = true
} else {
// Release Usable and retry with exponential backoff
// todo(ndyakov): think of a better way to do this without the need
// to release the connection, but just wait till it is not used
conn.SetUsable(true)
} }
} }
if !acquired { if !acquired {
@@ -222,8 +221,11 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool,
reAuthFn(nil) reAuthFn(nil)
} }
// Release the connection // Release the connection: transition from UNUSABLE back to IDLE
conn.SetUsable(true) stateMachine := conn.GetStateMachine()
if stateMachine != nil {
stateMachine.Transition(pool.StateIdle)
}
}() }()
} }

View File

@@ -0,0 +1,241 @@
package streaming
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/redis/go-redis/v9/internal/pool"
)
// TestReAuthOnlyWhenIdle verifies that re-authentication only happens when
// a connection is in IDLE state, not when it's IN_USE.
func TestReAuthOnlyWhenIdle(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)
// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)
// Simulate connection being acquired (IDLE → IN_USE)
if !cn.CompareAndSwapUsed(false, true) {
t.Fatal("Failed to acquire connection")
}
// Verify state is IN_USE
if state := cn.GetStateMachine().GetState(); state != pool.StateInUse {
t.Errorf("Expected state IN_USE, got %s", state)
}
// Try to transition to UNUSABLE (for reauth) - should fail
err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
t.Error("Expected error when trying to transition IN_USE → UNUSABLE, but got none")
}
// Verify state is still IN_USE
if state := cn.GetStateMachine().GetState(); state != pool.StateInUse {
t.Errorf("Expected state to remain IN_USE, got %s", state)
}
// Release connection (IN_USE → IDLE)
if !cn.CompareAndSwapUsed(true, false) {
t.Fatal("Failed to release connection")
}
// Verify state is IDLE
if state := cn.GetStateMachine().GetState(); state != pool.StateIdle {
t.Errorf("Expected state IDLE, got %s", state)
}
// Now try to transition to UNUSABLE - should succeed
err = cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err != nil {
t.Errorf("Failed to transition IDLE → UNUSABLE: %v", err)
}
// Verify state is UNUSABLE
if state := cn.GetStateMachine().GetState(); state != pool.StateUnusable {
t.Errorf("Expected state UNUSABLE, got %s", state)
}
}
// TestReAuthWaitsForConnectionToBeIdle verifies that the re-auth worker
// waits for a connection to become IDLE before performing re-authentication.
func TestReAuthWaitsForConnectionToBeIdle(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)
// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)
// Simulate connection being acquired (IDLE → IN_USE)
if !cn.CompareAndSwapUsed(false, true) {
t.Fatal("Failed to acquire connection")
}
// Track re-auth attempts
var reAuthAttempts atomic.Int32
var reAuthSucceeded atomic.Bool
// Start a goroutine that tries to acquire the connection for re-auth
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
// Try to acquire for re-auth with timeout
timeout := time.After(2 * time.Second)
acquired := false
for !acquired {
select {
case <-timeout:
t.Error("Timeout waiting to acquire connection for re-auth")
return
default:
reAuthAttempts.Add(1)
// Try to atomically transition from IDLE to UNUSABLE
err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
// Successfully acquired
acquired = true
reAuthSucceeded.Store(true)
} else {
// Connection is still IN_USE, wait a bit
time.Sleep(10 * time.Millisecond)
}
}
}
// Release the connection
cn.GetStateMachine().Transition(pool.StateIdle)
}()
// Keep connection IN_USE for 500ms
time.Sleep(500 * time.Millisecond)
// Verify re-auth hasn't succeeded yet (connection is still IN_USE)
if reAuthSucceeded.Load() {
t.Error("Re-auth succeeded while connection was IN_USE")
}
// Verify there were multiple attempts
attempts := reAuthAttempts.Load()
if attempts < 2 {
t.Errorf("Expected multiple re-auth attempts, got %d", attempts)
}
// Release connection (IN_USE → IDLE)
if !cn.CompareAndSwapUsed(true, false) {
t.Fatal("Failed to release connection")
}
// Wait for re-auth to complete
wg.Wait()
// Verify re-auth succeeded after connection became IDLE
if !reAuthSucceeded.Load() {
t.Error("Re-auth did not succeed after connection became IDLE")
}
// Verify final state is IDLE
if state := cn.GetStateMachine().GetState(); state != pool.StateIdle {
t.Errorf("Expected final state IDLE, got %s", state)
}
}
// TestConcurrentReAuthAndUsage verifies that re-auth and normal usage
// don't interfere with each other.
func TestConcurrentReAuthAndUsage(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)
// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)
var wg sync.WaitGroup
var usageCount atomic.Int32
var reAuthCount atomic.Int32
// Goroutine 1: Simulate normal usage (acquire/release)
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 100; i++ {
// Try to acquire
if cn.CompareAndSwapUsed(false, true) {
usageCount.Add(1)
// Simulate work
time.Sleep(1 * time.Millisecond)
// Release
cn.CompareAndSwapUsed(true, false)
}
time.Sleep(1 * time.Millisecond)
}
}()
// Goroutine 2: Simulate re-auth attempts
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < 50; i++ {
// Try to acquire for re-auth
err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
reAuthCount.Add(1)
// Simulate re-auth work
time.Sleep(2 * time.Millisecond)
// Release
cn.GetStateMachine().Transition(pool.StateIdle)
}
time.Sleep(2 * time.Millisecond)
}
}()
wg.Wait()
// Verify both operations happened
if usageCount.Load() == 0 {
t.Error("No successful usage operations")
}
if reAuthCount.Load() == 0 {
t.Error("No successful re-auth operations")
}
t.Logf("Usage operations: %d, Re-auth operations: %d", usageCount.Load(), reAuthCount.Load())
// Verify final state is IDLE
if state := cn.GetStateMachine().GetState(); state != pool.StateIdle {
t.Errorf("Expected final state IDLE, got %s", state)
}
}
// TestReAuthRespectsClosed verifies that re-auth doesn't happen on closed connections.
func TestReAuthRespectsClosed(t *testing.T) {
// Create a connection
cn := pool.NewConn(nil)
// Initialize to IDLE state
cn.GetStateMachine().Transition(pool.StateInitializing)
cn.GetStateMachine().Transition(pool.StateIdle)
// Close the connection
cn.GetStateMachine().Transition(pool.StateClosed)
// Try to transition to UNUSABLE - should fail
err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable)
if err == nil {
t.Error("Expected error when trying to transition CLOSED → UNUSABLE, but got none")
}
// Verify state is still CLOSED
if state := cn.GetStateMachine().GetState(); state != pool.StateClosed {
t.Errorf("Expected state to remain CLOSED, got %s", state)
}
}

View File

@@ -58,8 +58,13 @@ type Conn struct {
readerMu sync.RWMutex readerMu sync.RWMutex
// State machine for connection state management // State machine for connection state management
// Replaces: usable, Inited // Replaces: usable, Inited, used
// Provides thread-safe state transitions with FIFO waiting queue // Provides thread-safe state transitions with FIFO waiting queue
// States: CREATED → INITIALIZING → IDLE ⇄ IN_USE
// ↓
// UNUSABLE (handoff/reauth)
// ↓
// IDLE/CLOSED
stateMachine *ConnStateMachine stateMachine *ConnStateMachine
// Handoff metadata - managed separately from state machine // Handoff metadata - managed separately from state machine
@@ -67,24 +72,6 @@ type Conn struct {
handoffStateAtomic atomic.Value // stores *HandoffState handoffStateAtomic atomic.Value // stores *HandoffState
handoffRetriesAtomic atomic.Uint32 // retry counter handoffRetriesAtomic atomic.Uint32 // retry counter
// Design note:
// Why have both State Machine and Used?
// _State Machine_ tracks the connection lifecycle (CREATED, INITIALIZING, READY, REAUTH_*, CLOSED)
// and determines if the connection is safe for use by clients. A connection can be in the pool but not
// in a usable state (e.g. reauth in progress).
// _Used_ is used to mark a connection as used when a command is going to be processed on that connection.
// this is going to happen once the connection is picked from the pool.
//
// If a background operation needs to use the connection, it will transition to an in-progress state
// (e.g. REAUTH_IN_PROGRESS) and only use it when it is not marked as used.
// That way, the connection won't be used to send multiple commands at the same time and
// potentially corrupt the command stream.
// used flag to mark connection as used when a command is going to be
// processed on that connection. This is used to prevent a race condition with
// background operations that may execute commands, like re-authentication.
used atomic.Bool
pooled bool pooled bool
pubsub bool pubsub bool
closed atomic.Bool closed atomic.Bool
@@ -161,12 +148,12 @@ func (cn *Conn) SetUsedAt(tm time.Time) {
// Returns true if the swap was successful (old value matched), false otherwise. // Returns true if the swap was successful (old value matched), false otherwise.
// //
// Implementation note: This is a compatibility wrapper around the state machine. // Implementation note: This is a compatibility wrapper around the state machine.
// It checks if the current state is "usable" (READY) and transitions accordingly. // It checks if the current state is "usable" (IDLE or IN_USE) and transitions accordingly.
func (cn *Conn) CompareAndSwapUsable(old, new bool) bool { func (cn *Conn) CompareAndSwapUsable(old, new bool) bool {
currentState := cn.stateMachine.GetState() currentState := cn.stateMachine.GetState()
// Check if current state matches the "old" usable value // Check if current state matches the "old" usable value
currentUsable := (currentState == StateReady) currentUsable := (currentState == StateIdle || currentState == StateInUse)
if currentUsable != old { if currentUsable != old {
return false return false
} }
@@ -178,18 +165,21 @@ func (cn *Conn) CompareAndSwapUsable(old, new bool) bool {
// Transition based on new value // Transition based on new value
if new { if new {
// Trying to make usable - transition to READY // Trying to make usable - transition from UNUSABLE to IDLE
// This should only work from certain states // This should only work from UNUSABLE or INITIALIZING states
err := cn.stateMachine.TryTransition( err := cn.stateMachine.TryTransition(
[]ConnState{StateInitializing, StateReauthInProgress}, []ConnState{StateInitializing, StateUnusable},
StateReady, StateIdle,
) )
return err == nil return err == nil
} else { } else {
// Trying to make unusable - this is typically for acquiring the connection // Trying to make unusable - transition from IDLE to UNUSABLE
// for background operations. We don't transition here, just return false // This is typically for acquiring the connection for background operations
// since the caller should use proper state transitions. err := cn.stateMachine.TryTransition(
return false []ConnState{StateIdle},
StateUnusable,
)
return err == nil
} }
} }
@@ -203,8 +193,9 @@ func (cn *Conn) CompareAndSwapUsable(old, new bool) bool {
// - Other background operations that need exclusive access // - Other background operations that need exclusive access
func (cn *Conn) IsUsable() bool { func (cn *Conn) IsUsable() bool {
state := cn.stateMachine.GetState() state := cn.stateMachine.GetState()
// Only READY state is considered usable // IDLE and IN_USE states are considered usable
return state == StateReady // (IN_USE means it's usable but currently acquired by someone)
return state == StateIdle || state == StateInUse
} }
// SetUsable sets the usable flag for the connection (lock-free). // SetUsable sets the usable flag for the connection (lock-free).
@@ -215,13 +206,11 @@ func (cn *Conn) IsUsable() bool {
// Prefer CompareAndSwapUsable() when acquiring exclusive access to avoid race conditions. // Prefer CompareAndSwapUsable() when acquiring exclusive access to avoid race conditions.
func (cn *Conn) SetUsable(usable bool) { func (cn *Conn) SetUsable(usable bool) {
if usable { if usable {
// Transition to READY state // Transition to IDLE state (ready to be acquired)
cn.stateMachine.Transition(StateReady) cn.stateMachine.Transition(StateIdle)
} else { } else {
// This is ambiguous - we don't know which "unusable" state to transition to // Transition to UNUSABLE state (for background operations)
// For now, we'll just log a warning and not transition cn.stateMachine.Transition(StateUnusable)
// Callers should use proper state machine transitions instead
internal.Logger.Printf(context.Background(), "SetUsable(false) called on conn[%d] - use state machine transitions instead", cn.id)
} }
} }
@@ -233,16 +222,33 @@ func (cn *Conn) IsInited() bool {
return state != StateCreated && state != StateInitializing && state != StateClosed return state != StateCreated && state != StateInitializing && state != StateClosed
} }
// Used // Used - State machine based implementation
// CompareAndSwapUsed atomically compares and swaps the used flag (lock-free). // CompareAndSwapUsed atomically compares and swaps the used flag (lock-free).
// //
// This is the preferred method for acquiring a connection from the pool, as it // This is the preferred method for acquiring a connection from the pool, as it
// ensures that only one goroutine marks the connection as used. // ensures that only one goroutine marks the connection as used.
// //
// Implementation: Uses state machine transitions IDLE ⇄ IN_USE
//
// Returns true if the swap was successful (old value matched), false otherwise. // Returns true if the swap was successful (old value matched), false otherwise.
func (cn *Conn) CompareAndSwapUsed(old, new bool) bool { func (cn *Conn) CompareAndSwapUsed(old, new bool) bool {
return cn.used.CompareAndSwap(old, new) if old == new {
// No change needed
currentState := cn.stateMachine.GetState()
currentUsed := (currentState == StateInUse)
return currentUsed == old
}
if !old && new {
// Acquiring: IDLE → IN_USE
err := cn.stateMachine.TryTransition([]ConnState{StateIdle}, StateInUse)
return err == nil
} else {
// Releasing: IN_USE → IDLE
err := cn.stateMachine.TryTransition([]ConnState{StateInUse}, StateIdle)
return err == nil
}
} }
// IsUsed returns true if the connection is currently in use (lock-free). // IsUsed returns true if the connection is currently in use (lock-free).
@@ -251,7 +257,7 @@ func (cn *Conn) CompareAndSwapUsed(old, new bool) bool {
// actively processing a command. Background operations (like re-auth) should // actively processing a command. Background operations (like re-auth) should
// wait until the connection is not used before executing commands. // wait until the connection is not used before executing commands.
func (cn *Conn) IsUsed() bool { func (cn *Conn) IsUsed() bool {
return cn.used.Load() return cn.stateMachine.GetState() == StateInUse
} }
// SetUsed sets the used flag for the connection (lock-free). // SetUsed sets the used flag for the connection (lock-free).
@@ -262,7 +268,11 @@ func (cn *Conn) IsUsed() bool {
// Prefer CompareAndSwapUsed() when acquiring from a multi-connection pool to // Prefer CompareAndSwapUsed() when acquiring from a multi-connection pool to
// avoid race conditions. // avoid race conditions.
func (cn *Conn) SetUsed(val bool) { func (cn *Conn) SetUsed(val bool) {
cn.used.Store(val) if val {
cn.stateMachine.Transition(StateInUse)
} else {
cn.stateMachine.Transition(StateIdle)
}
} }
// getNetConn returns the current network connection using atomic load (lock-free). // getNetConn returns the current network connection using atomic load (lock-free).
@@ -514,11 +524,11 @@ func (cn *Conn) GetNetConn() net.Conn {
// If another goroutine is currently initializing, this will wait for it to complete. // If another goroutine is currently initializing, this will wait for it to complete.
func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error { func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error {
// Wait for and transition to INITIALIZING state - this prevents concurrent initializations // Wait for and transition to INITIALIZING state - this prevents concurrent initializations
// Valid from states: CREATED (first init), READY (handoff/reconnect), REAUTH_IN_PROGRESS (after reauth) // Valid from states: CREATED (first init), IDLE (reconnect), UNUSABLE (handoff/reauth)
// If another goroutine is initializing, we'll wait for it to finish // If another goroutine is initializing, we'll wait for it to finish
err := cn.stateMachine.AwaitAndTransition( err := cn.stateMachine.AwaitAndTransition(
ctx, ctx,
[]ConnState{StateCreated, StateReady, StateReauthInProgress}, []ConnState{StateCreated, StateIdle, StateUnusable},
StateInitializing, StateInitializing,
) )
if err != nil { if err != nil {
@@ -536,8 +546,8 @@ func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) err
return initErr return initErr
} }
// Initialization succeeded - transition to READY // Initialization succeeded - transition to IDLE (ready to be acquired)
cn.stateMachine.Transition(StateReady) cn.stateMachine.Transition(StateIdle)
return nil return nil
} }

View File

@@ -11,6 +11,13 @@ import (
// ConnState represents the connection state in the state machine. // ConnState represents the connection state in the state machine.
// States are designed to be lightweight and fast to check. // States are designed to be lightweight and fast to check.
//
// State Transitions:
// CREATED → INITIALIZING → IDLE ⇄ IN_USE
// ↓
// UNUSABLE (handoff/reauth)
// ↓
// IDLE/CLOSED
type ConnState uint32 type ConnState uint32
const ( const (
@@ -20,11 +27,15 @@ const (
// StateInitializing - Connection initialization in progress // StateInitializing - Connection initialization in progress
StateInitializing StateInitializing
// StateReady - Connection ready for use // StateIdle - Connection initialized and idle in pool, ready to be acquired
StateReady StateIdle
// StateReauthInProgress - Reauth actively being processed // StateInUse - Connection actively processing a command (retrieved from pool)
StateReauthInProgress StateInUse
// StateUnusable - Connection temporarily unusable due to background operation
// (handoff, reauth, etc.). Cannot be acquired from pool.
StateUnusable
// StateClosed - Connection closed // StateClosed - Connection closed
StateClosed StateClosed
@@ -37,10 +48,12 @@ func (s ConnState) String() string {
return "CREATED" return "CREATED"
case StateInitializing: case StateInitializing:
return "INITIALIZING" return "INITIALIZING"
case StateReady: case StateIdle:
return "READY" return "IDLE"
case StateReauthInProgress: case StateInUse:
return "REAUTH_IN_PROGRESS" return "IN_USE"
case StateUnusable:
return "UNUSABLE"
case StateClosed: case StateClosed:
return "CLOSED" return "CLOSED"
default: default:

View File

@@ -10,7 +10,7 @@ import (
func TestConnStateMachine_GetState(t *testing.T) { func TestConnStateMachine_GetState(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
if state := sm.GetState(); state != StateCreated { if state := sm.GetState(); state != StateCreated {
t.Errorf("expected initial state to be CREATED, got %s", state) t.Errorf("expected initial state to be CREATED, got %s", state)
} }
@@ -18,16 +18,16 @@ func TestConnStateMachine_GetState(t *testing.T) {
func TestConnStateMachine_Transition(t *testing.T) { func TestConnStateMachine_Transition(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
// Unconditional transition // Unconditional transition
sm.Transition(StateInitializing) sm.Transition(StateInitializing)
if state := sm.GetState(); state != StateInitializing { if state := sm.GetState(); state != StateInitializing {
t.Errorf("expected state to be INITIALIZING, got %s", state) t.Errorf("expected state to be INITIALIZING, got %s", state)
} }
sm.Transition(StateReady) sm.Transition(StateIdle)
if state := sm.GetState(); state != StateReady { if state := sm.GetState(); state != StateIdle {
t.Errorf("expected state to be READY, got %s", state) t.Errorf("expected state to be IDLE, got %s", state)
} }
} }
@@ -47,42 +47,42 @@ func TestConnStateMachine_TryTransition(t *testing.T) {
expectError: false, expectError: false,
}, },
{ {
name: "invalid transition from CREATED to READY", name: "invalid transition from CREATED to IDLE",
initialState: StateCreated, initialState: StateCreated,
validStates: []ConnState{StateInitializing}, validStates: []ConnState{StateInitializing},
targetState: StateReady, targetState: StateIdle,
expectError: true, expectError: true,
}, },
{ {
name: "transition to same state", name: "transition to same state",
initialState: StateReady, initialState: StateIdle,
validStates: []ConnState{StateReady}, validStates: []ConnState{StateIdle},
targetState: StateReady, targetState: StateIdle,
expectError: false, expectError: false,
}, },
{ {
name: "multiple valid from states", name: "multiple valid from states",
initialState: StateReady, initialState: StateIdle,
validStates: []ConnState{StateInitializing, StateReady, StateReauthInProgress}, validStates: []ConnState{StateInitializing, StateIdle, StateUnusable},
targetState: StateReauthInProgress, targetState: StateUnusable,
expectError: false, expectError: false,
}, },
} }
for _, tt := range tests { for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) { t.Run(tt.name, func(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(tt.initialState) sm.Transition(tt.initialState)
err := sm.TryTransition(tt.validStates, tt.targetState) err := sm.TryTransition(tt.validStates, tt.targetState)
if tt.expectError && err == nil { if tt.expectError && err == nil {
t.Error("expected error but got none") t.Error("expected error but got none")
} }
if !tt.expectError && err != nil { if !tt.expectError && err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if !tt.expectError { if !tt.expectError {
if state := sm.GetState(); state != tt.targetState { if state := sm.GetState(); state != tt.targetState {
t.Errorf("expected state %s, got %s", tt.targetState, state) t.Errorf("expected state %s, got %s", tt.targetState, state)
@@ -94,17 +94,17 @@ func TestConnStateMachine_TryTransition(t *testing.T) {
func TestConnStateMachine_AwaitAndTransition_FastPath(t *testing.T) { func TestConnStateMachine_AwaitAndTransition_FastPath(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) sm.Transition(StateIdle)
ctx := context.Background() ctx := context.Background()
// Fast path: already in valid state // Fast path: already in valid state
err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateReauthInProgress) err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateUnusable)
if err != nil { if err != nil {
t.Errorf("unexpected error: %v", err) t.Errorf("unexpected error: %v", err)
} }
if state := sm.GetState(); state != StateReauthInProgress { if state := sm.GetState(); state != StateUnusable {
t.Errorf("expected state REAUTH_IN_PROGRESS, got %s", state) t.Errorf("expected state REAUTH_IN_PROGRESS, got %s", state)
} }
} }
@@ -112,12 +112,12 @@ func TestConnStateMachine_AwaitAndTransition_FastPath(t *testing.T) {
func TestConnStateMachine_AwaitAndTransition_Timeout(t *testing.T) { func TestConnStateMachine_AwaitAndTransition_Timeout(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateCreated) sm.Transition(StateCreated)
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
defer cancel() defer cancel()
// Wait for a state that will never come // Wait for a state that will never come
err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateReauthInProgress) err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateUnusable)
if err == nil { if err == nil {
t.Error("expected timeout error but got none") t.Error("expected timeout error but got none")
} }
@@ -150,7 +150,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) {
startBarrier.Wait() startBarrier.Wait()
ctx := context.Background() ctx := context.Background()
err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateReady) err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateIdle)
if err != nil { if err != nil {
t.Errorf("waiter %d got error: %v", waiterID, err) t.Errorf("waiter %d got error: %v", waiterID, err)
return return
@@ -161,7 +161,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) {
orderMu.Unlock() orderMu.Unlock()
// Transition back to READY for next waiter // Transition back to READY for next waiter
sm.Transition(StateReady) sm.Transition(StateIdle)
}() }()
} }
@@ -169,7 +169,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) {
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
// Transition to READY to start processing waiters // Transition to READY to start processing waiters
sm.Transition(StateReady) sm.Transition(StateIdle)
// Wait for all waiters to complete // Wait for all waiters to complete
wg.Wait() wg.Wait()
@@ -191,7 +191,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) {
func TestConnStateMachine_ConcurrentAccess(t *testing.T) { func TestConnStateMachine_ConcurrentAccess(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) sm.Transition(StateIdle)
const numGoroutines = 100 const numGoroutines = 100
const numIterations = 100 const numIterations = 100
@@ -206,11 +206,11 @@ func TestConnStateMachine_ConcurrentAccess(t *testing.T) {
for j := 0; j < numIterations; j++ { for j := 0; j < numIterations; j++ {
// Try to transition from READY to REAUTH_IN_PROGRESS // Try to transition from READY to REAUTH_IN_PROGRESS
err := sm.TryTransition([]ConnState{StateReady}, StateReauthInProgress) err := sm.TryTransition([]ConnState{StateIdle}, StateUnusable)
if err == nil { if err == nil {
successCount.Add(1) successCount.Add(1)
// Transition back to READY // Transition back to READY
sm.Transition(StateReady) sm.Transition(StateIdle)
} }
// Read state (hot path) // Read state (hot path)
@@ -238,8 +238,9 @@ func TestConnStateMachine_StateString(t *testing.T) {
}{ }{
{StateCreated, "CREATED"}, {StateCreated, "CREATED"},
{StateInitializing, "INITIALIZING"}, {StateInitializing, "INITIALIZING"},
{StateReady, "READY"}, {StateIdle, "IDLE"},
{StateReauthInProgress, "REAUTH_IN_PROGRESS"}, {StateInUse, "IN_USE"},
{StateUnusable, "UNUSABLE"},
{StateClosed, "CLOSED"}, {StateClosed, "CLOSED"},
{ConnState(999), "UNKNOWN(999)"}, {ConnState(999), "UNKNOWN(999)"},
} }
@@ -255,8 +256,8 @@ func TestConnStateMachine_StateString(t *testing.T) {
func BenchmarkConnStateMachine_GetState(b *testing.B) { func BenchmarkConnStateMachine_GetState(b *testing.B) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) sm.Transition(StateIdle)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_ = sm.GetState() _ = sm.GetState()
@@ -265,7 +266,7 @@ func BenchmarkConnStateMachine_GetState(b *testing.B) {
func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) { func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) sm.Transition(StateIdle)
const numGoroutines = 10 const numGoroutines = 10
var inInitializing atomic.Int32 var inInitializing atomic.Int32
@@ -286,7 +287,7 @@ func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) {
startBarrier.Wait() startBarrier.Wait()
// Try to transition to INITIALIZING // Try to transition to INITIALIZING
err := sm.TryTransition([]ConnState{StateReady}, StateInitializing) err := sm.TryTransition([]ConnState{StateIdle}, StateInitializing)
if err == nil { if err == nil {
successCount.Add(1) successCount.Add(1)
@@ -310,7 +311,7 @@ func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) {
inInitializing.Add(-1) inInitializing.Add(-1)
// Transition back to READY // Transition back to READY
sm.Transition(StateReady) sm.Transition(StateIdle)
} else { } else {
t.Logf("Goroutine %d: failed to enter INITIALIZING - %v", id, err) t.Logf("Goroutine %d: failed to enter INITIALIZING - %v", id, err)
} }
@@ -329,7 +330,7 @@ func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) {
func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T) { func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) sm.Transition(StateIdle)
const numGoroutines = 5 const numGoroutines = 5
var completedCount atomic.Int32 var completedCount atomic.Int32
@@ -352,7 +353,7 @@ func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T)
ctx := context.Background() ctx := context.Background()
// Try to transition to INITIALIZING - should wait if another is initializing // Try to transition to INITIALIZING - should wait if another is initializing
err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateInitializing) err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing)
if err != nil { if err != nil {
t.Errorf("Goroutine %d: failed to transition: %v", id, err) t.Errorf("Goroutine %d: failed to transition: %v", id, err)
return return
@@ -369,7 +370,7 @@ func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T)
time.Sleep(10 * time.Millisecond) time.Sleep(10 * time.Millisecond)
// Transition back to READY // Transition back to READY
sm.Transition(StateReady) sm.Transition(StateIdle)
completedCount.Add(1) completedCount.Add(1)
t.Logf("Goroutine %d: completed initialization (total=%d)", id, completedCount.Load()) t.Logf("Goroutine %d: completed initialization (total=%d)", id, completedCount.Load())
@@ -384,7 +385,7 @@ func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T)
} }
// Final state should be READY // Final state should be READY
if sm.GetState() != StateReady { if sm.GetState() != StateIdle {
t.Errorf("expected final state READY, got %s", sm.GetState()) t.Errorf("expected final state READY, got %s", sm.GetState())
} }
@@ -418,7 +419,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// This should queue in FIFO order // This should queue in FIFO order
err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateInitializing) err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing)
if err != nil { if err != nil {
t.Errorf("Goroutine %d: failed to transition: %v", id, err) t.Errorf("Goroutine %d: failed to transition: %v", id, err)
return return
@@ -432,7 +433,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) {
t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder)) t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder))
// Transition back to READY to allow next waiter // Transition back to READY to allow next waiter
sm.Transition(StateReady) sm.Transition(StateIdle)
}(i) }(i)
} }
@@ -440,7 +441,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
// Transition to READY to start processing the queue // Transition to READY to start processing the queue
sm.Transition(StateReady) sm.Transition(StateIdle)
wg.Wait() wg.Wait()
@@ -456,7 +457,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) {
func TestConnStateMachine_FIFOWithFastPath(t *testing.T) { func TestConnStateMachine_FIFOWithFastPath(t *testing.T) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) // Start in READY so fast path is available sm.Transition(StateIdle) // Start in READY so fast path is available
const numGoroutines = 10 const numGoroutines = 10
var executionOrder []int var executionOrder []int
@@ -481,7 +482,7 @@ func TestConnStateMachine_FIFOWithFastPath(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// This might use fast path (CAS) or slow path (queue) // This might use fast path (CAS) or slow path (queue)
err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateInitializing) err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing)
if err != nil { if err != nil {
t.Errorf("Goroutine %d: failed to transition: %v", id, err) t.Errorf("Goroutine %d: failed to transition: %v", id, err)
return return
@@ -498,7 +499,7 @@ func TestConnStateMachine_FIFOWithFastPath(t *testing.T) {
time.Sleep(5 * time.Millisecond) time.Sleep(5 * time.Millisecond)
// Transition back to READY to allow next waiter // Transition back to READY to allow next waiter
sm.Transition(StateReady) sm.Transition(StateIdle)
}(i) }(i)
} }
@@ -523,12 +524,211 @@ func TestConnStateMachine_FIFOWithFastPath(t *testing.T) {
func BenchmarkConnStateMachine_TryTransition(b *testing.B) { func BenchmarkConnStateMachine_TryTransition(b *testing.B) {
sm := NewConnStateMachine() sm := NewConnStateMachine()
sm.Transition(StateReady) sm.Transition(StateIdle)
b.ResetTimer() b.ResetTimer()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_ = sm.TryTransition([]ConnState{StateReady}, StateReauthInProgress) _ = sm.TryTransition([]ConnState{StateIdle}, StateUnusable)
sm.Transition(StateReady) sm.Transition(StateIdle)
} }
} }
func TestConnStateMachine_IdleInUseTransitions(t *testing.T) {
sm := NewConnStateMachine()
// Initialize to IDLE state
sm.Transition(StateInitializing)
sm.Transition(StateIdle)
// Test IDLE → IN_USE transition
err := sm.TryTransition([]ConnState{StateIdle}, StateInUse)
if err != nil {
t.Errorf("failed to transition from IDLE to IN_USE: %v", err)
}
if state := sm.GetState(); state != StateInUse {
t.Errorf("expected state IN_USE, got %s", state)
}
// Test IN_USE → IDLE transition
err = sm.TryTransition([]ConnState{StateInUse}, StateIdle)
if err != nil {
t.Errorf("failed to transition from IN_USE to IDLE: %v", err)
}
if state := sm.GetState(); state != StateIdle {
t.Errorf("expected state IDLE, got %s", state)
}
// Test concurrent acquisition (only one should succeed)
sm.Transition(StateIdle)
var successCount atomic.Int32
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
err := sm.TryTransition([]ConnState{StateIdle}, StateInUse)
if err == nil {
successCount.Add(1)
}
}()
}
wg.Wait()
if count := successCount.Load(); count != 1 {
t.Errorf("expected exactly 1 successful transition, got %d", count)
}
if state := sm.GetState(); state != StateInUse {
t.Errorf("expected final state IN_USE, got %s", state)
}
}
func TestConn_UsedMethods(t *testing.T) {
cn := NewConn(nil)
// Initialize connection to IDLE state
cn.stateMachine.Transition(StateInitializing)
cn.stateMachine.Transition(StateIdle)
// Test IsUsed - should be false when IDLE
if cn.IsUsed() {
t.Error("expected IsUsed to be false for IDLE connection")
}
// Test CompareAndSwapUsed - acquire connection
if !cn.CompareAndSwapUsed(false, true) {
t.Error("failed to acquire connection with CompareAndSwapUsed")
}
// Test IsUsed - should be true when IN_USE
if !cn.IsUsed() {
t.Error("expected IsUsed to be true for IN_USE connection")
}
// Test CompareAndSwapUsed - release connection
if !cn.CompareAndSwapUsed(true, false) {
t.Error("failed to release connection with CompareAndSwapUsed")
}
// Test IsUsed - should be false again
if cn.IsUsed() {
t.Error("expected IsUsed to be false after release")
}
// Test SetUsed
cn.SetUsed(true)
if !cn.IsUsed() {
t.Error("expected IsUsed to be true after SetUsed(true)")
}
cn.SetUsed(false)
if cn.IsUsed() {
t.Error("expected IsUsed to be false after SetUsed(false)")
}
}
func TestConnStateMachine_UnusableState(t *testing.T) {
sm := NewConnStateMachine()
// Initialize to IDLE state
sm.Transition(StateInitializing)
sm.Transition(StateIdle)
// Test IDLE → UNUSABLE transition (for background operations)
err := sm.TryTransition([]ConnState{StateIdle}, StateUnusable)
if err != nil {
t.Errorf("failed to transition from IDLE to UNUSABLE: %v", err)
}
if state := sm.GetState(); state != StateUnusable {
t.Errorf("expected state UNUSABLE, got %s", state)
}
// Test UNUSABLE → IDLE transition (after background operation completes)
err = sm.TryTransition([]ConnState{StateUnusable}, StateIdle)
if err != nil {
t.Errorf("failed to transition from UNUSABLE to IDLE: %v", err)
}
if state := sm.GetState(); state != StateIdle {
t.Errorf("expected state IDLE, got %s", state)
}
// Test that we can transition from IN_USE to UNUSABLE if needed
// (e.g., for urgent handoff while connection is in use)
sm.Transition(StateInUse)
err = sm.TryTransition([]ConnState{StateInUse}, StateUnusable)
if err != nil {
t.Errorf("failed to transition from IN_USE to UNUSABLE: %v", err)
}
if state := sm.GetState(); state != StateUnusable {
t.Errorf("expected state UNUSABLE, got %s", state)
}
// Test UNUSABLE → INITIALIZING transition (for handoff)
sm.Transition(StateIdle)
sm.Transition(StateUnusable)
err = sm.TryTransition([]ConnState{StateUnusable}, StateInitializing)
if err != nil {
t.Errorf("failed to transition from UNUSABLE to INITIALIZING: %v", err)
}
if state := sm.GetState(); state != StateInitializing {
t.Errorf("expected state INITIALIZING, got %s", state)
}
}
func TestConn_UsableUnusable(t *testing.T) {
cn := NewConn(nil)
// Initialize connection to IDLE state
cn.stateMachine.Transition(StateInitializing)
cn.stateMachine.Transition(StateIdle)
// Test IsUsable - should be true when IDLE
if !cn.IsUsable() {
t.Error("expected IsUsable to be true for IDLE connection")
}
// Test CompareAndSwapUsable - make unusable for background operation
if !cn.CompareAndSwapUsable(true, false) {
t.Error("failed to make connection unusable with CompareAndSwapUsable")
}
// Verify state is UNUSABLE
if state := cn.stateMachine.GetState(); state != StateUnusable {
t.Errorf("expected state UNUSABLE, got %s", state)
}
// Test IsUsable - should be false when UNUSABLE
if cn.IsUsable() {
t.Error("expected IsUsable to be false for UNUSABLE connection")
}
// Test CompareAndSwapUsable - make usable again
if !cn.CompareAndSwapUsable(false, true) {
t.Error("failed to make connection usable with CompareAndSwapUsable")
}
// Verify state is IDLE
if state := cn.stateMachine.GetState(); state != StateIdle {
t.Errorf("expected state IDLE, got %s", state)
}
// Test SetUsable(false)
cn.SetUsable(false)
if state := cn.stateMachine.GetState(); state != StateUnusable {
t.Errorf("expected state UNUSABLE after SetUsable(false), got %s", state)
}
// Test SetUsable(true)
cn.SetUsable(true)
if state := cn.stateMachine.GetState(); state != StateIdle {
t.Errorf("expected state IDLE after SetUsable(true), got %s", state)
}
}