diff --git a/internal/semaphore.go b/internal/semaphore.go index 3c1ae337..df30d0a9 100644 --- a/internal/semaphore.go +++ b/internal/semaphore.go @@ -19,27 +19,28 @@ var semTimers = sync.Pool{ type waiter struct { ready chan struct{} next *waiter - cancelled atomic.Bool // Set to true if this waiter was cancelled/timed out + cancelled atomic.Bool // Set to true if this waiter was cancelled/timed out + notified atomic.Bool // Set to true when Release() notifies this waiter } -// FastSemaphore is a counting semaphore implementation using atomic operations. +// FastSemaphore is a counting semaphore implementation using a hybrid approach. // It's optimized for the fast path (no blocking) while still supporting timeouts and context cancellation. // -// This implementation maintains FIFO ordering of waiters using a linked list queue. -// When a token is released, the first waiter in the queue is notified. +// This implementation uses a buffered channel for the fast path (TryAcquire/Release without waiters) +// and a FIFO queue for waiters to ensure fairness. // // Performance characteristics: -// - Fast path (no blocking): Single atomic CAS operation +// - Fast path (no blocking): Single channel operation (very fast) // - Slow path (blocking): FIFO queue-based waiting -// - Release: Single atomic decrement + wake up first waiter in queue +// - Release: Channel send or wake up first waiter in queue // // This is significantly faster than a pure channel-based semaphore because: -// 1. The fast path avoids channel operations entirely (no scheduler involvement) -// 2. Atomic operations are much cheaper than channel send/receive -// 3. FIFO ordering prevents starvation +// 1. The fast path uses a buffered channel (single atomic operation) +// 2. FIFO ordering prevents starvation for waiters +// 3. Waiters don't compete with TryAcquire callers type FastSemaphore struct { - // Current number of acquired tokens (atomic) - count atomic.Int32 + // Buffered channel for fast path (TryAcquire/Release) + tokens chan struct{} // Maximum number of tokens (capacity) max int32 @@ -54,25 +55,27 @@ type FastSemaphore struct { // NewFastSemaphore creates a new fast semaphore with the given capacity. func NewFastSemaphore(capacity int32) *FastSemaphore { + ch := make(chan struct{}, capacity) + // Fill the channel with tokens (available slots) + for i := int32(0); i < capacity; i++ { + ch <- struct{}{} + } return &FastSemaphore{ - max: capacity, + max: capacity, + tokens: ch, } } // TryAcquire attempts to acquire a token without blocking. // Returns true if successful, false if the semaphore is full. // -// This is the fast path - just a single CAS operation. +// This is the fast path - just a single channel operation. func (s *FastSemaphore) TryAcquire() bool { - for { - current := s.count.Load() - if current >= s.max { - return false // Semaphore is full - } - if s.count.CompareAndSwap(current, current+1) { - return true // Successfully acquired - } - // CAS failed due to concurrent modification, retry + select { + case <-s.tokens: + return true + default: + return false } } @@ -126,9 +129,12 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time default: } - // Try fast path first - if s.TryAcquire() { + // Try fast path first (non-blocking channel receive) + select { + case <-s.tokens: return nil + default: + // Channel is empty, need to wait } // Need to wait - create a waiter and add to queue @@ -150,46 +156,59 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time if !timer.Stop() { <-timer.C } - // Mark ourselves as cancelled + // Mark as cancelled and try to claim ourselves w.cancelled.Store(true) - // Try to remove ourselves from the queue - s.lock.Lock() - removed := s.removeWaiter(w) - s.lock.Unlock() + if w.notified.CompareAndSwap(false, true) { + // We successfully claimed ourselves, we're cancelling + // Try to remove from queue + s.lock.Lock() + removed := s.removeWaiter(w) + s.lock.Unlock() - if !removed { - // We were already dequeued and notified + if !removed { + // Already dequeued, wait for ready to be closed + <-w.ready + } + // We claimed it, so no token was given to us + return ctx.Err() + } else { + // Release() already claimed us and is giving us a token // Wait for the notification and then release the token <-w.ready s.releaseToPool() - } - return ctx.Err() - case <-w.ready: - // We were notified, check if we were cancelled - if !timer.Stop() { - <-timer.C - } - if w.cancelled.Load() { - // We were cancelled while being notified, release the token - s.releaseToPool() return ctx.Err() } + case <-w.ready: + // We were notified and got the token + // Stop the timer and drain it if it already fired + if !timer.Stop() { + <-timer.C + } + // We have the token, just return return nil case <-timer.C: - // Mark ourselves as cancelled + // Mark as cancelled and try to claim ourselves w.cancelled.Store(true) - // Try to remove ourselves from the queue - s.lock.Lock() - removed := s.removeWaiter(w) - s.lock.Unlock() + if w.notified.CompareAndSwap(false, true) { + // We successfully claimed ourselves, we're cancelling + // Try to remove from queue + s.lock.Lock() + removed := s.removeWaiter(w) + s.lock.Unlock() - if !removed { - // We were already dequeued and notified + if !removed { + // Already dequeued, wait for ready to be closed + <-w.ready + } + // We claimed it, so no token was given to us + return timeoutErr + } else { + // Release() already claimed us and is giving us a token // Wait for the notification and then release the token <-w.ready s.releaseToPool() + return timeoutErr } - return timeoutErr } } @@ -229,9 +248,12 @@ func (s *FastSemaphore) removeWaiter(target *waiter) bool { // This is useful for cases where you don't need timeout or context cancellation. // Returns immediately if a token is available (fast path). func (s *FastSemaphore) AcquireBlocking() { - // Try fast path first - if s.TryAcquire() { + // Try fast path first (non-blocking channel receive) + select { + case <-s.tokens: return + default: + // Channel is empty, need to wait } // Need to wait - create a waiter and add to queue @@ -249,7 +271,7 @@ func (s *FastSemaphore) AcquireBlocking() { // releaseToPool releases a token back to the pool. // This should be called when a waiter was notified but then cancelled/timed out. -// We need to pass the token to another waiter if any, otherwise decrement the counter. +// We need to pass the token to another waiter if any, otherwise put it back in the channel. func (s *FastSemaphore) releaseToPool() { s.lock.Lock() w := s.dequeue() @@ -259,22 +281,23 @@ func (s *FastSemaphore) releaseToPool() { // Transfer the token to another waiter close(w.ready) } else { - // No waiters, decrement the counter to free the slot - s.count.Add(-1) + // No waiters, put the token back in the channel + s.tokens <- struct{}{} } } // Release releases a token back to the semaphore. // This wakes up the first waiting goroutine if any are blocked. func (s *FastSemaphore) Release() { + // Try to give the token to a waiter first for { s.lock.Lock() w := s.dequeue() s.lock.Unlock() if w == nil { - // No waiters, decrement the counter to free the slot - s.count.Add(-1) + // No waiters, put the token back in the channel + s.tokens <- struct{}{} return } @@ -286,8 +309,16 @@ func (s *FastSemaphore) Release() { continue } - // Transfer the token directly to the waiter - // Don't decrement the counter - the waiter takes over this slot + // Try to claim this waiter by setting notified flag + // If the waiter is being cancelled concurrently, one of us will win + if !w.notified.CompareAndSwap(false, true) { + // Someone else (the waiter itself) already claimed it + // This means the waiter is cancelling, skip to next + close(w.ready) // Still need to close to unblock them + continue + } + + // We successfully claimed the waiter, transfer the token close(w.ready) return } @@ -296,5 +327,6 @@ func (s *FastSemaphore) Release() { // Len returns the current number of acquired tokens. // Used by tests to check semaphore state. func (s *FastSemaphore) Len() int32 { - return s.count.Load() + // Number of acquired tokens = max - available tokens in channel + return s.max - int32(len(s.tokens)) } diff --git a/internal/semaphore_bench_test.go b/internal/semaphore_bench_test.go new file mode 100644 index 00000000..1615ca7e --- /dev/null +++ b/internal/semaphore_bench_test.go @@ -0,0 +1,245 @@ +package internal + +import ( + "context" + "sync" + "testing" + "time" +) + +// channelSemaphore is a simple semaphore using a buffered channel +type channelSemaphore struct { + ch chan struct{} +} + +func newChannelSemaphore(capacity int) *channelSemaphore { + return &channelSemaphore{ + ch: make(chan struct{}, capacity), + } +} + +func (s *channelSemaphore) TryAcquire() bool { + select { + case s.ch <- struct{}{}: + return true + default: + return false + } +} + +func (s *channelSemaphore) Acquire(ctx context.Context, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case s.ch <- struct{}{}: + return nil + case <-ctx.Done(): + return ctx.Err() + case <-timer.C: + return context.DeadlineExceeded + } +} + +func (s *channelSemaphore) AcquireBlocking() { + s.ch <- struct{}{} +} + +func (s *channelSemaphore) Release() { + <-s.ch +} + +// Benchmarks for FastSemaphore + +func BenchmarkFastSemaphore_TryAcquire(b *testing.B) { + sem := NewFastSemaphore(100) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if sem.TryAcquire() { + sem.Release() + } + } + }) +} + +func BenchmarkFastSemaphore_AcquireRelease(b *testing.B) { + sem := NewFastSemaphore(100) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFastSemaphore_Contention(b *testing.B) { + sem := NewFastSemaphore(10) // Small capacity to create contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFastSemaphore_HighContention(b *testing.B) { + sem := NewFastSemaphore(1) // Very high contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +// Benchmarks for channelSemaphore + +func BenchmarkChannelSemaphore_TryAcquire(b *testing.B) { + sem := newChannelSemaphore(100) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if sem.TryAcquire() { + sem.Release() + } + } + }) +} + +func BenchmarkChannelSemaphore_AcquireRelease(b *testing.B) { + sem := newChannelSemaphore(100) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }) +} + +func BenchmarkChannelSemaphore_Contention(b *testing.B) { + sem := newChannelSemaphore(10) // Small capacity to create contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }) +} + +func BenchmarkChannelSemaphore_HighContention(b *testing.B) { + sem := newChannelSemaphore(1) // Very high contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }) +} + +// Benchmark with realistic workload (some work between acquire/release) + +func BenchmarkFastSemaphore_WithWork(b *testing.B) { + sem := NewFastSemaphore(10) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + // Simulate some work + _ = make([]byte, 64) + sem.Release() + } + }) +} + +func BenchmarkChannelSemaphore_WithWork(b *testing.B) { + sem := newChannelSemaphore(10) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second) + // Simulate some work + _ = make([]byte, 64) + sem.Release() + } + }) +} + +// Benchmark mixed TryAcquire and Acquire + +func BenchmarkFastSemaphore_Mixed(b *testing.B) { + sem := NewFastSemaphore(10) + ctx := context.Background() + var wg sync.WaitGroup + + b.ResetTimer() + + // Half goroutines use TryAcquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + if sem.TryAcquire() { + sem.Release() + } + } + }() + + // Half goroutines use Acquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }() + + wg.Wait() +} + +func BenchmarkChannelSemaphore_Mixed(b *testing.B) { + sem := newChannelSemaphore(10) + ctx := context.Background() + var wg sync.WaitGroup + + b.ResetTimer() + + // Half goroutines use TryAcquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + if sem.TryAcquire() { + sem.Release() + } + } + }() + + // Half goroutines use Acquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + sem.Acquire(ctx, time.Second) + sem.Release() + } + }() + + wg.Wait() +} +