1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

fix fast semaphore that could have been starved

This commit is contained in:
Nedyalko Dyakov
2025-11-10 03:21:36 +02:00
parent edf6bd794f
commit 1dfa805f31

View File

@@ -89,8 +89,6 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
defer semTimers.Put(timer)
timer.Reset(timeout)
start := time.Now()
for {
select {
case <-ctx.Done():
@@ -100,23 +98,15 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
return ctx.Err()
case <-s.waitCh:
// Someone released a token, try to acquire it
if s.TryAcquire() {
if !timer.Stop() {
<-timer.C
}
return nil
// Someone released a token, we got the spot
// no need to touch the counter
if !timer.Stop() {
<-timer.C
}
// Failed to acquire (race with another goroutine), continue waiting
return nil
case <-timer.C:
return timeoutErr
}
// Periodically check if we can acquire (handles race conditions)
if time.Since(start) > timeout {
return timeoutErr
}
}
}
@@ -132,25 +122,26 @@ func (s *FastSemaphore) AcquireBlocking() {
// Slow path: wait for a token
for {
<-s.waitCh
if s.TryAcquire() {
return
}
// Failed to acquire (race with another goroutine), continue waiting
// Someone released a token, we got the spot
// no need to touch the counter
return
}
}
// Release releases a token back to the semaphore.
// This wakes up one waiting goroutine if any are blocked.
func (s *FastSemaphore) Release() {
s.count.Add(-1)
// Try to wake up a waiter (non-blocking)
// If no one is waiting, this is a no-op
select {
case s.waitCh <- struct{}{}:
// Successfully notified a waiter
// no need to decrement the counter, the waiter will use this spot
default:
// No waiters, that's fine
// decrement the counter
s.count.Add(-1)
}
}