diff --git a/autopipeline.go b/autopipeline.go index 96e64b56..1786f4f6 100644 --- a/autopipeline.go +++ b/autopipeline.go @@ -20,21 +20,6 @@ type AutoPipelineConfig struct { // Default: 10 MaxConcurrentBatches int - // UseRingBuffer enables the high-performance ring buffer queue. - // When enabled, uses a pre-allocated ring buffer with lock-free enqueue - // instead of the slice-based queue. This provides: - // - 6x faster enqueue operations - // - 100% reduction in allocations during enqueue - // - Better performance under high concurrency - // Default: true (enabled) - UseRingBuffer bool - - // RingBufferSize is the size of the ring buffer queue. - // Only used when UseRingBuffer is true. - // Must be a power of 2 for optimal performance (will be rounded up if not). - // Default: 1024 - RingBufferSize int - // MaxFlushDelay is the maximum delay after flushing before checking for more commands. // A small delay (e.g., 100μs) can significantly reduce CPU usage by allowing // more commands to batch together, at the cost of slightly higher latency. @@ -55,8 +40,6 @@ func DefaultAutoPipelineConfig() *AutoPipelineConfig { return &AutoPipelineConfig{ MaxBatchSize: 50, MaxConcurrentBatches: 10, - UseRingBuffer: true, // Enable ring buffer by default - RingBufferSize: 1024, MaxFlushDelay: 0, // No delay by default (lowest latency) } } @@ -184,11 +167,10 @@ type AutoPipeliner struct { pipeliner cmdableClient config *AutoPipelineConfig - // Command queue - either slice-based or ring buffer + // Command queue mu sync.Mutex - queue []*queuedCmd // Slice-based queue (legacy) - ring *autoPipelineRing // Ring buffer queue (high-performance) - queueLen atomic.Int32 // Fast path check without lock + queue []*queuedCmd // Slice-based queue + queueLen atomic.Int32 // Fast path check without lock // Flush control flushCh chan struct{} // Signal to flush immediately @@ -233,12 +215,8 @@ func NewAutoPipeliner(pipeliner cmdableClient, config *AutoPipelineConfig) *Auto // Initialize cmdable to route all commands through processAndBlock ap.cmdable = ap.processAndBlock - // Initialize queue based on configuration - if config.UseRingBuffer { - ap.ring = newAutoPipelineRing(config.RingBufferSize) - } else { - ap.queue = getQueueSlice(config.MaxBatchSize) - } + // Initialize queue + ap.queue = getQueueSlice(config.MaxBatchSize) // Start background flusher ap.wg.Add(1) @@ -314,12 +292,6 @@ var closedQueuedCmd = &queuedCmd{ done: closedChan, } -// ringQueuedCmd is a queuedCmd wrapper for ring buffer (not pooled) -type ringQueuedCmd struct { - cmd Cmder - done <-chan struct{} -} - // processWithQueuedCmd is the internal method that queues a command and returns the queuedCmd. // The caller is responsible for returning the queuedCmd to the pool after use. func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *queuedCmd { @@ -328,19 +300,6 @@ func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *q return closedQueuedCmd } - // Use ring buffer if enabled - if ap.config.UseRingBuffer { - done := ap.ring.putOne(cmd) - // putOne will signal the flusher via condition variable if needed - // For ring buffer, we create a simple wrapper (not pooled) - // The done channel is managed by the ring buffer - return &queuedCmd{ - cmd: cmd, - done: done, - } - } - - // Legacy slice-based queue // Get queued command from pool qc := getQueuedCmd(cmd) @@ -388,11 +347,6 @@ func (ap *AutoPipeliner) Close() error { // Cancel context to stop flusher ap.cancel() - // Wake up flusher if it's waiting - if ap.config.UseRingBuffer { - ap.ring.wakeAll() - } - // Wait for flusher to finish ap.wg.Wait() @@ -402,19 +356,6 @@ func (ap *AutoPipeliner) Close() error { // flusher is the background goroutine that flushes batches. func (ap *AutoPipeliner) flusher() { defer ap.wg.Done() - - if !ap.config.UseRingBuffer { - // Legacy slice-based flusher - ap.flusherSlice() - return - } - - // Ring buffer flusher - ap.flusherRing() -} - -// flusherSlice is the legacy slice-based flusher. -func (ap *AutoPipeliner) flusherSlice() { for { // Wait for a command to arrive select { @@ -457,122 +398,21 @@ func (ap *AutoPipeliner) flusherSlice() { } } -// flusherRing is the ring buffer flusher. -func (ap *AutoPipeliner) flusherRing() { - var ( - cmds = make([]Cmder, 0, ap.config.MaxBatchSize) - doneChans = make([]chan struct{}, 0, ap.config.MaxBatchSize) - positions = make([]uint32, 0, ap.config.MaxBatchSize) - ) - - for { - // Try to get next command (non-blocking) - cmd, done, pos := ap.ring.nextWriteCmd() - - if cmd == nil { - // No command available - // If we have buffered commands, execute them first - if len(cmds) > 0 { - ap.executeBatch(cmds, doneChans, positions) - cmds = cmds[:0] - doneChans = doneChans[:0] - positions = positions[:0] - } - - // Check for shutdown before blocking - select { - case <-ap.ctx.Done(): - return - default: - } - - // Wait for next command (blocking) - // This will be woken up by wakeAll() during shutdown - cmd, done, pos = ap.ring.waitForWrite() - - // If nil, ring is closed - if cmd == nil { - return - } - } - - // Add command to batch - cmds = append(cmds, cmd) - doneChans = append(doneChans, done) - positions = append(positions, pos) - - // Execute batch if full - if len(cmds) >= ap.config.MaxBatchSize { - ap.executeBatch(cmds, doneChans, positions) - cmds = cmds[:0] - doneChans = doneChans[:0] - positions = positions[:0] - } - } -} - -// executeBatch executes a batch of commands. -func (ap *AutoPipeliner) executeBatch(cmds []Cmder, doneChans []chan struct{}, positions []uint32) { - if len(cmds) == 0 { - return - } - - // Acquire semaphore (limit concurrent batches) - // Try fast path first - if !ap.sem.TryAcquire() { - // Fast path failed, need to wait - err := ap.sem.Acquire(ap.ctx, 5*time.Second, context.DeadlineExceeded) - if err != nil { - // Context cancelled, set error on all commands and notify - for i, cmd := range cmds { - cmd.SetErr(ErrClosed) - doneChans[i] <- struct{}{} // Send signal instead of close - ap.ring.finishCmd(positions[i]) - } - return - } - } - - // Fast path for single command - if len(cmds) == 1 { - _ = ap.pipeliner.Process(context.Background(), cmds[0]) - doneChans[0] <- struct{}{} // Send signal instead of close - ap.ring.finishCmd(positions[0]) - ap.sem.Release() - return - } - // Execute pipeline for multiple commands - pipe := ap.pipeliner.Pipeline() - for _, cmd := range cmds { - _ = pipe.Process(context.Background(), cmd) - } - - // Execute and wait for completion - _, _ = pipe.Exec(context.Background()) - - // Notify completion and finish slots - for i, done := range doneChans { - done <- struct{}{} // Send signal instead of close - ap.ring.finishCmd(positions[i]) - } - ap.sem.Release() -} - -// flushBatchSlice flushes commands from the slice-based queue (legacy). +// flushBatchSlice flushes commands from the slice-based queue. func (ap *AutoPipeliner) flushBatchSlice() { // Get commands from queue ap.mu.Lock() if len(ap.queue) == 0 { - ap.queueLen.Store(0) ap.mu.Unlock() + ap.queueLen.Store(0) return } // Take ownership of current queue queuedCmds := ap.queue ap.queue = getQueueSlice(ap.config.MaxBatchSize) - ap.queueLen.Store(0) ap.mu.Unlock() + ap.queueLen.Store(0) // Acquire semaphore (limit concurrent batches) // Try fast path first @@ -630,9 +470,6 @@ func (ap *AutoPipeliner) flushBatchSlice() { // Len returns the current number of queued commands. func (ap *AutoPipeliner) Len() int { - if ap.config.UseRingBuffer { - return ap.ring.len() - } return int(ap.queueLen.Load()) } diff --git a/autopipeline_ring.go b/autopipeline_ring.go deleted file mode 100644 index 5a4816a4..00000000 --- a/autopipeline_ring.go +++ /dev/null @@ -1,244 +0,0 @@ -package redis - -import ( - "math/bits" - "sync" - "sync/atomic" -) - -// autoPipelineRing is a pre-allocated ring buffer queue for autopipelining. -// It provides lock-free enqueue and FIFO ordering guarantees. -// -// Ring buffer architecture: -// - Pre-allocated slots (no allocations during enqueue) -// - Per-slot channels for request-response matching -// - Atomic write pointer for lock-free enqueue -// - Separate read pointers for write and read goroutines -// -// The ring buffer uses three pointers: -// - write: Where app goroutines add commands (atomic increment) -// - read1: Where flush goroutine reads commands to send -// - read2: Where result goroutine matches responses (currently unused, for future optimization) -type autoPipelineRing struct { - store []autoPipelineSlot // Pre-allocated slots - mask uint32 // Size - 1 (for fast modulo via bitwise AND) - write uint32 // Write position (atomic, incremented by app goroutines) - read1 uint32 // Read position for flush goroutine - read2 uint32 // Read position for result matching (reserved for future use) - cmds []Cmder // Persistent buffer for collecting commands (reused, no allocations) - doneChans []chan struct{} // Persistent buffer for collecting done channels (reused, no allocations) -} - -// autoPipelineSlot represents a single command slot in the ring buffer. -type autoPipelineSlot struct { - c1 *sync.Cond // Condition variable for write synchronization (shared mutex with c2) - c2 *sync.Cond // Condition variable for wait/signal (shared mutex with c1) - cmd Cmder // The command to execute - done chan struct{} // Completion notification channel (pre-allocated, reused) - mark uint32 // State: 0=empty, 1=queued, 2=sent (atomic) - slept bool // Whether writer goroutine is sleeping on this slot -} - -// State constants for autoPipelineSlot.mark -const ( - apSlotEmpty uint32 = 0 // Slot is empty and available - apSlotQueued uint32 = 1 // Command queued, ready to be sent - apSlotSent uint32 = 2 // Command sent, waiting for response - apSlotClosed uint32 = 3 // Ring is closed, stop waiting -) - -// newAutoPipelineRing creates a new ring buffer with the specified size. -// Size will be rounded up to the next power of 2 for efficient modulo operations. -func newAutoPipelineRing(size int) *autoPipelineRing { - // Round up to power of 2 for fast modulo via bitwise AND - if size <= 0 { - size = 1024 // Default size - } - if size&(size-1) != 0 { - // Not a power of 2, round up - size = 1 << (32 - bits.LeadingZeros32(uint32(size))) - } - - r := &autoPipelineRing{ - store: make([]autoPipelineSlot, size), - mask: uint32(size - 1), - read1: ^uint32(0), // Start at -1 (0xFFFFFFFF) so first increment gives 0 - read2: ^uint32(0), // Start at -1 (0xFFFFFFFF) so first increment gives 0 - cmds: make([]Cmder, 0, size), // Persistent buffer, reused - doneChans: make([]chan struct{}, 0, size), // Persistent buffer, reused - } - - // Initialize each slot with condition variables and pre-allocated channel - for i := range r.store { - m := &sync.Mutex{} - r.store[i].c1 = sync.NewCond(m) - r.store[i].c2 = sync.NewCond(m) // Share the same mutex - r.store[i].done = make(chan struct{}, 1) // Buffered channel for signal (not close) - } - - return r -} - -// putOne enqueues a command into the ring buffer. -// Returns the done channel that will be signaled when the command completes. -// -// Ring buffer enqueue implementation: -// - Atomic increment for write position -// - Wait on condition variable if slot is full -// - Signal reader if it's sleeping -func (r *autoPipelineRing) putOne(cmd Cmder) chan struct{} { - // Atomic increment to get next slot - // AddUint32 returns the NEW value (after increment), so subtract 1 to get the slot we own - pos := atomic.AddUint32(&r.write, 1) - 1 - slot := &r.store[pos&r.mask] - - // Lock the slot - slot.c1.L.Lock() - - // Wait if slot is not empty (mark != 0) - for slot.mark != 0 { - slot.c1.Wait() - } - - // Store command and mark as queued - slot.cmd = cmd - slot.mark = 1 - s := slot.slept - - slot.c1.L.Unlock() - - // If reader is sleeping, wake it up - if s { - slot.c2.Broadcast() - } - - return slot.done -} - -// nextWriteCmd tries to get the next command (non-blocking). -// Returns nil if no command is available. -// Should only be called by the flush goroutine. -// Returns: cmd, done channel, position (for finishCmd) -func (r *autoPipelineRing) nextWriteCmd() (Cmder, chan struct{}, uint32) { - r.read1++ - pos := r.read1 - p := pos & r.mask - slot := &r.store[p] - - slot.c1.L.Lock() - if slot.mark == 1 { - cmd := slot.cmd - done := slot.done - slot.mark = 2 - slot.c1.L.Unlock() - return cmd, done, pos - } - // No command available, rollback read position - r.read1-- - slot.c1.L.Unlock() - return nil, nil, 0 -} - -// waitForWrite waits for the next command (blocking). -// Should only be called by the flush goroutine. -// Returns nil if the ring is closed. -// Returns: cmd, done channel, position (for finishCmd) -func (r *autoPipelineRing) waitForWrite() (Cmder, chan struct{}, uint32) { - r.read1++ - pos := r.read1 - p := pos & r.mask - slot := &r.store[p] - - slot.c1.L.Lock() - // Wait until command is available (mark == 1) or closed (mark == 3) - for slot.mark != 1 && slot.mark != apSlotClosed { - slot.slept = true - slot.c2.Wait() // c1 and c2 share the same mutex - slot.slept = false - } - - // Check if closed - if slot.mark == apSlotClosed { - r.read1-- // Rollback read position - slot.c1.L.Unlock() - return nil, nil, 0 - } - - cmd := slot.cmd - done := slot.done - slot.mark = 2 - slot.c1.L.Unlock() - return cmd, done, pos -} - -// finishCmd marks a command as completed and clears the slot. -// Should only be called by the flush goroutine. -// The position parameter should be the position returned by nextWriteCmd/waitForWrite. -func (r *autoPipelineRing) finishCmd(position uint32) { - p := position & r.mask - slot := &r.store[p] - - slot.c1.L.Lock() - if slot.mark == 2 { - // Drain the done channel before reusing - select { - case <-slot.done: - default: - } - - // Clear slot for reuse - slot.cmd = nil - slot.mark = 0 - } - slot.c1.L.Unlock() - slot.c1.Signal() // Wake up any writer waiting on this slot -} - -// len returns the approximate number of queued commands. -// This is an estimate and may not be exact due to concurrent access. -func (r *autoPipelineRing) len() int { - write := atomic.LoadUint32(&r.write) - read := atomic.LoadUint32(&r.read1) - - // Handle wrap-around - if write >= read { - return int(write - read) - } - // Wrapped around - return int(write + (^uint32(0) - read) + 1) -} - -// cap returns the capacity of the ring buffer. -func (r *autoPipelineRing) cap() int { - return len(r.store) -} - -// reset resets the ring buffer to empty state. -// This should only be called when no goroutines are accessing the ring. -func (r *autoPipelineRing) reset() { - atomic.StoreUint32(&r.write, 0) - atomic.StoreUint32(&r.read1, 0) - atomic.StoreUint32(&r.read2, 0) - - for i := range r.store { - r.store[i].c1.L.Lock() - r.store[i].cmd = nil - r.store[i].mark = 0 - r.store[i].slept = false - r.store[i].c1.L.Unlock() - } -} - -// wakeAll wakes up all waiting goroutines. -// This is used during shutdown to unblock the flusher. -func (r *autoPipelineRing) wakeAll() { - for i := range r.store { - r.store[i].c1.L.Lock() - if r.store[i].mark == 0 { - r.store[i].mark = apSlotClosed - } - r.store[i].c1.L.Unlock() - r.store[i].c2.Broadcast() - } -} - diff --git a/pipeline_buffer_test.go b/pipeline_buffer_test.go index 27ade784..178c9314 100644 --- a/pipeline_buffer_test.go +++ b/pipeline_buffer_test.go @@ -78,8 +78,6 @@ func TestPipelineBufferSizesWithAutoPipeline(t *testing.T) { AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 10, MaxConcurrentBatches: 2, - UseRingBuffer: true, - RingBufferSize: 64, }, }) defer client.Close()