diff --git a/async_handoff_integration_test.go b/async_handoff_integration_test.go index 44ebc2f6..e82baf46 100644 --- a/async_handoff_integration_test.go +++ b/async_handoff_integration_test.go @@ -8,9 +8,9 @@ import ( "testing" "time" - "github.com/redis/go-redis/v9/maintnotifications" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/logging" + "github.com/redis/go-redis/v9/maintnotifications" ) // mockNetConn implements net.Conn for testing @@ -80,7 +80,7 @@ func TestEventDrivenHandoffIntegration(t *testing.T) { var initConnCalled atomic.Bool initConnStarted := make(chan struct{}) initConnFunc := func(ctx context.Context, cn *pool.Conn) error { - close(initConnStarted) // Signal that InitConn has started + close(initConnStarted) // Signal that InitConn has started time.Sleep(50 * time.Millisecond) // Add delay to keep handoff pending initConnCalled.Store(true) return nil @@ -164,6 +164,10 @@ func TestEventDrivenHandoffIntegration(t *testing.T) { // Could be the original connection (now handed off) or a new one testPool.Put(ctx, conn3) + + if !initConnCalled.Load() { + t.Error("InitConn should have been called during handoff") + } }) t.Run("ConcurrentHandoffs", func(t *testing.T) { diff --git a/internal/pool/conn_state_test.go b/internal/pool/conn_state_test.go index 40d83155..13690254 100644 --- a/internal/pool/conn_state_test.go +++ b/internal/pool/conn_state_test.go @@ -400,8 +400,13 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { var executionOrder []int var orderMu sync.Mutex var wg sync.WaitGroup - var startBarrier sync.WaitGroup - startBarrier.Add(numGoroutines) + + // Use channels to ensure deterministic queueing order + // Each goroutine waits for the previous one to queue before it queues + queuedChannels := make([]chan struct{}, numGoroutines) + for i := 0; i < numGoroutines; i++ { + queuedChannels[i] = make(chan struct{}) + } // Launch goroutines that will all wait for i := 0; i < numGoroutines; i++ { @@ -409,15 +414,19 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { go func(id int) { defer wg.Done() - // Wait for all goroutines to be ready - startBarrier.Done() - startBarrier.Wait() + // Wait for previous goroutine to queue (except for goroutine 0) + if id > 0 { + <-queuedChannels[id-1] + } - // Small stagger to ensure queue order - time.Sleep(time.Duration(id) * time.Millisecond) + // Small delay to ensure the previous goroutine's AwaitAndTransition has been called + time.Sleep(5 * time.Millisecond) ctx := context.Background() + // Signal that we're about to queue + close(queuedChannels[id]) + // This should queue in FIFO order _, err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing) if err != nil { @@ -437,7 +446,8 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { }(i) } - // Wait a bit for all goroutines to queue up + // Wait for all goroutines to queue up + <-queuedChannels[numGoroutines-1] time.Sleep(50 * time.Millisecond) // Transition to READY to start processing the queue