From 606264ef7f123d691dc401ba5d2ce6ce2d19f111 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 23 Oct 2025 17:09:22 +0300 Subject: [PATCH] wip, used and unusable states --- internal/auth/streaming/pool_hook.go | 28 +- .../auth/streaming/pool_hook_state_test.go | 241 ++++++++++++++ internal/pool/conn.go | 100 +++--- internal/pool/conn_state.go | 29 +- internal/pool/conn_state_test.go | 298 +++++++++++++++--- 5 files changed, 581 insertions(+), 115 deletions(-) create mode 100644 internal/auth/streaming/pool_hook_state_test.go diff --git a/internal/auth/streaming/pool_hook.go b/internal/auth/streaming/pool_hook.go index c135e169..b29d7905 100644 --- a/internal/auth/streaming/pool_hook.go +++ b/internal/auth/streaming/pool_hook.go @@ -182,9 +182,9 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, var err error timeout := time.After(r.reAuthTimeout) - // Try to acquire the connection - // We need to ensure the connection is both Usable and not Used - // to prevent data races with concurrent operations + // Try to acquire the connection for re-authentication + // We need to ensure the connection is IDLE (not IN_USE) before transitioning to UNUSABLE + // This prevents re-authentication from interfering with active commands const baseDelay = 10 * time.Microsecond acquired := false attempt := 0 @@ -196,15 +196,14 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, reAuthFn(err) return default: - // Try to acquire: set Usable=false, then check Used - if conn.CompareAndSwapUsable(true, false) { - if !conn.IsUsed() { + // Try to atomically transition from IDLE to UNUSABLE + // This ensures we only acquire connections that are not actively in use + stateMachine := conn.GetStateMachine() + if stateMachine != nil { + err := stateMachine.TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) + if err == nil { + // Successfully acquired: connection was IDLE, now UNUSABLE acquired = true - } else { - // Release Usable and retry with exponential backoff - // todo(ndyakov): think of a better way to do this without the need - // to release the connection, but just wait till it is not used - conn.SetUsable(true) } } if !acquired { @@ -222,8 +221,11 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, reAuthFn(nil) } - // Release the connection - conn.SetUsable(true) + // Release the connection: transition from UNUSABLE back to IDLE + stateMachine := conn.GetStateMachine() + if stateMachine != nil { + stateMachine.Transition(pool.StateIdle) + } }() } diff --git a/internal/auth/streaming/pool_hook_state_test.go b/internal/auth/streaming/pool_hook_state_test.go new file mode 100644 index 00000000..3160f0f5 --- /dev/null +++ b/internal/auth/streaming/pool_hook_state_test.go @@ -0,0 +1,241 @@ +package streaming + +import ( + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/redis/go-redis/v9/internal/pool" +) + +// TestReAuthOnlyWhenIdle verifies that re-authentication only happens when +// a connection is in IDLE state, not when it's IN_USE. +func TestReAuthOnlyWhenIdle(t *testing.T) { + // Create a connection + cn := pool.NewConn(nil) + + // Initialize to IDLE state + cn.GetStateMachine().Transition(pool.StateInitializing) + cn.GetStateMachine().Transition(pool.StateIdle) + + // Simulate connection being acquired (IDLE → IN_USE) + if !cn.CompareAndSwapUsed(false, true) { + t.Fatal("Failed to acquire connection") + } + + // Verify state is IN_USE + if state := cn.GetStateMachine().GetState(); state != pool.StateInUse { + t.Errorf("Expected state IN_USE, got %s", state) + } + + // Try to transition to UNUSABLE (for reauth) - should fail + err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) + if err == nil { + t.Error("Expected error when trying to transition IN_USE → UNUSABLE, but got none") + } + + // Verify state is still IN_USE + if state := cn.GetStateMachine().GetState(); state != pool.StateInUse { + t.Errorf("Expected state to remain IN_USE, got %s", state) + } + + // Release connection (IN_USE → IDLE) + if !cn.CompareAndSwapUsed(true, false) { + t.Fatal("Failed to release connection") + } + + // Verify state is IDLE + if state := cn.GetStateMachine().GetState(); state != pool.StateIdle { + t.Errorf("Expected state IDLE, got %s", state) + } + + // Now try to transition to UNUSABLE - should succeed + err = cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) + if err != nil { + t.Errorf("Failed to transition IDLE → UNUSABLE: %v", err) + } + + // Verify state is UNUSABLE + if state := cn.GetStateMachine().GetState(); state != pool.StateUnusable { + t.Errorf("Expected state UNUSABLE, got %s", state) + } +} + +// TestReAuthWaitsForConnectionToBeIdle verifies that the re-auth worker +// waits for a connection to become IDLE before performing re-authentication. +func TestReAuthWaitsForConnectionToBeIdle(t *testing.T) { + // Create a connection + cn := pool.NewConn(nil) + + // Initialize to IDLE state + cn.GetStateMachine().Transition(pool.StateInitializing) + cn.GetStateMachine().Transition(pool.StateIdle) + + // Simulate connection being acquired (IDLE → IN_USE) + if !cn.CompareAndSwapUsed(false, true) { + t.Fatal("Failed to acquire connection") + } + + // Track re-auth attempts + var reAuthAttempts atomic.Int32 + var reAuthSucceeded atomic.Bool + + // Start a goroutine that tries to acquire the connection for re-auth + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + + // Try to acquire for re-auth with timeout + timeout := time.After(2 * time.Second) + acquired := false + + for !acquired { + select { + case <-timeout: + t.Error("Timeout waiting to acquire connection for re-auth") + return + default: + reAuthAttempts.Add(1) + // Try to atomically transition from IDLE to UNUSABLE + err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) + if err == nil { + // Successfully acquired + acquired = true + reAuthSucceeded.Store(true) + } else { + // Connection is still IN_USE, wait a bit + time.Sleep(10 * time.Millisecond) + } + } + } + + // Release the connection + cn.GetStateMachine().Transition(pool.StateIdle) + }() + + // Keep connection IN_USE for 500ms + time.Sleep(500 * time.Millisecond) + + // Verify re-auth hasn't succeeded yet (connection is still IN_USE) + if reAuthSucceeded.Load() { + t.Error("Re-auth succeeded while connection was IN_USE") + } + + // Verify there were multiple attempts + attempts := reAuthAttempts.Load() + if attempts < 2 { + t.Errorf("Expected multiple re-auth attempts, got %d", attempts) + } + + // Release connection (IN_USE → IDLE) + if !cn.CompareAndSwapUsed(true, false) { + t.Fatal("Failed to release connection") + } + + // Wait for re-auth to complete + wg.Wait() + + // Verify re-auth succeeded after connection became IDLE + if !reAuthSucceeded.Load() { + t.Error("Re-auth did not succeed after connection became IDLE") + } + + // Verify final state is IDLE + if state := cn.GetStateMachine().GetState(); state != pool.StateIdle { + t.Errorf("Expected final state IDLE, got %s", state) + } +} + +// TestConcurrentReAuthAndUsage verifies that re-auth and normal usage +// don't interfere with each other. +func TestConcurrentReAuthAndUsage(t *testing.T) { + // Create a connection + cn := pool.NewConn(nil) + + // Initialize to IDLE state + cn.GetStateMachine().Transition(pool.StateInitializing) + cn.GetStateMachine().Transition(pool.StateIdle) + + var wg sync.WaitGroup + var usageCount atomic.Int32 + var reAuthCount atomic.Int32 + + // Goroutine 1: Simulate normal usage (acquire/release) + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 100; i++ { + // Try to acquire + if cn.CompareAndSwapUsed(false, true) { + usageCount.Add(1) + // Simulate work + time.Sleep(1 * time.Millisecond) + // Release + cn.CompareAndSwapUsed(true, false) + } + time.Sleep(1 * time.Millisecond) + } + }() + + // Goroutine 2: Simulate re-auth attempts + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < 50; i++ { + // Try to acquire for re-auth + err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) + if err == nil { + reAuthCount.Add(1) + // Simulate re-auth work + time.Sleep(2 * time.Millisecond) + // Release + cn.GetStateMachine().Transition(pool.StateIdle) + } + time.Sleep(2 * time.Millisecond) + } + }() + + wg.Wait() + + // Verify both operations happened + if usageCount.Load() == 0 { + t.Error("No successful usage operations") + } + if reAuthCount.Load() == 0 { + t.Error("No successful re-auth operations") + } + + t.Logf("Usage operations: %d, Re-auth operations: %d", usageCount.Load(), reAuthCount.Load()) + + // Verify final state is IDLE + if state := cn.GetStateMachine().GetState(); state != pool.StateIdle { + t.Errorf("Expected final state IDLE, got %s", state) + } +} + +// TestReAuthRespectsClosed verifies that re-auth doesn't happen on closed connections. +func TestReAuthRespectsClosed(t *testing.T) { + // Create a connection + cn := pool.NewConn(nil) + + // Initialize to IDLE state + cn.GetStateMachine().Transition(pool.StateInitializing) + cn.GetStateMachine().Transition(pool.StateIdle) + + // Close the connection + cn.GetStateMachine().Transition(pool.StateClosed) + + // Try to transition to UNUSABLE - should fail + err := cn.GetStateMachine().TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) + if err == nil { + t.Error("Expected error when trying to transition CLOSED → UNUSABLE, but got none") + } + + // Verify state is still CLOSED + if state := cn.GetStateMachine().GetState(); state != pool.StateClosed { + t.Errorf("Expected state to remain CLOSED, got %s", state) + } +} + diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 976631e5..cd3503d5 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -58,8 +58,13 @@ type Conn struct { readerMu sync.RWMutex // State machine for connection state management - // Replaces: usable, Inited + // Replaces: usable, Inited, used // Provides thread-safe state transitions with FIFO waiting queue + // States: CREATED → INITIALIZING → IDLE ⇄ IN_USE + // ↓ + // UNUSABLE (handoff/reauth) + // ↓ + // IDLE/CLOSED stateMachine *ConnStateMachine // Handoff metadata - managed separately from state machine @@ -67,24 +72,6 @@ type Conn struct { handoffStateAtomic atomic.Value // stores *HandoffState handoffRetriesAtomic atomic.Uint32 // retry counter - // Design note: - // Why have both State Machine and Used? - // _State Machine_ tracks the connection lifecycle (CREATED, INITIALIZING, READY, REAUTH_*, CLOSED) - // and determines if the connection is safe for use by clients. A connection can be in the pool but not - // in a usable state (e.g. reauth in progress). - // _Used_ is used to mark a connection as used when a command is going to be processed on that connection. - // this is going to happen once the connection is picked from the pool. - // - // If a background operation needs to use the connection, it will transition to an in-progress state - // (e.g. REAUTH_IN_PROGRESS) and only use it when it is not marked as used. - // That way, the connection won't be used to send multiple commands at the same time and - // potentially corrupt the command stream. - - // used flag to mark connection as used when a command is going to be - // processed on that connection. This is used to prevent a race condition with - // background operations that may execute commands, like re-authentication. - used atomic.Bool - pooled bool pubsub bool closed atomic.Bool @@ -161,12 +148,12 @@ func (cn *Conn) SetUsedAt(tm time.Time) { // Returns true if the swap was successful (old value matched), false otherwise. // // Implementation note: This is a compatibility wrapper around the state machine. -// It checks if the current state is "usable" (READY) and transitions accordingly. +// It checks if the current state is "usable" (IDLE or IN_USE) and transitions accordingly. func (cn *Conn) CompareAndSwapUsable(old, new bool) bool { currentState := cn.stateMachine.GetState() // Check if current state matches the "old" usable value - currentUsable := (currentState == StateReady) + currentUsable := (currentState == StateIdle || currentState == StateInUse) if currentUsable != old { return false } @@ -178,18 +165,21 @@ func (cn *Conn) CompareAndSwapUsable(old, new bool) bool { // Transition based on new value if new { - // Trying to make usable - transition to READY - // This should only work from certain states + // Trying to make usable - transition from UNUSABLE to IDLE + // This should only work from UNUSABLE or INITIALIZING states err := cn.stateMachine.TryTransition( - []ConnState{StateInitializing, StateReauthInProgress}, - StateReady, + []ConnState{StateInitializing, StateUnusable}, + StateIdle, ) return err == nil } else { - // Trying to make unusable - this is typically for acquiring the connection - // for background operations. We don't transition here, just return false - // since the caller should use proper state transitions. - return false + // Trying to make unusable - transition from IDLE to UNUSABLE + // This is typically for acquiring the connection for background operations + err := cn.stateMachine.TryTransition( + []ConnState{StateIdle}, + StateUnusable, + ) + return err == nil } } @@ -203,8 +193,9 @@ func (cn *Conn) CompareAndSwapUsable(old, new bool) bool { // - Other background operations that need exclusive access func (cn *Conn) IsUsable() bool { state := cn.stateMachine.GetState() - // Only READY state is considered usable - return state == StateReady + // IDLE and IN_USE states are considered usable + // (IN_USE means it's usable but currently acquired by someone) + return state == StateIdle || state == StateInUse } // SetUsable sets the usable flag for the connection (lock-free). @@ -215,13 +206,11 @@ func (cn *Conn) IsUsable() bool { // Prefer CompareAndSwapUsable() when acquiring exclusive access to avoid race conditions. func (cn *Conn) SetUsable(usable bool) { if usable { - // Transition to READY state - cn.stateMachine.Transition(StateReady) + // Transition to IDLE state (ready to be acquired) + cn.stateMachine.Transition(StateIdle) } else { - // This is ambiguous - we don't know which "unusable" state to transition to - // For now, we'll just log a warning and not transition - // Callers should use proper state machine transitions instead - internal.Logger.Printf(context.Background(), "SetUsable(false) called on conn[%d] - use state machine transitions instead", cn.id) + // Transition to UNUSABLE state (for background operations) + cn.stateMachine.Transition(StateUnusable) } } @@ -233,16 +222,33 @@ func (cn *Conn) IsInited() bool { return state != StateCreated && state != StateInitializing && state != StateClosed } -// Used +// Used - State machine based implementation // CompareAndSwapUsed atomically compares and swaps the used flag (lock-free). // // This is the preferred method for acquiring a connection from the pool, as it // ensures that only one goroutine marks the connection as used. // +// Implementation: Uses state machine transitions IDLE ⇄ IN_USE +// // Returns true if the swap was successful (old value matched), false otherwise. func (cn *Conn) CompareAndSwapUsed(old, new bool) bool { - return cn.used.CompareAndSwap(old, new) + if old == new { + // No change needed + currentState := cn.stateMachine.GetState() + currentUsed := (currentState == StateInUse) + return currentUsed == old + } + + if !old && new { + // Acquiring: IDLE → IN_USE + err := cn.stateMachine.TryTransition([]ConnState{StateIdle}, StateInUse) + return err == nil + } else { + // Releasing: IN_USE → IDLE + err := cn.stateMachine.TryTransition([]ConnState{StateInUse}, StateIdle) + return err == nil + } } // IsUsed returns true if the connection is currently in use (lock-free). @@ -251,7 +257,7 @@ func (cn *Conn) CompareAndSwapUsed(old, new bool) bool { // actively processing a command. Background operations (like re-auth) should // wait until the connection is not used before executing commands. func (cn *Conn) IsUsed() bool { - return cn.used.Load() + return cn.stateMachine.GetState() == StateInUse } // SetUsed sets the used flag for the connection (lock-free). @@ -262,7 +268,11 @@ func (cn *Conn) IsUsed() bool { // Prefer CompareAndSwapUsed() when acquiring from a multi-connection pool to // avoid race conditions. func (cn *Conn) SetUsed(val bool) { - cn.used.Store(val) + if val { + cn.stateMachine.Transition(StateInUse) + } else { + cn.stateMachine.Transition(StateIdle) + } } // getNetConn returns the current network connection using atomic load (lock-free). @@ -514,11 +524,11 @@ func (cn *Conn) GetNetConn() net.Conn { // If another goroutine is currently initializing, this will wait for it to complete. func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) error { // Wait for and transition to INITIALIZING state - this prevents concurrent initializations - // Valid from states: CREATED (first init), READY (handoff/reconnect), REAUTH_IN_PROGRESS (after reauth) + // Valid from states: CREATED (first init), IDLE (reconnect), UNUSABLE (handoff/reauth) // If another goroutine is initializing, we'll wait for it to finish err := cn.stateMachine.AwaitAndTransition( ctx, - []ConnState{StateCreated, StateReady, StateReauthInProgress}, + []ConnState{StateCreated, StateIdle, StateUnusable}, StateInitializing, ) if err != nil { @@ -536,8 +546,8 @@ func (cn *Conn) SetNetConnAndInitConn(ctx context.Context, netConn net.Conn) err return initErr } - // Initialization succeeded - transition to READY - cn.stateMachine.Transition(StateReady) + // Initialization succeeded - transition to IDLE (ready to be acquired) + cn.stateMachine.Transition(StateIdle) return nil } diff --git a/internal/pool/conn_state.go b/internal/pool/conn_state.go index a9a20da4..89fac85f 100644 --- a/internal/pool/conn_state.go +++ b/internal/pool/conn_state.go @@ -11,6 +11,13 @@ import ( // ConnState represents the connection state in the state machine. // States are designed to be lightweight and fast to check. +// +// State Transitions: +// CREATED → INITIALIZING → IDLE ⇄ IN_USE +// ↓ +// UNUSABLE (handoff/reauth) +// ↓ +// IDLE/CLOSED type ConnState uint32 const ( @@ -20,11 +27,15 @@ const ( // StateInitializing - Connection initialization in progress StateInitializing - // StateReady - Connection ready for use - StateReady + // StateIdle - Connection initialized and idle in pool, ready to be acquired + StateIdle - // StateReauthInProgress - Reauth actively being processed - StateReauthInProgress + // StateInUse - Connection actively processing a command (retrieved from pool) + StateInUse + + // StateUnusable - Connection temporarily unusable due to background operation + // (handoff, reauth, etc.). Cannot be acquired from pool. + StateUnusable // StateClosed - Connection closed StateClosed @@ -37,10 +48,12 @@ func (s ConnState) String() string { return "CREATED" case StateInitializing: return "INITIALIZING" - case StateReady: - return "READY" - case StateReauthInProgress: - return "REAUTH_IN_PROGRESS" + case StateIdle: + return "IDLE" + case StateInUse: + return "IN_USE" + case StateUnusable: + return "UNUSABLE" case StateClosed: return "CLOSED" default: diff --git a/internal/pool/conn_state_test.go b/internal/pool/conn_state_test.go index a25ec366..1f2e23a7 100644 --- a/internal/pool/conn_state_test.go +++ b/internal/pool/conn_state_test.go @@ -10,7 +10,7 @@ import ( func TestConnStateMachine_GetState(t *testing.T) { sm := NewConnStateMachine() - + if state := sm.GetState(); state != StateCreated { t.Errorf("expected initial state to be CREATED, got %s", state) } @@ -18,16 +18,16 @@ func TestConnStateMachine_GetState(t *testing.T) { func TestConnStateMachine_Transition(t *testing.T) { sm := NewConnStateMachine() - + // Unconditional transition sm.Transition(StateInitializing) if state := sm.GetState(); state != StateInitializing { t.Errorf("expected state to be INITIALIZING, got %s", state) } - - sm.Transition(StateReady) - if state := sm.GetState(); state != StateReady { - t.Errorf("expected state to be READY, got %s", state) + + sm.Transition(StateIdle) + if state := sm.GetState(); state != StateIdle { + t.Errorf("expected state to be IDLE, got %s", state) } } @@ -47,42 +47,42 @@ func TestConnStateMachine_TryTransition(t *testing.T) { expectError: false, }, { - name: "invalid transition from CREATED to READY", + name: "invalid transition from CREATED to IDLE", initialState: StateCreated, validStates: []ConnState{StateInitializing}, - targetState: StateReady, + targetState: StateIdle, expectError: true, }, { name: "transition to same state", - initialState: StateReady, - validStates: []ConnState{StateReady}, - targetState: StateReady, + initialState: StateIdle, + validStates: []ConnState{StateIdle}, + targetState: StateIdle, expectError: false, }, { name: "multiple valid from states", - initialState: StateReady, - validStates: []ConnState{StateInitializing, StateReady, StateReauthInProgress}, - targetState: StateReauthInProgress, + initialState: StateIdle, + validStates: []ConnState{StateInitializing, StateIdle, StateUnusable}, + targetState: StateUnusable, expectError: false, }, } - + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { sm := NewConnStateMachine() sm.Transition(tt.initialState) - + err := sm.TryTransition(tt.validStates, tt.targetState) - + if tt.expectError && err == nil { t.Error("expected error but got none") } if !tt.expectError && err != nil { t.Errorf("unexpected error: %v", err) } - + if !tt.expectError { if state := sm.GetState(); state != tt.targetState { t.Errorf("expected state %s, got %s", tt.targetState, state) @@ -94,17 +94,17 @@ func TestConnStateMachine_TryTransition(t *testing.T) { func TestConnStateMachine_AwaitAndTransition_FastPath(t *testing.T) { sm := NewConnStateMachine() - sm.Transition(StateReady) + sm.Transition(StateIdle) ctx := context.Background() // Fast path: already in valid state - err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateReauthInProgress) + err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateUnusable) if err != nil { t.Errorf("unexpected error: %v", err) } - if state := sm.GetState(); state != StateReauthInProgress { + if state := sm.GetState(); state != StateUnusable { t.Errorf("expected state REAUTH_IN_PROGRESS, got %s", state) } } @@ -112,12 +112,12 @@ func TestConnStateMachine_AwaitAndTransition_FastPath(t *testing.T) { func TestConnStateMachine_AwaitAndTransition_Timeout(t *testing.T) { sm := NewConnStateMachine() sm.Transition(StateCreated) - + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) defer cancel() // Wait for a state that will never come - err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateReauthInProgress) + err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateUnusable) if err == nil { t.Error("expected timeout error but got none") } @@ -150,7 +150,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) { startBarrier.Wait() ctx := context.Background() - err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateReady) + err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateIdle) if err != nil { t.Errorf("waiter %d got error: %v", waiterID, err) return @@ -161,7 +161,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) { orderMu.Unlock() // Transition back to READY for next waiter - sm.Transition(StateReady) + sm.Transition(StateIdle) }() } @@ -169,7 +169,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) { time.Sleep(100 * time.Millisecond) // Transition to READY to start processing waiters - sm.Transition(StateReady) + sm.Transition(StateIdle) // Wait for all waiters to complete wg.Wait() @@ -191,7 +191,7 @@ func TestConnStateMachine_AwaitAndTransition_FIFO(t *testing.T) { func TestConnStateMachine_ConcurrentAccess(t *testing.T) { sm := NewConnStateMachine() - sm.Transition(StateReady) + sm.Transition(StateIdle) const numGoroutines = 100 const numIterations = 100 @@ -206,11 +206,11 @@ func TestConnStateMachine_ConcurrentAccess(t *testing.T) { for j := 0; j < numIterations; j++ { // Try to transition from READY to REAUTH_IN_PROGRESS - err := sm.TryTransition([]ConnState{StateReady}, StateReauthInProgress) + err := sm.TryTransition([]ConnState{StateIdle}, StateUnusable) if err == nil { successCount.Add(1) // Transition back to READY - sm.Transition(StateReady) + sm.Transition(StateIdle) } // Read state (hot path) @@ -238,8 +238,9 @@ func TestConnStateMachine_StateString(t *testing.T) { }{ {StateCreated, "CREATED"}, {StateInitializing, "INITIALIZING"}, - {StateReady, "READY"}, - {StateReauthInProgress, "REAUTH_IN_PROGRESS"}, + {StateIdle, "IDLE"}, + {StateInUse, "IN_USE"}, + {StateUnusable, "UNUSABLE"}, {StateClosed, "CLOSED"}, {ConnState(999), "UNKNOWN(999)"}, } @@ -255,8 +256,8 @@ func TestConnStateMachine_StateString(t *testing.T) { func BenchmarkConnStateMachine_GetState(b *testing.B) { sm := NewConnStateMachine() - sm.Transition(StateReady) - + sm.Transition(StateIdle) + b.ResetTimer() for i := 0; i < b.N; i++ { _ = sm.GetState() @@ -265,7 +266,7 @@ func BenchmarkConnStateMachine_GetState(b *testing.B) { func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) { sm := NewConnStateMachine() - sm.Transition(StateReady) + sm.Transition(StateIdle) const numGoroutines = 10 var inInitializing atomic.Int32 @@ -286,7 +287,7 @@ func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) { startBarrier.Wait() // Try to transition to INITIALIZING - err := sm.TryTransition([]ConnState{StateReady}, StateInitializing) + err := sm.TryTransition([]ConnState{StateIdle}, StateInitializing) if err == nil { successCount.Add(1) @@ -310,7 +311,7 @@ func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) { inInitializing.Add(-1) // Transition back to READY - sm.Transition(StateReady) + sm.Transition(StateIdle) } else { t.Logf("Goroutine %d: failed to enter INITIALIZING - %v", id, err) } @@ -329,7 +330,7 @@ func TestConnStateMachine_PreventsConcurrentInitialization(t *testing.T) { func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T) { sm := NewConnStateMachine() - sm.Transition(StateReady) + sm.Transition(StateIdle) const numGoroutines = 5 var completedCount atomic.Int32 @@ -352,7 +353,7 @@ func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T) ctx := context.Background() // Try to transition to INITIALIZING - should wait if another is initializing - err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateInitializing) + err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing) if err != nil { t.Errorf("Goroutine %d: failed to transition: %v", id, err) return @@ -369,7 +370,7 @@ func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T) time.Sleep(10 * time.Millisecond) // Transition back to READY - sm.Transition(StateReady) + sm.Transition(StateIdle) completedCount.Add(1) t.Logf("Goroutine %d: completed initialization (total=%d)", id, completedCount.Load()) @@ -384,7 +385,7 @@ func TestConnStateMachine_AwaitAndTransitionWaitsForInitialization(t *testing.T) } // Final state should be READY - if sm.GetState() != StateReady { + if sm.GetState() != StateIdle { t.Errorf("expected final state READY, got %s", sm.GetState()) } @@ -418,7 +419,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { ctx := context.Background() // This should queue in FIFO order - err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateInitializing) + err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing) if err != nil { t.Errorf("Goroutine %d: failed to transition: %v", id, err) return @@ -432,7 +433,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder)) // Transition back to READY to allow next waiter - sm.Transition(StateReady) + sm.Transition(StateIdle) }(i) } @@ -440,7 +441,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { time.Sleep(50 * time.Millisecond) // Transition to READY to start processing the queue - sm.Transition(StateReady) + sm.Transition(StateIdle) wg.Wait() @@ -456,7 +457,7 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { func TestConnStateMachine_FIFOWithFastPath(t *testing.T) { sm := NewConnStateMachine() - sm.Transition(StateReady) // Start in READY so fast path is available + sm.Transition(StateIdle) // Start in READY so fast path is available const numGoroutines = 10 var executionOrder []int @@ -481,7 +482,7 @@ func TestConnStateMachine_FIFOWithFastPath(t *testing.T) { ctx := context.Background() // This might use fast path (CAS) or slow path (queue) - err := sm.AwaitAndTransition(ctx, []ConnState{StateReady}, StateInitializing) + err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing) if err != nil { t.Errorf("Goroutine %d: failed to transition: %v", id, err) return @@ -498,7 +499,7 @@ func TestConnStateMachine_FIFOWithFastPath(t *testing.T) { time.Sleep(5 * time.Millisecond) // Transition back to READY to allow next waiter - sm.Transition(StateReady) + sm.Transition(StateIdle) }(i) } @@ -523,12 +524,211 @@ func TestConnStateMachine_FIFOWithFastPath(t *testing.T) { func BenchmarkConnStateMachine_TryTransition(b *testing.B) { sm := NewConnStateMachine() - sm.Transition(StateReady) + sm.Transition(StateIdle) b.ResetTimer() for i := 0; i < b.N; i++ { - _ = sm.TryTransition([]ConnState{StateReady}, StateReauthInProgress) - sm.Transition(StateReady) + _ = sm.TryTransition([]ConnState{StateIdle}, StateUnusable) + sm.Transition(StateIdle) } } + + +func TestConnStateMachine_IdleInUseTransitions(t *testing.T) { + sm := NewConnStateMachine() + + // Initialize to IDLE state + sm.Transition(StateInitializing) + sm.Transition(StateIdle) + + // Test IDLE → IN_USE transition + err := sm.TryTransition([]ConnState{StateIdle}, StateInUse) + if err != nil { + t.Errorf("failed to transition from IDLE to IN_USE: %v", err) + } + if state := sm.GetState(); state != StateInUse { + t.Errorf("expected state IN_USE, got %s", state) + } + + // Test IN_USE → IDLE transition + err = sm.TryTransition([]ConnState{StateInUse}, StateIdle) + if err != nil { + t.Errorf("failed to transition from IN_USE to IDLE: %v", err) + } + if state := sm.GetState(); state != StateIdle { + t.Errorf("expected state IDLE, got %s", state) + } + + // Test concurrent acquisition (only one should succeed) + sm.Transition(StateIdle) + + var successCount atomic.Int32 + var wg sync.WaitGroup + + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + err := sm.TryTransition([]ConnState{StateIdle}, StateInUse) + if err == nil { + successCount.Add(1) + } + }() + } + + wg.Wait() + + if count := successCount.Load(); count != 1 { + t.Errorf("expected exactly 1 successful transition, got %d", count) + } + + if state := sm.GetState(); state != StateInUse { + t.Errorf("expected final state IN_USE, got %s", state) + } +} + +func TestConn_UsedMethods(t *testing.T) { + cn := NewConn(nil) + + // Initialize connection to IDLE state + cn.stateMachine.Transition(StateInitializing) + cn.stateMachine.Transition(StateIdle) + + // Test IsUsed - should be false when IDLE + if cn.IsUsed() { + t.Error("expected IsUsed to be false for IDLE connection") + } + + // Test CompareAndSwapUsed - acquire connection + if !cn.CompareAndSwapUsed(false, true) { + t.Error("failed to acquire connection with CompareAndSwapUsed") + } + + // Test IsUsed - should be true when IN_USE + if !cn.IsUsed() { + t.Error("expected IsUsed to be true for IN_USE connection") + } + + // Test CompareAndSwapUsed - release connection + if !cn.CompareAndSwapUsed(true, false) { + t.Error("failed to release connection with CompareAndSwapUsed") + } + + // Test IsUsed - should be false again + if cn.IsUsed() { + t.Error("expected IsUsed to be false after release") + } + + // Test SetUsed + cn.SetUsed(true) + if !cn.IsUsed() { + t.Error("expected IsUsed to be true after SetUsed(true)") + } + + cn.SetUsed(false) + if cn.IsUsed() { + t.Error("expected IsUsed to be false after SetUsed(false)") + } +} + + +func TestConnStateMachine_UnusableState(t *testing.T) { + sm := NewConnStateMachine() + + // Initialize to IDLE state + sm.Transition(StateInitializing) + sm.Transition(StateIdle) + + // Test IDLE → UNUSABLE transition (for background operations) + err := sm.TryTransition([]ConnState{StateIdle}, StateUnusable) + if err != nil { + t.Errorf("failed to transition from IDLE to UNUSABLE: %v", err) + } + if state := sm.GetState(); state != StateUnusable { + t.Errorf("expected state UNUSABLE, got %s", state) + } + + // Test UNUSABLE → IDLE transition (after background operation completes) + err = sm.TryTransition([]ConnState{StateUnusable}, StateIdle) + if err != nil { + t.Errorf("failed to transition from UNUSABLE to IDLE: %v", err) + } + if state := sm.GetState(); state != StateIdle { + t.Errorf("expected state IDLE, got %s", state) + } + + // Test that we can transition from IN_USE to UNUSABLE if needed + // (e.g., for urgent handoff while connection is in use) + sm.Transition(StateInUse) + err = sm.TryTransition([]ConnState{StateInUse}, StateUnusable) + if err != nil { + t.Errorf("failed to transition from IN_USE to UNUSABLE: %v", err) + } + if state := sm.GetState(); state != StateUnusable { + t.Errorf("expected state UNUSABLE, got %s", state) + } + + // Test UNUSABLE → INITIALIZING transition (for handoff) + sm.Transition(StateIdle) + sm.Transition(StateUnusable) + err = sm.TryTransition([]ConnState{StateUnusable}, StateInitializing) + if err != nil { + t.Errorf("failed to transition from UNUSABLE to INITIALIZING: %v", err) + } + if state := sm.GetState(); state != StateInitializing { + t.Errorf("expected state INITIALIZING, got %s", state) + } +} + +func TestConn_UsableUnusable(t *testing.T) { + cn := NewConn(nil) + + // Initialize connection to IDLE state + cn.stateMachine.Transition(StateInitializing) + cn.stateMachine.Transition(StateIdle) + + // Test IsUsable - should be true when IDLE + if !cn.IsUsable() { + t.Error("expected IsUsable to be true for IDLE connection") + } + + // Test CompareAndSwapUsable - make unusable for background operation + if !cn.CompareAndSwapUsable(true, false) { + t.Error("failed to make connection unusable with CompareAndSwapUsable") + } + + // Verify state is UNUSABLE + if state := cn.stateMachine.GetState(); state != StateUnusable { + t.Errorf("expected state UNUSABLE, got %s", state) + } + + // Test IsUsable - should be false when UNUSABLE + if cn.IsUsable() { + t.Error("expected IsUsable to be false for UNUSABLE connection") + } + + // Test CompareAndSwapUsable - make usable again + if !cn.CompareAndSwapUsable(false, true) { + t.Error("failed to make connection usable with CompareAndSwapUsable") + } + + // Verify state is IDLE + if state := cn.stateMachine.GetState(); state != StateIdle { + t.Errorf("expected state IDLE, got %s", state) + } + + // Test SetUsable(false) + cn.SetUsable(false) + if state := cn.stateMachine.GetState(); state != StateUnusable { + t.Errorf("expected state UNUSABLE after SetUsable(false), got %s", state) + } + + // Test SetUsable(true) + cn.SetUsable(true) + if state := cn.stateMachine.GetState(); state != StateIdle { + t.Errorf("expected state IDLE after SetUsable(true), got %s", state) + } +} + +