diff --git a/internal/semaphore.go b/internal/semaphore.go index a3f3e960..84dad742 100644 --- a/internal/semaphore.go +++ b/internal/semaphore.go @@ -3,7 +3,6 @@ package internal import ( "context" "sync" - "sync/atomic" "time" ) @@ -17,10 +16,8 @@ var semTimers = sync.Pool{ // waiter represents a goroutine waiting for a token. type waiter struct { - ready chan struct{} - next *waiter - cancelled atomic.Bool // Set to true if this waiter was cancelled/timed out - notified atomic.Bool // Set to true when Release() notifies this waiter + ready chan struct{} + next *waiter } // FastSemaphore is a counting semaphore implementation using a hybrid approach. @@ -132,8 +129,16 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time } s.lock.Lock() - s.enqueue(w) - s.lock.Unlock() + // After acquiring lock, try the channel again (someone might have released) + select { + case <-s.tokens: + s.lock.Unlock() + return nil + default: + // Still empty, add to queue + s.enqueue(w) + s.lock.Unlock() + } // Use timer pool to avoid allocation timer := semTimers.Get().(*time.Timer) @@ -145,28 +150,19 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time if !timer.Stop() { <-timer.C } - // Mark as cancelled and try to claim ourselves - w.cancelled.Store(true) - if w.notified.CompareAndSwap(false, true) { - // We successfully claimed ourselves, we're cancelling - // Try to remove from queue - s.lock.Lock() - removed := s.removeWaiter(w) - s.lock.Unlock() + // Try to remove ourselves from the queue + s.lock.Lock() + removed := s.removeWaiter(w) + s.lock.Unlock() - if !removed { - // Already dequeued, wait for ready to be closed - <-w.ready - } - // We claimed it, so no token was given to us - return ctx.Err() - } else { - // Release() already claimed us and is giving us a token - // Wait for the notification and then release the token - <-w.ready - s.releaseToPool() + if removed { + // We successfully removed ourselves, no token given return ctx.Err() } + // We were already dequeued and given a token, must return it + <-w.ready + s.Release() + return ctx.Err() case <-w.ready: // We were notified and got the token // Stop the timer and drain it if it already fired @@ -176,28 +172,19 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time // We have the token, just return return nil case <-timer.C: - // Mark as cancelled and try to claim ourselves - w.cancelled.Store(true) - if w.notified.CompareAndSwap(false, true) { - // We successfully claimed ourselves, we're cancelling - // Try to remove from queue - s.lock.Lock() - removed := s.removeWaiter(w) - s.lock.Unlock() + // Try to remove ourselves from the queue + s.lock.Lock() + removed := s.removeWaiter(w) + s.lock.Unlock() - if !removed { - // Already dequeued, wait for ready to be closed - <-w.ready - } - // We claimed it, so no token was given to us - return timeoutErr - } else { - // Release() already claimed us and is giving us a token - // Wait for the notification and then release the token - <-w.ready - s.releaseToPool() + if removed { + // We successfully removed ourselves, no token given return timeoutErr } + // We were already dequeued and given a token, must return it + <-w.ready + s.Release() + return timeoutErr } } @@ -258,76 +245,21 @@ func (s *FastSemaphore) AcquireBlocking() { <-w.ready } -// releaseToPool releases a token back to the pool. -// This should be called when a waiter was notified but then cancelled/timed out. -// We need to pass the token to another waiter if any, otherwise put it back in the channel. -func (s *FastSemaphore) releaseToPool() { - // Try to give the token to a waiter first - for { - s.lock.Lock() - w := s.dequeue() - s.lock.Unlock() - - if w == nil { - // No waiters, put the token back in the channel - // Use select to avoid blocking (should never block, but just in case) - select { - case s.tokens <- struct{}{}: - return - default: - // Channel is full - this should never happen! - // It means we have a logic error in token accounting - panic("semaphore: releaseToPool: channel is full") - } - } - - // Try to claim this waiter by setting notified flag - // If the waiter is being cancelled concurrently, one of us will win - if !w.notified.CompareAndSwap(false, true) { - // Someone else (the waiter itself) already claimed it - // This means the waiter is cancelling, skip to next - close(w.ready) // Still need to close to unblock them - continue - } - - // We successfully claimed the waiter, transfer the token - // Even if it was cancelled, we must give it the token because we claimed it - // The waiter will call releaseToPool() if it was cancelled - close(w.ready) - return - } -} - // Release releases a token back to the semaphore. // This wakes up the first waiting goroutine if any are blocked. func (s *FastSemaphore) Release() { - // Try to give the token to a waiter first - for { - s.lock.Lock() - w := s.dequeue() + s.lock.Lock() + w := s.dequeue() + if w == nil { + // No waiters, put the token back in the channel s.lock.Unlock() - - if w == nil { - // No waiters, put the token back in the channel - s.tokens <- struct{}{} - return - } - - // Try to claim this waiter by setting notified flag - // If the waiter is being cancelled concurrently, one of us will win - if !w.notified.CompareAndSwap(false, true) { - // Someone else (the waiter itself) already claimed it - // This means the waiter is cancelling, skip to next - close(w.ready) // Still need to close to unblock them - continue - } - - // We successfully claimed the waiter, transfer the token - // Even if it was cancelled, we must give it the token because we claimed it - // The waiter will call releaseToPool() if it was cancelled - close(w.ready) + s.tokens <- struct{}{} return } + s.lock.Unlock() + + // We have a waiter, give them the token + close(w.ready) } // Len returns the current number of acquired tokens.