mirror of
https://github.com/redis/go-redis.git
synced 2025-12-02 06:22:31 +03:00
fix flaky test
This commit is contained in:
@@ -320,7 +320,8 @@ func (sm *ConnStateMachine) notifyWaiters() {
|
|||||||
processed = true
|
processed = true
|
||||||
break
|
break
|
||||||
} else {
|
} else {
|
||||||
// State changed - re-add waiter to front of queue and retry
|
// State changed - re-add waiter to front of queue to maintain FIFO ordering
|
||||||
|
// This waiter was first in line and should retain priority
|
||||||
sm.waiters.PushFront(w)
|
sm.waiters.PushFront(w)
|
||||||
sm.waiterCount.Add(1)
|
sm.waiterCount.Add(1)
|
||||||
// Continue to next iteration to re-read state
|
// Continue to next iteration to re-read state
|
||||||
|
|||||||
@@ -401,32 +401,16 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) {
|
|||||||
var orderMu sync.Mutex
|
var orderMu sync.Mutex
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
// Use channels to ensure deterministic queueing order
|
// Launch goroutines one at a time, ensuring each is queued before launching the next
|
||||||
// Each goroutine waits for the previous one to queue before it queues
|
|
||||||
queuedChannels := make([]chan struct{}, numGoroutines)
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
queuedChannels[i] = make(chan struct{})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Launch goroutines that will all wait
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
for i := 0; i < numGoroutines; i++ {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
|
expectedWaiters := int32(i + 1)
|
||||||
|
|
||||||
go func(id int) {
|
go func(id int) {
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
|
||||||
// Wait for previous goroutine to queue (except for goroutine 0)
|
|
||||||
if id > 0 {
|
|
||||||
<-queuedChannels[id-1]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Small delay to ensure the previous goroutine's AwaitAndTransition has been called
|
|
||||||
time.Sleep(5 * time.Millisecond)
|
|
||||||
|
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
// Signal that we're about to queue
|
|
||||||
close(queuedChannels[id])
|
|
||||||
|
|
||||||
// This should queue in FIFO order
|
// This should queue in FIFO order
|
||||||
_, err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing)
|
_, err := sm.AwaitAndTransition(ctx, []ConnState{StateIdle}, StateInitializing)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -441,16 +425,30 @@ func TestConnStateMachine_FIFOOrdering(t *testing.T) {
|
|||||||
|
|
||||||
t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder))
|
t.Logf("Goroutine %d: executed (position %d)", id, len(executionOrder))
|
||||||
|
|
||||||
// Transition back to READY to allow next waiter
|
// Transition back to IDLE to allow next waiter
|
||||||
sm.Transition(StateIdle)
|
sm.Transition(StateIdle)
|
||||||
}(i)
|
}(i)
|
||||||
|
|
||||||
|
// Wait until this goroutine has been queued before launching the next
|
||||||
|
// Poll the waiter count to ensure the goroutine is actually queued
|
||||||
|
timeout := time.After(100 * time.Millisecond)
|
||||||
|
for {
|
||||||
|
if sm.waiterCount.Load() >= expectedWaiters {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-timeout:
|
||||||
|
t.Fatalf("Timeout waiting for goroutine %d to queue", i)
|
||||||
|
case <-time.After(1 * time.Millisecond):
|
||||||
|
// Continue polling
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all goroutines to queue up
|
// Give all goroutines time to fully settle in the queue
|
||||||
<-queuedChannels[numGoroutines-1]
|
time.Sleep(10 * time.Millisecond)
|
||||||
time.Sleep(50 * time.Millisecond)
|
|
||||||
|
|
||||||
// Transition to READY to start processing the queue
|
// Transition to IDLE to start processing the queue
|
||||||
sm.Transition(StateIdle)
|
sm.Transition(StateIdle)
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
|||||||
Reference in New Issue
Block a user