diff --git a/autopipeline.go b/autopipeline.go index f2fb9b9a..d5e3aa30 100644 --- a/autopipeline.go +++ b/autopipeline.go @@ -27,8 +27,8 @@ type AutoPipelineConfig struct { func DefaultAutoPipelineConfig() *AutoPipelineConfig { return &AutoPipelineConfig{ MaxBatchSize: 30, - FlushInterval: 10 * time.Millisecond, - MaxConcurrentBatches: 20, + FlushInterval: 10 * time.Microsecond, + MaxConcurrentBatches: 30, } } @@ -78,14 +78,13 @@ type AutoPipeliner struct { pipeliner pipelinerClient config *AutoPipelineConfig - // Command queue + // Command queue - hybrid approach for best performance mu sync.Mutex queue []*queuedCmd queueLen atomic.Int32 // Fast path check without lock // Flush control - flushTimer *time.Timer - flushCh chan struct{} // Signal to flush immediately + flushCh chan struct{} // Signal to flush immediately // Concurrency control sem chan struct{} // Semaphore for concurrent batch limit @@ -108,19 +107,13 @@ func NewAutoPipeliner(pipeliner pipelinerClient, config *AutoPipelineConfig) *Au ctx, cancel := context.WithCancel(context.Background()) ap := &AutoPipeliner{ - pipeliner: pipeliner, - config: config, - queue: make([]*queuedCmd, 0, config.MaxBatchSize), - flushTimer: time.NewTimer(config.FlushInterval), - flushCh: make(chan struct{}, 1), - sem: make(chan struct{}, config.MaxConcurrentBatches), - ctx: ctx, - cancel: cancel, - } - - // Stop the timer initially - if !ap.flushTimer.Stop() { - <-ap.flushTimer.C + pipeliner: pipeliner, + config: config, + queue: make([]*queuedCmd, 0, config.MaxBatchSize), + flushCh: make(chan struct{}, 1), + sem: make(chan struct{}, config.MaxConcurrentBatches), + ctx: ctx, + cancel: cancel, } // Start background flusher @@ -178,29 +171,36 @@ func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{} done: make(chan struct{}), } + // Fast path: try to acquire lock without blocking + if ap.mu.TryLock() { + ap.queue = append(ap.queue, qc) + queueLen := len(ap.queue) + ap.queueLen.Store(int32(queueLen)) + + // Trigger immediate flush if batch is full + shouldFlush := queueLen >= ap.config.MaxBatchSize + ap.mu.Unlock() + + if shouldFlush { + select { + case ap.flushCh <- struct{}{}: + default: + } + } + return qc.done + } + + // Slow path: lock is contended, wait for it ap.mu.Lock() ap.queue = append(ap.queue, qc) queueLen := len(ap.queue) ap.queueLen.Store(int32(queueLen)) - // Check if we should flush immediately + // Trigger immediate flush if batch is full shouldFlush := queueLen >= ap.config.MaxBatchSize - - // Start flush timer if this is the first command - if queueLen == 1 { - ap.flushTimer.Reset(ap.config.FlushInterval) - } - if queueLen > 1 { - cachedFlushInterval := ap.cachedFlushInterval.Load() - if cachedFlushInterval == 0 && ap.cachedFlushInterval.CompareAndSwap(cachedFlushInterval, 0) { - ap.config.FlushInterval = time.Duration(cachedFlushInterval) * time.Nanosecond - } - } - ap.mu.Unlock() if shouldFlush { - // Signal immediate flush (non-blocking) select { case ap.flushCh <- struct{}{}: default: @@ -251,6 +251,17 @@ func (ap *AutoPipeliner) Close() error { func (ap *AutoPipeliner) flusher() { defer ap.wg.Done() + // Adaptive delays: + // - Single command: flush almost immediately (1ns) to minimize latency + // - Multiple commands: wait a bit (10µs) to allow batching + const singleCmdDelay = 1 * time.Nanosecond + const batchDelay = 10 * time.Microsecond + + // Start with batch delay + timer := time.NewTimer(batchDelay) + defer timer.Stop() + currentDelay := batchDelay + for { select { case <-ap.ctx.Done(): @@ -258,21 +269,52 @@ func (ap *AutoPipeliner) flusher() { ap.flushBatch() return - case <-ap.flushTimer.C: - // Timer expired, flush if we have commands - if ap.queueLen.Load() > 0 { + case <-ap.flushCh: + // Immediate flush requested (batch full) + if !timer.Stop() { + select { + case <-timer.C: + default: + } + } + ap.flushBatch() + + // Reset timer based on remaining queue + qLen := ap.queueLen.Load() + if qLen == 1 { + currentDelay = singleCmdDelay + } else { + currentDelay = batchDelay + } + timer.Reset(currentDelay) + + case <-timer.C: + qLen := ap.queueLen.Load() + if qLen > 0 { ap.flushBatch() } - // Reset timer for next interval if queue is not empty - ap.mu.Lock() - if len(ap.queue) > 0 { - ap.flushTimer.Reset(ap.config.FlushInterval) - } - ap.mu.Unlock() - case <-ap.flushCh: - // Immediate flush requested - ap.flushBatch() + // Adaptive delay based on queue size after flush + qLen = ap.queueLen.Load() + var nextDelay time.Duration + if qLen == 1 { + // Single command waiting - flush very quickly + nextDelay = singleCmdDelay + } else if qLen > 1 { + // Multiple commands - use batch delay to accumulate more + nextDelay = batchDelay + } else { + // Empty queue - use batch delay + nextDelay = batchDelay + } + + // Only reset timer if delay changed + if nextDelay != currentDelay { + currentDelay = nextDelay + timer.Reset(nextDelay) + } else { + timer.Reset(currentDelay) + } } } } @@ -291,15 +333,6 @@ func (ap *AutoPipeliner) flushBatch() { queuedCmds := ap.queue ap.queue = make([]*queuedCmd, 0, ap.config.MaxBatchSize) ap.queueLen.Store(0) - - // Stop timer - if !ap.flushTimer.Stop() { - select { - case <-ap.flushTimer.C: - default: - } - } - ap.mu.Unlock() // Acquire semaphore (limit concurrent batches) @@ -320,20 +353,15 @@ func (ap *AutoPipeliner) flushBatch() { if len(queuedCmds) == 0 { return } + + // Fast path for single command if len(queuedCmds) == 1 { - if ap.cachedFlushInterval.CompareAndSwap(0, ap.config.FlushInterval.Nanoseconds()) { - ap.config.FlushInterval = time.Nanosecond - } _ = ap.pipeliner.Process(context.Background(), queuedCmds[0].cmd) close(queuedCmds[0].done) return } - cachedFlushInterval := ap.cachedFlushInterval.Load() - if cachedFlushInterval != 0 && ap.cachedFlushInterval.CompareAndSwap(cachedFlushInterval, 0) { - ap.config.FlushInterval = time.Duration(ap.cachedFlushInterval.Load()) * time.Nanosecond - } - // Execute pipeline + // Execute pipeline for multiple commands pipe := ap.pipeliner.Pipeline() for _, qc := range queuedCmds { _ = pipe.Process(context.Background(), qc.cmd)