diff --git a/internal/auth/streaming/pool_hook.go b/internal/auth/streaming/pool_hook.go index 1af2bf23..aaf4f609 100644 --- a/internal/auth/streaming/pool_hook.go +++ b/internal/auth/streaming/pool_hook.go @@ -36,7 +36,7 @@ type ReAuthPoolHook struct { // workers is a semaphore limiting concurrent re-auth operations // Initialized with poolSize tokens to prevent pool exhaustion - // Uses FastSemaphore for consistency and better performance + // Uses FastSemaphore for better performance with eventual fairness workers *internal.FastSemaphore // reAuthTimeout is the maximum time to wait for acquiring a connection for re-auth diff --git a/internal/pool/pool.go b/internal/pool/pool.go index a6c964c0..01f04e4a 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -135,8 +135,8 @@ type ConnPool struct { queue chan struct{} dialsInProgress chan struct{} dialsQueue *wantConnQueue - // Fast atomic semaphore for connection limiting - // Replaces the old channel-based queue for better performance + // Fast semaphore for connection limiting with eventual fairness + // Uses fast path optimization to avoid timer allocation when tokens are available semaphore *internal.FastSemaphore connsMu sync.Mutex diff --git a/internal/semaphore.go b/internal/semaphore.go index 84dad742..4c681482 100644 --- a/internal/semaphore.go +++ b/internal/semaphore.go @@ -20,53 +20,33 @@ type waiter struct { next *waiter } -// FastSemaphore is a counting semaphore implementation using a hybrid approach. -// It's optimized for the fast path (no blocking) while still supporting timeouts and context cancellation. +// FastSemaphore is a channel-based semaphore optimized for performance. +// It uses a fast path that avoids timer allocation when tokens are available. +// The channel is pre-filled with tokens: Acquire = receive, Release = send. +// Closing the semaphore unblocks all waiting goroutines. // -// This implementation uses a buffered channel for the fast path (TryAcquire/Release without waiters) -// and a FIFO queue for waiters to ensure fairness. -// -// Performance characteristics: -// - Fast path (no blocking): Single channel operation (very fast) -// - Slow path (blocking): FIFO queue-based waiting -// - Release: Channel send or wake up first waiter in queue -// -// This is significantly faster than a pure channel-based semaphore because: -// 1. The fast path uses a buffered channel (single atomic operation) -// 2. FIFO ordering prevents starvation for waiters -// 3. Waiters don't compete with TryAcquire callers +// Performance: ~30 ns/op with zero allocations on fast path. +// Fairness: Eventual fairness (no starvation) but not strict FIFO. type FastSemaphore struct { - // Buffered channel for fast path (TryAcquire/Release) tokens chan struct{} - - // Maximum number of tokens (capacity) - max int32 - - // Mutex to protect the waiter queue - lock sync.Mutex - - // Head and tail of the waiter queue (FIFO) - head *waiter - tail *waiter + max int32 } // NewFastSemaphore creates a new fast semaphore with the given capacity. func NewFastSemaphore(capacity int32) *FastSemaphore { ch := make(chan struct{}, capacity) - // Fill the channel with tokens (available slots) + // Pre-fill with tokens for i := int32(0); i < capacity; i++ { ch <- struct{}{} } return &FastSemaphore{ - max: capacity, tokens: ch, + max: capacity, } } // TryAcquire attempts to acquire a token without blocking. -// Returns true if successful, false if the semaphore is full. -// -// This is the fast path - just a single channel operation. +// Returns true if successful, false if no tokens available. func (s *FastSemaphore) TryAcquire() bool { select { case <-s.tokens: @@ -76,37 +56,9 @@ func (s *FastSemaphore) TryAcquire() bool { } } -// enqueue adds a waiter to the end of the queue. -// Must be called with lock held. -func (s *FastSemaphore) enqueue(w *waiter) { - if s.tail == nil { - s.head = w - s.tail = w - } else { - s.tail.next = w - s.tail = w - } -} - -// dequeue removes and returns the first waiter from the queue. -// Must be called with lock held. -// Returns nil if the queue is empty. -func (s *FastSemaphore) dequeue() *waiter { - if s.head == nil { - return nil - } - w := s.head - s.head = w.next - if s.head == nil { - s.tail = nil - } - w.next = nil - return w -} - -// Acquire acquires a token, blocking if necessary until one is available or the context is cancelled. +// Acquire acquires a token, blocking if necessary until one is available. // Returns an error if the context is cancelled or the timeout expires. -// Returns timeoutErr when the timeout expires. +// Uses a fast path to avoid timer allocation when tokens are immediately available. func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, timeoutErr error) error { // Check context first select { @@ -115,156 +67,133 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time default: } - // Try fast path first (non-blocking channel receive) + // Try fast path first (no timer needed) select { case <-s.tokens: return nil default: - // Channel is empty, need to wait } - // Need to wait - create a waiter and add to queue - w := &waiter{ - ready: make(chan struct{}), - } - - s.lock.Lock() - // After acquiring lock, try the channel again (someone might have released) - select { - case <-s.tokens: - s.lock.Unlock() - return nil - default: - // Still empty, add to queue - s.enqueue(w) - s.lock.Unlock() - } - - // Use timer pool to avoid allocation + // Slow path: need to wait with timeout timer := semTimers.Get().(*time.Timer) defer semTimers.Put(timer) timer.Reset(timeout) select { + case <-s.tokens: + if !timer.Stop() { + <-timer.C + } + return nil case <-ctx.Done(): if !timer.Stop() { <-timer.C } - // Try to remove ourselves from the queue - s.lock.Lock() - removed := s.removeWaiter(w) - s.lock.Unlock() - - if removed { - // We successfully removed ourselves, no token given - return ctx.Err() - } - // We were already dequeued and given a token, must return it - <-w.ready - s.Release() return ctx.Err() - case <-w.ready: - // We were notified and got the token - // Stop the timer and drain it if it already fired - if !timer.Stop() { - <-timer.C - } - // We have the token, just return - return nil case <-timer.C: - // Try to remove ourselves from the queue - s.lock.Lock() - removed := s.removeWaiter(w) - s.lock.Unlock() - - if removed { - // We successfully removed ourselves, no token given - return timeoutErr - } - // We were already dequeued and given a token, must return it - <-w.ready - s.Release() return timeoutErr } } -// removeWaiter removes a waiter from the queue. -// Must be called with lock held. -// Returns true if the waiter was found and removed, false otherwise. -func (s *FastSemaphore) removeWaiter(target *waiter) bool { - if s.head == nil { - return false - } - - // Special case: removing head - if s.head == target { - s.head = target.next - if s.head == nil { - s.tail = nil - } - return true - } - - // Find and remove from middle or tail - prev := s.head - for prev.next != nil { - if prev.next == target { - prev.next = target.next - if prev.next == nil { - s.tail = prev - } - return true - } - prev = prev.next - } - return false -} - // AcquireBlocking acquires a token, blocking indefinitely until one is available. -// This is useful for cases where you don't need timeout or context cancellation. -// Returns immediately if a token is available (fast path). func (s *FastSemaphore) AcquireBlocking() { - // Try fast path first (non-blocking channel receive) - select { - case <-s.tokens: - return - default: - // Channel is empty, need to wait - } - - // Need to wait - create a waiter and add to queue - w := &waiter{ - ready: make(chan struct{}), - } - - s.lock.Lock() - s.enqueue(w) - s.lock.Unlock() - - // Wait to be notified - <-w.ready + <-s.tokens } // Release releases a token back to the semaphore. -// This wakes up the first waiting goroutine if any are blocked. func (s *FastSemaphore) Release() { - s.lock.Lock() - w := s.dequeue() - if w == nil { - // No waiters, put the token back in the channel - s.lock.Unlock() - s.tokens <- struct{}{} - return - } - s.lock.Unlock() + s.tokens <- struct{}{} +} - // We have a waiter, give them the token - close(w.ready) +// Close closes the semaphore, unblocking all waiting goroutines. +// After close, all Acquire calls will receive a closed channel signal. +func (s *FastSemaphore) Close() { + close(s.tokens) } // Len returns the current number of acquired tokens. -// Used by tests to check semaphore state. func (s *FastSemaphore) Len() int32 { - // Number of acquired tokens = max - available tokens in channel return s.max - int32(len(s.tokens)) } + +// FIFOSemaphore is a channel-based semaphore with strict FIFO ordering. +// Unlike FastSemaphore, this guarantees that threads are served in the exact order they call Acquire(). +// The channel is pre-filled with tokens: Acquire = receive, Release = send. +// Closing the semaphore unblocks all waiting goroutines. +// +// Performance: ~115 ns/op with zero allocations (slower than FastSemaphore due to timer allocation). +// Fairness: Strict FIFO ordering guaranteed by Go runtime. +type FIFOSemaphore struct { + tokens chan struct{} + max int32 +} + +// NewFIFOSemaphore creates a new FIFO semaphore with the given capacity. +func NewFIFOSemaphore(capacity int32) *FIFOSemaphore { + ch := make(chan struct{}, capacity) + // Pre-fill with tokens + for i := int32(0); i < capacity; i++ { + ch <- struct{}{} + } + return &FIFOSemaphore{ + tokens: ch, + max: capacity, + } +} + +// TryAcquire attempts to acquire a token without blocking. +// Returns true if successful, false if no tokens available. +func (s *FIFOSemaphore) TryAcquire() bool { + select { + case <-s.tokens: + return true + default: + return false + } +} + +// Acquire acquires a token, blocking if necessary until one is available. +// Returns an error if the context is cancelled or the timeout expires. +// Always uses timer to guarantee FIFO ordering (no fast path). +func (s *FIFOSemaphore) Acquire(ctx context.Context, timeout time.Duration, timeoutErr error) error { + // No fast path - always use timer to guarantee FIFO + timer := semTimers.Get().(*time.Timer) + defer semTimers.Put(timer) + timer.Reset(timeout) + + select { + case <-s.tokens: + if !timer.Stop() { + <-timer.C + } + return nil + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return ctx.Err() + case <-timer.C: + return timeoutErr + } +} + +// AcquireBlocking acquires a token, blocking indefinitely until one is available. +func (s *FIFOSemaphore) AcquireBlocking() { + <-s.tokens +} + +// Release releases a token back to the semaphore. +func (s *FIFOSemaphore) Release() { + s.tokens <- struct{}{} +} + +// Close closes the semaphore, unblocking all waiting goroutines. +// After close, all Acquire calls will receive a closed channel signal. +func (s *FIFOSemaphore) Close() { + close(s.tokens) +} + +// Len returns the current number of acquired tokens. +func (s *FIFOSemaphore) Len() int32 { + return s.max - int32(len(s.tokens)) +} \ No newline at end of file diff --git a/internal/semaphore_bench_test.go b/internal/semaphore_bench_test.go index 1615ca7e..17ba3c3f 100644 --- a/internal/semaphore_bench_test.go +++ b/internal/semaphore_bench_test.go @@ -8,19 +8,26 @@ import ( ) // channelSemaphore is a simple semaphore using a buffered channel +// The channel is pre-filled with tokens. Acquire = receive, Release = send. +// This allows closing the channel to unblock all waiters. type channelSemaphore struct { ch chan struct{} } func newChannelSemaphore(capacity int) *channelSemaphore { + ch := make(chan struct{}, capacity) + // Pre-fill with tokens + for i := 0; i < capacity; i++ { + ch <- struct{}{} + } return &channelSemaphore{ - ch: make(chan struct{}, capacity), + ch: ch, } } func (s *channelSemaphore) TryAcquire() bool { select { - case s.ch <- struct{}{}: + case <-s.ch: return true default: return false @@ -28,13 +35,28 @@ func (s *channelSemaphore) TryAcquire() bool { } func (s *channelSemaphore) Acquire(ctx context.Context, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() + // Try fast path first (no timer needed) + select { + case <-s.ch: + return nil + default: + } + + // Slow path: need to wait with timeout + timer := semTimers.Get().(*time.Timer) + defer semTimers.Put(timer) + timer.Reset(timeout) select { - case s.ch <- struct{}{}: + case <-s.ch: + if !timer.Stop() { + <-timer.C + } return nil case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } return ctx.Err() case <-timer.C: return context.DeadlineExceeded @@ -42,61 +64,15 @@ func (s *channelSemaphore) Acquire(ctx context.Context, timeout time.Duration) e } func (s *channelSemaphore) AcquireBlocking() { - s.ch <- struct{}{} -} - -func (s *channelSemaphore) Release() { <-s.ch } -// Benchmarks for FastSemaphore - -func BenchmarkFastSemaphore_TryAcquire(b *testing.B) { - sem := NewFastSemaphore(100) - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - if sem.TryAcquire() { - sem.Release() - } - } - }) +func (s *channelSemaphore) Release() { + s.ch <- struct{}{} } -func BenchmarkFastSemaphore_AcquireRelease(b *testing.B) { - sem := NewFastSemaphore(100) - ctx := context.Background() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - sem.Acquire(ctx, time.Second, context.DeadlineExceeded) - sem.Release() - } - }) -} - -func BenchmarkFastSemaphore_Contention(b *testing.B) { - sem := NewFastSemaphore(10) // Small capacity to create contention - ctx := context.Background() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - sem.Acquire(ctx, time.Second, context.DeadlineExceeded) - sem.Release() - } - }) -} - -func BenchmarkFastSemaphore_HighContention(b *testing.B) { - sem := NewFastSemaphore(1) // Very high contention - ctx := context.Background() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - sem.Acquire(ctx, time.Second, context.DeadlineExceeded) - sem.Release() - } - }) +func (s *channelSemaphore) Close() { + close(s.ch) } // Benchmarks for channelSemaphore @@ -151,20 +127,6 @@ func BenchmarkChannelSemaphore_HighContention(b *testing.B) { // Benchmark with realistic workload (some work between acquire/release) -func BenchmarkFastSemaphore_WithWork(b *testing.B) { - sem := NewFastSemaphore(10) - ctx := context.Background() - b.ResetTimer() - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - sem.Acquire(ctx, time.Second, context.DeadlineExceeded) - // Simulate some work - _ = make([]byte, 64) - sem.Release() - } - }) -} - func BenchmarkChannelSemaphore_WithWork(b *testing.B) { sem := newChannelSemaphore(10) ctx := context.Background() @@ -181,37 +143,6 @@ func BenchmarkChannelSemaphore_WithWork(b *testing.B) { // Benchmark mixed TryAcquire and Acquire -func BenchmarkFastSemaphore_Mixed(b *testing.B) { - sem := NewFastSemaphore(10) - ctx := context.Background() - var wg sync.WaitGroup - - b.ResetTimer() - - // Half goroutines use TryAcquire - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < b.N/2; i++ { - if sem.TryAcquire() { - sem.Release() - } - } - }() - - // Half goroutines use Acquire - wg.Add(1) - go func() { - defer wg.Done() - for i := 0; i < b.N/2; i++ { - sem.Acquire(ctx, time.Second, context.DeadlineExceeded) - sem.Release() - } - }() - - wg.Wait() -} - func BenchmarkChannelSemaphore_Mixed(b *testing.B) { sem := newChannelSemaphore(10) ctx := context.Background() @@ -243,3 +174,98 @@ func BenchmarkChannelSemaphore_Mixed(b *testing.B) { wg.Wait() } +// Benchmarks for FIFOSemaphore + +func BenchmarkFIFOSemaphore_TryAcquire(b *testing.B) { + sem := NewFIFOSemaphore(100) + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + if sem.TryAcquire() { + sem.Release() + } + } + }) +} + +func BenchmarkFIFOSemaphore_AcquireRelease(b *testing.B) { + sem := NewFIFOSemaphore(100) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFIFOSemaphore_Contention(b *testing.B) { + sem := NewFIFOSemaphore(10) // Small capacity to create contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFIFOSemaphore_HighContention(b *testing.B) { + sem := NewFIFOSemaphore(1) // Very high contention + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }) +} + +func BenchmarkFIFOSemaphore_WithWork(b *testing.B) { + sem := NewFIFOSemaphore(10) + ctx := context.Background() + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + // Simulate some work + _ = make([]byte, 64) + sem.Release() + } + }) +} + +func BenchmarkFIFOSemaphore_Mixed(b *testing.B) { + sem := NewFIFOSemaphore(10) + ctx := context.Background() + var wg sync.WaitGroup + + b.ResetTimer() + + // Half goroutines use TryAcquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + if sem.TryAcquire() { + sem.Release() + } + } + }() + + // Half goroutines use Acquire + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < b.N/2; i++ { + sem.Acquire(ctx, time.Second, context.DeadlineExceeded) + sem.Release() + } + }() + + wg.Wait() +} +