From edf6bd794f6d023c3248e09d8f706aaff1b93331 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 7 Nov 2025 16:25:06 +0200 Subject: [PATCH] fix flaky test --- internal/pool/conn_state.go | 3 ++- internal/pool/conn_state_test.go | 46 +++++++++++++++----------------- 2 files changed, 24 insertions(+), 25 deletions(-) diff --git a/internal/pool/conn_state.go b/internal/pool/conn_state.go index 4d3672bd..a3c3a57f 100644 --- a/internal/pool/conn_state.go +++ b/internal/pool/conn_state.go @@ -320,7 +320,8 @@ func (sm *ConnStateMachine) notifyWaiters() { processed = true break } else { - // State changed - re-add waiter to front of queue and retry + // State changed - re-add waiter to front of queue to maintain FIFO ordering + // This waiter was first in line and should retain priority sm.waiters.PushFront(w) sm.waiterCount.Add(1) // Continue to next iteration to re-read state diff --git a/internal/pool/conn_state_test.go b/internal/pool/conn_state_test.go index 92be0dc5..d1825615 100644 --- a/internal/pool/conn_state_test.go +++ b/internal/pool/conn_state_test.go @@ -401,32 +401,16 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { var orderMu sync.Mutex var wg sync.WaitGroup - // Use channels to ensure deterministic queueing order - // Each goroutine waits for the previous one to queue before it queues - queuedChannels := make([]chan struct{}, numGoroutines) - for i := 0; i < numGoroutines; i++ { - queuedChannels[i] = make(chan struct{}) - } - - // Launch goroutines that will all wait + // Launch goroutines one at a time, ensuring each is queued before launching the next for i := 0; i < numGoroutines; i++ { wg.Add(1) + expectedWaiters := int32(i + 1) + go func(id int) { defer wg.Done() - // Wait for previous goroutine to queue (except for goroutine 0) - if id > 0 { - <-queuedChannels[id-1] - } - - // Small delay to ensure the previous goroutine's AwaitAndTransition has been called - time.Sleep(5 * time.Millisecond) - ctx := context.Background() - // Signal that we're about to queue - close(queuedChannels[id]) - // This should queue in FIFO order _, err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing) if err != nil { @@ -441,16 +425,30 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) { t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder)) - // Transition back to READY to allow next waiter + // Transition back to IDLE to allow next waiter sm.Transition(StateIdle) }(i) + + // Wait until this goroutine has been queued before launching the next + // Poll the waiter count to ensure the goroutine is actually queued + timeout := time.After(100 * time.Millisecond) + for { + if sm.waiterCount.Load() >= expectedWaiters { + break + } + select { + case <-timeout: + t.Fatalf("Timeout waiting for goroutine %d to queue", i) + case <-time.After(1 * time.Millisecond): + // Continue polling + } + } } - // Wait for all goroutines to queue up - <-queuedChannels[numGoroutines-1] - time.Sleep(50 * time.Millisecond) + // Give all goroutines time to fully settle in the queue + time.Sleep(10 * time.Millisecond) - // Transition to READY to start processing the queue + // Transition to IDLE to start processing the queue sm.Transition(StateIdle) wg.Wait()