From d3bcc30a8f6b945e44718cb420ac2061374f6346 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Tue, 11 Nov 2025 00:16:27 +0200 Subject: [PATCH] use sync.Cond --- autopipeline.go | 52 +++++++++++++++++++++---------------------------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/autopipeline.go b/autopipeline.go index 41e5bfc2..46e98a57 100644 --- a/autopipeline.go +++ b/autopipeline.go @@ -185,7 +185,7 @@ type AutoPipeliner struct { queueLen atomic.Int32 // Fast path check without lock // Flush control - flushCh chan struct{} // Signal to flush immediately + flushCond *sync.Cond // Condition variable to signal flusher // Concurrency control sem *internal.FastSemaphore // Semaphore for concurrent batch limit @@ -219,12 +219,15 @@ func NewAutoPipeliner(pipeliner cmdableClient, config *AutoPipelineConfig) *Auto ap := &AutoPipeliner{ pipeliner: pipeliner, config: config, - flushCh: make(chan struct{}, 1), sem: internal.NewFastSemaphore(int32(config.MaxConcurrentBatches)), ctx: ctx, cancel: cancel, } + // Initialize condition variable for flush signaling + // Use a separate mutex for the condition variable to avoid contention with queue operations + ap.flushCond = sync.NewCond(&sync.Mutex{}) + // Initialize cmdable to route all commands through processAndBlock ap.cmdable = ap.processAndBlock @@ -323,11 +326,8 @@ func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *q ap.queueLen.Store(int32(queueLen)) ap.mu.Unlock() - // Always signal the flusher (non-blocking) - select { - case ap.flushCh <- struct{}{}: - default: - } + // Signal the flusher using condition variable + ap.flushCond.Signal() return qc } @@ -338,11 +338,8 @@ func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *q ap.queueLen.Store(int32(queueLen)) ap.mu.Unlock() - // always signal the flusher (non-blocking) - select { - case ap.flushCh <- struct{}{}: - default: - } + // Signal the flusher using condition variable + ap.flushCond.Signal() return qc } @@ -360,6 +357,9 @@ func (ap *AutoPipeliner) Close() error { // Cancel context to stop flusher ap.cancel() + // Signal the flusher to wake up and check context + ap.flushCond.Signal() + // Wait for flusher to finish ap.wg.Wait() @@ -372,30 +372,22 @@ func (ap *AutoPipeliner) Close() error { // flusher is the background goroutine that flushes batches. func (ap *AutoPipeliner) flusher() { defer ap.wg.Done() + for { - // Wait for a command to arrive - select { - case <-ap.flushCh: - // Command arrived, continue - case <-ap.ctx.Done(): + // Wait for a command to arrive using condition variable + ap.flushCond.L.Lock() + for ap.Len() == 0 && ap.ctx.Err() == nil { + ap.flushCond.Wait() + } + ap.flushCond.L.Unlock() + + // Check if context is cancelled + if ap.ctx.Err() != nil { // Final flush before shutdown ap.flushBatchSlice() return } - // Drain any additional signals - for { - select { - case <-ap.flushCh: - if ap.Len() >= ap.config.MaxBatchSize { - goto drained - } - default: - goto drained - } - } - drained: - // Flush all pending commands for ap.Len() > 0 { select {