1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

use simple approach of fifo waiters

This commit is contained in:
Nedyalko Dyakov
2025-11-10 14:41:04 +02:00
parent 4f612c1f63
commit 584c162736

View File

@@ -3,7 +3,6 @@ package internal
import ( import (
"context" "context"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@@ -17,10 +16,8 @@ var semTimers = sync.Pool{
// waiter represents a goroutine waiting for a token. // waiter represents a goroutine waiting for a token.
type waiter struct { type waiter struct {
ready chan struct{} ready chan struct{}
next *waiter next *waiter
cancelled atomic.Bool // Set to true if this waiter was cancelled/timed out
notified atomic.Bool // Set to true when Release() notifies this waiter
} }
// FastSemaphore is a counting semaphore implementation using a hybrid approach. // FastSemaphore is a counting semaphore implementation using a hybrid approach.
@@ -132,8 +129,16 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
} }
s.lock.Lock() s.lock.Lock()
s.enqueue(w) // After acquiring lock, try the channel again (someone might have released)
s.lock.Unlock() select {
case <-s.tokens:
s.lock.Unlock()
return nil
default:
// Still empty, add to queue
s.enqueue(w)
s.lock.Unlock()
}
// Use timer pool to avoid allocation // Use timer pool to avoid allocation
timer := semTimers.Get().(*time.Timer) timer := semTimers.Get().(*time.Timer)
@@ -145,28 +150,19 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
if !timer.Stop() { if !timer.Stop() {
<-timer.C <-timer.C
} }
// Mark as cancelled and try to claim ourselves // Try to remove ourselves from the queue
w.cancelled.Store(true) s.lock.Lock()
if w.notified.CompareAndSwap(false, true) { removed := s.removeWaiter(w)
// We successfully claimed ourselves, we're cancelling s.lock.Unlock()
// Try to remove from queue
s.lock.Lock()
removed := s.removeWaiter(w)
s.lock.Unlock()
if !removed { if removed {
// Already dequeued, wait for ready to be closed // We successfully removed ourselves, no token given
<-w.ready
}
// We claimed it, so no token was given to us
return ctx.Err()
} else {
// Release() already claimed us and is giving us a token
// Wait for the notification and then release the token
<-w.ready
s.releaseToPool()
return ctx.Err() return ctx.Err()
} }
// We were already dequeued and given a token, must return it
<-w.ready
s.Release()
return ctx.Err()
case <-w.ready: case <-w.ready:
// We were notified and got the token // We were notified and got the token
// Stop the timer and drain it if it already fired // Stop the timer and drain it if it already fired
@@ -176,28 +172,19 @@ func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, time
// We have the token, just return // We have the token, just return
return nil return nil
case <-timer.C: case <-timer.C:
// Mark as cancelled and try to claim ourselves // Try to remove ourselves from the queue
w.cancelled.Store(true) s.lock.Lock()
if w.notified.CompareAndSwap(false, true) { removed := s.removeWaiter(w)
// We successfully claimed ourselves, we're cancelling s.lock.Unlock()
// Try to remove from queue
s.lock.Lock()
removed := s.removeWaiter(w)
s.lock.Unlock()
if !removed { if removed {
// Already dequeued, wait for ready to be closed // We successfully removed ourselves, no token given
<-w.ready
}
// We claimed it, so no token was given to us
return timeoutErr
} else {
// Release() already claimed us and is giving us a token
// Wait for the notification and then release the token
<-w.ready
s.releaseToPool()
return timeoutErr return timeoutErr
} }
// We were already dequeued and given a token, must return it
<-w.ready
s.Release()
return timeoutErr
} }
} }
@@ -258,76 +245,21 @@ func (s *FastSemaphore) AcquireBlocking() {
<-w.ready <-w.ready
} }
// releaseToPool releases a token back to the pool.
// This should be called when a waiter was notified but then cancelled/timed out.
// We need to pass the token to another waiter if any, otherwise put it back in the channel.
func (s *FastSemaphore) releaseToPool() {
// Try to give the token to a waiter first
for {
s.lock.Lock()
w := s.dequeue()
s.lock.Unlock()
if w == nil {
// No waiters, put the token back in the channel
// Use select to avoid blocking (should never block, but just in case)
select {
case s.tokens <- struct{}{}:
return
default:
// Channel is full - this should never happen!
// It means we have a logic error in token accounting
panic("semaphore: releaseToPool: channel is full")
}
}
// Try to claim this waiter by setting notified flag
// If the waiter is being cancelled concurrently, one of us will win
if !w.notified.CompareAndSwap(false, true) {
// Someone else (the waiter itself) already claimed it
// This means the waiter is cancelling, skip to next
close(w.ready) // Still need to close to unblock them
continue
}
// We successfully claimed the waiter, transfer the token
// Even if it was cancelled, we must give it the token because we claimed it
// The waiter will call releaseToPool() if it was cancelled
close(w.ready)
return
}
}
// Release releases a token back to the semaphore. // Release releases a token back to the semaphore.
// This wakes up the first waiting goroutine if any are blocked. // This wakes up the first waiting goroutine if any are blocked.
func (s *FastSemaphore) Release() { func (s *FastSemaphore) Release() {
// Try to give the token to a waiter first s.lock.Lock()
for { w := s.dequeue()
s.lock.Lock() if w == nil {
w := s.dequeue() // No waiters, put the token back in the channel
s.lock.Unlock() s.lock.Unlock()
s.tokens <- struct{}{}
if w == nil {
// No waiters, put the token back in the channel
s.tokens <- struct{}{}
return
}
// Try to claim this waiter by setting notified flag
// If the waiter is being cancelled concurrently, one of us will win
if !w.notified.CompareAndSwap(false, true) {
// Someone else (the waiter itself) already claimed it
// This means the waiter is cancelling, skip to next
close(w.ready) // Still need to close to unblock them
continue
}
// We successfully claimed the waiter, transfer the token
// Even if it was cancelled, we must give it the token because we claimed it
// The waiter will call releaseToPool() if it was cancelled
close(w.ready)
return return
} }
s.lock.Unlock()
// We have a waiter, give them the token
close(w.ready)
} }
// Len returns the current number of acquired tokens. // Len returns the current number of acquired tokens.