mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

should properly notify the waiters

- this way a waiter that times out at the same time
a releaser is releasing won't throw away the token: the releaser
will fail to notify it and will pick another waiter.

this hybrid approach should be faster than channels and maintains FIFO ordering
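
A minimal sketch of the claim protocol described above, with simplified names (tryHandOff and onTimeout are illustrative helpers, not functions from this commit): both the releaser and a timing-out waiter race on a single notified CAS, and whoever loses defers to the winner, so the token is never dropped.

package main

import (
	"fmt"
	"sync/atomic"
)

// waiter mirrors the shape used in the diff: a ready channel plus
// cancelled/notified flags that resolve the timeout-vs-release race.
type waiter struct {
	ready     chan struct{}
	cancelled atomic.Bool
	notified  atomic.Bool
}

// Releaser side: claim the waiter before handing over the token.
func tryHandOff(w *waiter) bool {
	if !w.notified.CompareAndSwap(false, true) {
		// The waiter already claimed itself (it is cancelling):
		// unblock it and report failure so the caller picks another waiter.
		close(w.ready)
		return false
	}
	close(w.ready) // we won the race, the token is transferred
	return true
}

// Waiter side: on timeout, try to claim ourselves before giving up.
func onTimeout(w *waiter) (gotToken bool) {
	w.cancelled.Store(true)
	if w.notified.CompareAndSwap(false, true) {
		return false // we won: no token was handed to us, nothing to return
	}
	// The releaser won: wait for the hand-off; the caller must then
	// return the token to the pool (releaseToPool in the real code).
	<-w.ready
	return true
}

func main() {
	w := &waiter{ready: make(chan struct{})}
	fmt.Println(tryHandOff(w)) // true: the releaser claimed this waiter first
	fmt.Println(onTimeout(w))  // true: the waiter now owns a token it must give back
}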
Nedyalko Dyakov
2025-11-10 13:47:41 +02:00
parent e418737532
commit dc319c0f7e
2 changed files with 336 additions and 59 deletions


@@ -20,26 +20,27 @@ type waiter struct {
ready chan struct{}
next *waiter
cancelled atomic.Bool // Set to true if this waiter was cancelled/timed out
notified atomic.Bool // Set to true when Release() notifies this waiter
}
// FastSemaphore is a counting semaphore implementation using atomic operations.
// FastSemaphore is a counting semaphore implementation using a hybrid approach.
// It's optimized for the fast path (no blocking) while still supporting timeouts and context cancellation.
//
// This implementation maintains FIFO ordering of waiters using a linked list queue.
// When a token is released, the first waiter in the queue is notified.
// This implementation uses a buffered channel for the fast path (TryAcquire/Release without waiters)
// and a FIFO queue for waiters to ensure fairness.
//
// Performance characteristics:
// - Fast path (no blocking): Single atomic CAS operation
// - Fast path (no blocking): Single channel operation (very fast)
// - Slow path (blocking): FIFO queue-based waiting
// - Release: Single atomic decrement + wake up first waiter in queue
// - Release: Channel send or wake up first waiter in queue
//
// This is significantly faster than a pure channel-based semaphore because:
// 1. The fast path avoids channel operations entirely (no scheduler involvement)
// 2. Atomic operations are much cheaper than channel send/receive
// 3. FIFO ordering prevents starvation
// 1. The fast path uses a buffered channel (single atomic operation)
// 2. FIFO ordering prevents starvation for waiters
// 3. Waiters don't compete with TryAcquire callers
type FastSemaphore struct {
// Current number of acquired tokens (atomic)
count atomic.Int32
// Buffered channel for fast path (TryAcquire/Release)
tokens chan struct{}
// Maximum number of tokens (capacity)
max int32
@@ -54,25 +55,27 @@ type FastSemaphore struct {
// NewFastSemaphore creates a new fast semaphore with the given capacity.
func NewFastSemaphore(capacity int32) *FastSemaphore {
ch := make(chan struct{}, capacity)
// Fill the channel with tokens (available slots)
for i := int32(0); i < capacity; i++ {
ch <- struct{}{}
}
return &FastSemaphore{
max: capacity,
tokens: ch,
}
}
// TryAcquire attempts to acquire a token without blocking.
// Returns true if successful, false if the semaphore is full.
//
// This is the fast path - just a single CAS operation.
// This is the fast path - just a single channel operation.
func (s *FastSemaphore) TryAcquire() bool {
for {
current := s.count.Load()
if current >= s.max {
return false // Semaphore is full
}
if s.count.CompareAndSwap(current, current+1) {
return true // Successfully acquired
}
// CAS failed due to concurrent modification, retry
select {
case <-s.tokens:
return true
default:
return false
}
}
@@ -126,9 +129,12 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
default:
}
// Try fast path first
if s.TryAcquire() {
// Try fast path first (non-blocking channel receive)
select {
case <-s.tokens:
return nil
default:
// Channel is empty, need to wait
}
// Need to wait - create a waiter and add to queue
@@ -150,47 +156,60 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
if !timer.Stop() {
<-timer.C
}
// Mark ourselves as cancelled
// Mark as cancelled and try to claim ourselves
w.cancelled.Store(true)
// Try to remove ourselves from the queue
if w.notified.CompareAndSwap(false, true) {
// We successfully claimed ourselves, we're cancelling
// Try to remove from queue
s.lock.Lock()
removed := s.removeWaiter(w)
s.lock.Unlock()
if !removed {
// We were already dequeued and notified
// Already dequeued, wait for ready to be closed
<-w.ready
}
// We claimed it, so no token was given to us
return ctx.Err()
} else {
// Release() already claimed us and is giving us a token
// Wait for the notification and then release the token
<-w.ready
s.releaseToPool()
}
return ctx.Err()
}
case <-w.ready:
// We were notified, check if we were cancelled
// We were notified and got the token
// Stop the timer and drain it if it already fired
if !timer.Stop() {
<-timer.C
}
if w.cancelled.Load() {
// We were cancelled while being notified, release the token
s.releaseToPool()
return ctx.Err()
}
// We have the token, just return
return nil
case <-timer.C:
// Mark ourselves as cancelled
// Mark as cancelled and try to claim ourselves
w.cancelled.Store(true)
// Try to remove ourselves from the queue
if w.notified.CompareAndSwap(false, true) {
// We successfully claimed ourselves, we're cancelling
// Try to remove from queue
s.lock.Lock()
removed := s.removeWaiter(w)
s.lock.Unlock()
if !removed {
// We were already dequeued and notified
// Already dequeued, wait for ready to be closed
<-w.ready
}
// We claimed it, so no token was given to us
return timeoutErr
} else {
// Release() already claimed us and is giving us a token
// Wait for the notification and then release the token
<-w.ready
s.releaseToPool()
}
return timeoutErr
}
}
}
// removeWaiter removes a waiter from the queue.
@@ -229,9 +248,12 @@ func (s *FastSemaphore) removeWaiter(target *waiter) bool {
// This is useful for cases where you don't need timeout or context cancellation.
// Returns immediately if a token is available (fast path).
func (s *FastSemaphore) AcquireBlocking() {
// Try fast path first
if s.TryAcquire() {
// Try fast path first (non-blocking channel receive)
select {
case <-s.tokens:
return
default:
// Channel is empty, need to wait
}
// Need to wait - create a waiter and add to queue
@@ -249,7 +271,7 @@ func (s *FastSemaphore) AcquireBlocking() {
// releaseToPool releases a token back to the pool.
// This should be called when a waiter was notified but then cancelled/timed out.
// We need to pass the token to another waiter if any, otherwise decrement the counter.
// We need to pass the token to another waiter if any, otherwise put it back in the channel.
func (s *FastSemaphore) releaseToPool() {
s.lock.Lock()
w := s.dequeue()
@@ -259,22 +281,23 @@ func (s *FastSemaphore) releaseToPool() {
// Transfer the token to another waiter
close(w.ready)
} else {
// No waiters, decrement the counter to free the slot
s.count.Add(-1)
// No waiters, put the token back in the channel
s.tokens <- struct{}{}
}
}
// Release releases a token back to the semaphore.
// This wakes up the first waiting goroutine if any are blocked.
func (s *FastSemaphore) Release() {
// Try to give the token to a waiter first
for {
s.lock.Lock()
w := s.dequeue()
s.lock.Unlock()
if w == nil {
// No waiters, decrement the counter to free the slot
s.count.Add(-1)
// No waiters, put the token back in the channel
s.tokens <- struct{}{}
return
}
@@ -286,8 +309,16 @@ func (s *FastSemaphore) Release() {
continue
}
// Transfer the token directly to the waiter
// Don't decrement the counter - the waiter takes over this slot
// Try to claim this waiter by setting notified flag
// If the waiter is being cancelled concurrently, one of us will win
if !w.notified.CompareAndSwap(false, true) {
// Someone else (the waiter itself) already claimed it
// This means the waiter is cancelling, skip to next
close(w.ready) // Still need to close to unblock them
continue
}
// We successfully claimed the waiter, transfer the token
close(w.ready)
return
}
@@ -296,5 +327,6 @@ func (s *FastSemaphore) Release() {
// Len returns the current number of acquired tokens.
// Used by tests to check semaphore state.
func (s *FastSemaphore) Len() int32 {
return s.count.Load()
// Number of acquired tokens = max - available tokens in channel
return s.max - int32(len(s.tokens))
}
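
For orientation, a hedged usage sketch of the API as it appears after this change, assuming the code sits in the same internal package as FastSemaphore (exampleUsage is illustrative, not part of the commit):

package internal

import (
	"context"
	"time"
)

// exampleUsage demonstrates the fast and slow acquisition paths.
func exampleUsage(ctx context.Context) error {
	sem := NewFastSemaphore(3) // capacity of 3 tokens

	// Fast path: non-blocking attempt (a single channel receive after this change).
	if sem.TryAcquire() {
		defer sem.Release()
		// ... do work ...
		return nil
	}

	// Slow path: join the FIFO waiter queue for up to 100ms; the third
	// argument is the error returned when the timeout fires.
	if err := sem.Acquire(ctx, 100*time.Millisecond, context.DeadlineExceeded); err != nil {
		return err
	}
	defer sem.Release()
	// ... do work ...
	return nil
}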


@@ -0,0 +1,245 @@
package internal
import (
"context"
"sync"
"testing"
"time"
)
// channelSemaphore is a simple semaphore using a buffered channel
type channelSemaphore struct {
ch chan struct{}
}
func newChannelSemaphore(capacity int) *channelSemaphore {
return &channelSemaphore{
ch: make(chan struct{}, capacity),
}
}
func (s *channelSemaphore) TryAcquire() bool {
select {
case s.ch <- struct{}{}:
return true
default:
return false
}
}
func (s *channelSemaphore) Acquire(ctx context.Context, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case s.ch <- struct{}{}:
return nil
case <-ctx.Done():
return ctx.Err()
case <-timer.C:
return context.DeadlineExceeded
}
}
func (s *channelSemaphore) AcquireBlocking() {
s.ch <- struct{}{}
}
func (s *channelSemaphore) Release() {
<-s.ch
}
// Benchmarks for FastSemaphore
func BenchmarkFastSemaphore_TryAcquire(b *testing.B) {
sem := NewFastSemaphore(100)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if sem.TryAcquire() {
sem.Release()
}
}
})
}
func BenchmarkFastSemaphore_AcquireRelease(b *testing.B) {
sem := NewFastSemaphore(100)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second, context.DeadlineExceeded)
sem.Release()
}
})
}
func BenchmarkFastSemaphore_Contention(b *testing.B) {
sem := NewFastSemaphore(10) // Small capacity to create contention
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second, context.DeadlineExceeded)
sem.Release()
}
})
}
func BenchmarkFastSemaphore_HighContention(b *testing.B) {
sem := NewFastSemaphore(1) // Very high contention
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second, context.DeadlineExceeded)
sem.Release()
}
})
}
// Benchmarks for channelSemaphore
func BenchmarkChannelSemaphore_TryAcquire(b *testing.B) {
sem := newChannelSemaphore(100)
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
if sem.TryAcquire() {
sem.Release()
}
}
})
}
func BenchmarkChannelSemaphore_AcquireRelease(b *testing.B) {
sem := newChannelSemaphore(100)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second)
sem.Release()
}
})
}
func BenchmarkChannelSemaphore_Contention(b *testing.B) {
sem := newChannelSemaphore(10) // Small capacity to create contention
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second)
sem.Release()
}
})
}
func BenchmarkChannelSemaphore_HighContention(b *testing.B) {
sem := newChannelSemaphore(1) // Very high contention
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second)
sem.Release()
}
})
}
// Benchmark with realistic workload (some work between acquire/release)
func BenchmarkFastSemaphore_WithWork(b *testing.B) {
sem := NewFastSemaphore(10)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second, context.DeadlineExceeded)
// Simulate some work
_ = make([]byte, 64)
sem.Release()
}
})
}
func BenchmarkChannelSemaphore_WithWork(b *testing.B) {
sem := newChannelSemaphore(10)
ctx := context.Background()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
sem.Acquire(ctx, time.Second)
// Simulate some work
_ = make([]byte, 64)
sem.Release()
}
})
}
// Benchmark mixed TryAcquire and Acquire
func BenchmarkFastSemaphore_Mixed(b *testing.B) {
sem := NewFastSemaphore(10)
ctx := context.Background()
var wg sync.WaitGroup
b.ResetTimer()
// Half goroutines use TryAcquire
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < b.N/2; i++ {
if sem.TryAcquire() {
sem.Release()
}
}
}()
// Half goroutines use Acquire
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < b.N/2; i++ {
sem.Acquire(ctx, time.Second, context.DeadlineExceeded)
sem.Release()
}
}()
wg.Wait()
}
func BenchmarkChannelSemaphore_Mixed(b *testing.B) {
sem := newChannelSemaphore(10)
ctx := context.Background()
var wg sync.WaitGroup
b.ResetTimer()
// Half goroutines use TryAcquire
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < b.N/2; i++ {
if sem.TryAcquire() {
sem.Release()
}
}
}()
// Half goroutines use Acquire
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < b.N/2; i++ {
sem.Acquire(ctx, time.Second)
sem.Release()
}
}()
wg.Wait()
}
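
One detail worth flagging in the baseline above: channelSemaphore counts acquisitions (send to acquire, receive to release), whereas FastSemaphore's tokens channel is pre-filled and counts available tokens (receive to acquire). The two conventions are equivalent; a minimal sketch of a baseline in the pre-filled style, purely for illustration (not part of the commit):

// prefilledSemaphore stores available tokens (FastSemaphore's convention)
// instead of in-flight acquisitions.
type prefilledSemaphore struct {
	tokens chan struct{}
}

func newPrefilledSemaphore(capacity int) *prefilledSemaphore {
	ch := make(chan struct{}, capacity)
	for i := 0; i < capacity; i++ {
		ch <- struct{}{} // pre-fill with available tokens
	}
	return &prefilledSemaphore{tokens: ch}
}

func (s *prefilledSemaphore) TryAcquire() bool {
	select {
	case <-s.tokens: // take a token if one is available
		return true
	default:
		return false
	}
}

func (s *prefilledSemaphore) Release() {
	s.tokens <- struct{}{} // return the token
}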