diff --git a/internal/semaphore.go b/internal/semaphore.go index df30d0a9..d9fb7a75 100644 --- a/internal/semaphore.go +++ b/internal/semaphore.go @@ -107,17 +107,6 @@ func (s *FastSemaphore) dequeue() *waiter { return w } -// notifyOne wakes up the first waiter in the queue if any. -func (s *FastSemaphore) notifyOne() { - s.lock.Lock() - w := s.dequeue() - s.lock.Unlock() - - if w != nil { - close(w.ready) - } -} - // Acquire acquires a token, blocking if necessary until one is available or the context is cancelled. // Returns an error if the context is cancelled or the timeout expires. // Returns timeoutErr when the timeout expires. @@ -273,16 +262,47 @@ func (s *FastSemaphore) AcquireBlocking() { // This should be called when a waiter was notified but then cancelled/timed out. // We need to pass the token to another waiter if any, otherwise put it back in the channel. func (s *FastSemaphore) releaseToPool() { - s.lock.Lock() - w := s.dequeue() - s.lock.Unlock() + // Try to give the token to a waiter first + for { + s.lock.Lock() + w := s.dequeue() + s.lock.Unlock() - if w != nil { - // Transfer the token to another waiter + if w == nil { + // No waiters, put the token back in the channel + // Use select to avoid blocking (should never block, but just in case) + select { + case s.tokens <- struct{}{}: + return + default: + // Channel is full - this should never happen! + // It means we have a logic error in token accounting + panic("semaphore: releaseToPool: channel is full") + } + } + + // Try to claim this waiter by setting notified flag + // If the waiter is being cancelled concurrently, one of us will win + if !w.notified.CompareAndSwap(false, true) { + // Someone else (the waiter itself) already claimed it + // This means the waiter is cancelling, skip to next + close(w.ready) // Still need to close to unblock them + continue + } + + // We successfully claimed the waiter + // Check if it was cancelled after we claimed it (shouldn't happen, but check anyway) + if w.cancelled.Load() { + // Waiter was cancelled, but we claimed it first + // We need to release the token + close(w.ready) // Unblock the waiter + // Continue to try next waiter + continue + } + + // We successfully claimed the waiter, transfer the token close(w.ready) - } else { - // No waiters, put the token back in the channel - s.tokens <- struct{}{} + return } } @@ -301,14 +321,6 @@ func (s *FastSemaphore) Release() { return } - // Check if this waiter was cancelled before we notify them - if w.cancelled.Load() { - // This waiter was cancelled, skip them and try the next one - // We still have the token, so continue the loop - close(w.ready) // Still need to close to unblock them - continue - } - // Try to claim this waiter by setting notified flag // If the waiter is being cancelled concurrently, one of us will win if !w.notified.CompareAndSwap(false, true) { @@ -318,6 +330,16 @@ func (s *FastSemaphore) Release() { continue } + // We successfully claimed the waiter + // Check if it was cancelled after we claimed it (shouldn't happen, but check anyway) + if w.cancelled.Load() { + // Waiter was cancelled, but we claimed it first + // We need to release the token + close(w.ready) // Unblock the waiter + // Continue to try next waiter + continue + } + // We successfully claimed the waiter, transfer the token close(w.ready) return