diff --git a/internal/pool/export_test.go b/internal/pool/export_test.go
index 20456b81..a38e33ea 100644
--- a/internal/pool/export_test.go
+++ b/internal/pool/export_test.go
@@ -20,5 +20,5 @@ func (p *ConnPool) CheckMinIdleConns() {
 }
 
 func (p *ConnPool) QueueLen() int {
-	return len(p.queue)
+	return int(p.semaphore.len())
 }
diff --git a/internal/pool/pool.go b/internal/pool/pool.go
index 17815083..6e49b729 100644
--- a/internal/pool/pool.go
+++ b/internal/pool/pool.go
@@ -132,7 +132,9 @@ type ConnPool struct {
 	dialErrorsNum uint32 // atomic
 	lastDialError atomic.Value
 
-	queue chan struct{}
+	// Fast atomic semaphore for connection limiting
+	// Replaces the old channel-based queue for better performance
+	semaphore *fastSemaphore
 
 	connsMu   sync.Mutex
 	conns     map[uint64]*Conn
@@ -158,7 +160,7 @@ func NewConnPool(opt *Options) *ConnPool {
 	p := &ConnPool{
 		cfg: opt,
 
-		queue:     make(chan struct{}, opt.PoolSize),
+		semaphore: newFastSemaphore(opt.PoolSize),
 		conns:     make(map[uint64]*Conn),
 		idleConns: make([]*Conn, 0, opt.PoolSize),
 	}
@@ -223,31 +225,32 @@ func (p *ConnPool) checkMinIdleConns() {
 	// Only create idle connections if we haven't reached the total pool size limit
 	// MinIdleConns should be a subset of PoolSize, not additional connections
 	for p.poolSize.Load() < p.cfg.PoolSize && p.idleConnsLen.Load() < p.cfg.MinIdleConns {
-		select {
-		case p.queue <- struct{}{}:
-			p.poolSize.Add(1)
-			p.idleConnsLen.Add(1)
-			go func() {
-				defer func() {
-					if err := recover(); err != nil {
-						p.poolSize.Add(-1)
-						p.idleConnsLen.Add(-1)
-
-						p.freeTurn()
-						internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
-					}
-				}()
-
-				err := p.addIdleConn()
-				if err != nil && err != ErrClosed {
-					p.poolSize.Add(-1)
-					p.idleConnsLen.Add(-1)
-				}
-				p.freeTurn()
-			}()
-		default:
+		// Try to acquire a semaphore token
+		if !p.semaphore.tryAcquire() {
+			// Semaphore is full, can't create more connections
 			return
 		}
+
+		p.poolSize.Add(1)
+		p.idleConnsLen.Add(1)
+		go func() {
+			defer func() {
+				if err := recover(); err != nil {
+					p.poolSize.Add(-1)
+					p.idleConnsLen.Add(-1)
+
+					p.freeTurn()
+					internal.Logger.Printf(context.Background(), "addIdleConn panic: %+v", err)
+				}
+			}()
+
+			err := p.addIdleConn()
+			if err != nil && err != ErrClosed {
+				p.poolSize.Add(-1)
+				p.idleConnsLen.Add(-1)
+			}
+			p.freeTurn()
+		}()
 	}
 }
@@ -528,44 +531,35 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
 }
 
 func (p *ConnPool) waitTurn(ctx context.Context) error {
+	// Fast path: check context first
 	select {
 	case <-ctx.Done():
 		return ctx.Err()
 	default:
 	}
 
-	select {
-	case p.queue <- struct{}{}:
+	// Fast path: try to acquire without blocking
+	if p.semaphore.tryAcquire() {
 		return nil
-	default:
 	}
 
+	// Slow path: need to wait
 	start := time.Now()
-	timer := timers.Get().(*time.Timer)
-	defer timers.Put(timer)
-	timer.Reset(p.cfg.PoolTimeout)
+	err := p.semaphore.acquire(ctx, p.cfg.PoolTimeout)
 
-	select {
-	case <-ctx.Done():
-		if !timer.Stop() {
-			<-timer.C
-		}
-		return ctx.Err()
-	case p.queue <- struct{}{}:
+	if err == nil {
+		// Successfully acquired after waiting
 		p.waitDurationNs.Add(time.Now().UnixNano() - start.UnixNano())
 		atomic.AddUint32(&p.stats.WaitCount, 1)
-		if !timer.Stop() {
-			<-timer.C
-		}
-		return nil
-	case <-timer.C:
+	} else if err == ErrPoolTimeout {
 		atomic.AddUint32(&p.stats.Timeouts, 1)
-		return ErrPoolTimeout
 	}
+
+	return err
 }
 
 func (p *ConnPool) freeTurn() {
-	<-p.queue
+	p.semaphore.release()
 }
 
 func (p *ConnPool) popIdle() (*Conn, error) {
diff --git a/internal/pool/semaphore.go b/internal/pool/semaphore.go
new file mode 100644
index 00000000..e16ee733
--- /dev/null
+++ b/internal/pool/semaphore.go
@@ -0,0 +1,138 @@
+package pool
+
+import (
+	"context"
+	"sync/atomic"
+	"time"
+)
+
+// fastSemaphore is a high-performance semaphore implementation using atomic operations.
+// It's optimized for the fast path (no blocking) while still supporting timeouts and context cancellation.
+//
+// Performance characteristics:
+//   - Fast path (no blocking): Single atomic CAS operation
+//   - Slow path (blocking): Falls back to channel-based waiting
+//   - Release: Single atomic decrement + optional channel notification
+//
+// This is significantly faster than a pure channel-based semaphore because:
+//  1. The fast path avoids channel operations entirely (no scheduler involvement)
+//  2. Atomic operations are much cheaper than channel send/receive
+//  3. Better CPU cache behavior (no channel buffer allocation)
+type fastSemaphore struct {
+	// Current number of acquired tokens (atomic)
+	count atomic.Int32
+
+	// Maximum number of tokens (capacity)
+	max int32
+
+	// Channel for blocking waiters
+	// Only used when fast path fails (semaphore is full)
+	waitCh chan struct{}
+}
+
+// newFastSemaphore creates a new fast semaphore with the given capacity.
+func newFastSemaphore(capacity int32) *fastSemaphore {
+	return &fastSemaphore{
+		max:    capacity,
+		waitCh: make(chan struct{}, capacity),
+	}
+}
+
+// tryAcquire attempts to acquire a token without blocking.
+// Returns true if successful, false if the semaphore is full.
+//
+// This is the fast path - just a single CAS operation.
+func (s *fastSemaphore) tryAcquire() bool {
+	for {
+		current := s.count.Load()
+		if current >= s.max {
+			return false // Semaphore is full
+		}
+		if s.count.CompareAndSwap(current, current+1) {
+			return true // Successfully acquired
+		}
+		// CAS failed due to concurrent modification, retry
+	}
+}
+
+// acquire acquires a token, blocking if necessary until one is available or the context is cancelled.
+// Returns an error if the context is cancelled or the timeout expires.
+//
+// Performance optimization:
+//  1. First try fast path (no blocking)
+//  2. If that fails, fall back to channel-based waiting
+func (s *fastSemaphore) acquire(ctx context.Context, timeout time.Duration) error {
+	// Fast path: check context first
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	default:
+	}
+
+	// Try fast acquire first
+	if s.tryAcquire() {
+		return nil
+	}
+
+	// Fast path failed, need to wait
+	// Use timer pool to avoid allocation
+	timer := timers.Get().(*time.Timer)
+	defer timers.Put(timer)
+	timer.Reset(timeout)
+
+	start := time.Now()
+
+	for {
+		select {
+		case <-ctx.Done():
+			if !timer.Stop() {
+				<-timer.C
+			}
+			return ctx.Err()
+
+		case <-s.waitCh:
+			// Someone released a token, try to acquire it
+			if s.tryAcquire() {
+				if !timer.Stop() {
+					<-timer.C
+				}
+				return nil
+			}
+			// Failed to acquire (race with another goroutine), continue waiting
+
+		case <-timer.C:
+			return ErrPoolTimeout
+		}
+
+		// Defensive deadline check: if we keep losing the acquire race, give up once the timeout has elapsed
+		if time.Since(start) > timeout {
+			return ErrPoolTimeout
+		}
+	}
+}
+
+// release releases a token back to the semaphore.
+// This wakes up one waiting goroutine if any are blocked.
+func (s *fastSemaphore) release() {
+	s.count.Add(-1)
+
+	// Try to wake up a waiter (non-blocking)
+	// If no one is waiting, this is a no-op
+	select {
+	case s.waitCh <- struct{}{}:
+		// Successfully notified a waiter
+	default:
+		// No waiters, that's fine
+	}
+}
+
+// len returns the current number of acquired tokens.
+func (s *fastSemaphore) len() int32 {
+	return s.count.Load()
+}
+
+// cap returns the maximum capacity of the semaphore.
+func (s *fastSemaphore) cap() int32 {
+	return s.max
+}
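
To make the protocol easier to review in isolation, here is a runnable, self-contained sketch of the same fast-path/slow-path pattern the patch introduces. It is illustrative only: the identifiers (sema, newSema, the main driver) are invented for this sketch and are not part of the diff, and context cancellation plus the pooled timer are deliberately omitted to keep the model short.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// sema is a simplified stand-in for fastSemaphore: an atomic counter
// for the fast path, a buffered channel for slow-path wakeups.
type sema struct {
	count  atomic.Int32  // tokens currently held
	max    int32         // capacity
	waitCh chan struct{} // wakeup signals for blocked acquirers
}

func newSema(capacity int32) *sema {
	return &sema{max: capacity, waitCh: make(chan struct{}, capacity)}
}

// tryAcquire is the lock-free fast path: a CAS loop with no channel ops.
func (s *sema) tryAcquire() bool {
	for {
		n := s.count.Load()
		if n >= s.max {
			return false
		}
		if s.count.CompareAndSwap(n, n+1) {
			return true
		}
		// Lost a race with another goroutine; retry.
	}
}

// acquire is the slow path: wait for a wakeup, then race for the token.
func (s *sema) acquire(timeout time.Duration) bool {
	if s.tryAcquire() {
		return true
	}
	deadline := time.After(timeout) // created once; per-iteration would reset the timeout
	for {
		select {
		case <-s.waitCh:
			// A release happened; we may still lose the token to a
			// fast-path acquirer, in which case we keep waiting.
			if s.tryAcquire() {
				return true
			}
		case <-deadline:
			return false
		}
	}
}

// release returns a token and wakes at most one waiter, without blocking:
// the buffered channel absorbs the signal when nobody is waiting.
func (s *sema) release() {
	s.count.Add(-1)
	select {
	case s.waitCh <- struct{}{}:
	default:
	}
}

func main() {
	s := newSema(2) // at most 2 concurrent holders
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if !s.acquire(time.Second) {
				fmt.Printf("worker %d: timed out\n", id)
				return
			}
			defer s.release()
			time.Sleep(50 * time.Millisecond) // simulate using a pooled connection
			fmt.Printf("worker %d: done\n", id)
		}(i)
	}
	wg.Wait()
}

The property the sketch makes concrete: release never blocks, because the wakeup channel is buffered to the semaphore's capacity, and a waiter that consumes a wakeup but loses the CAS race simply re-enters the select and waits for the next signal.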