diff --git a/autopipeline.go b/autopipeline.go index c4489026..5b203607 100644 --- a/autopipeline.go +++ b/autopipeline.go @@ -5,6 +5,8 @@ import ( "sync" "sync/atomic" "time" + + "github.com/redis/go-redis/v9/internal" ) // AutoPipelineConfig configures the autopipelining behavior. @@ -13,30 +15,11 @@ type AutoPipelineConfig struct { // Default: 100 MaxBatchSize int - // FlushInterval is the maximum time to wait before flushing pending commands. - // Default: 10ms - FlushInterval time.Duration - // MaxConcurrentBatches is the maximum number of concurrent pipeline executions. // This prevents overwhelming the server with too many concurrent pipelines. // Default: 10 MaxConcurrentBatches int - // UseRingBuffer enables the high-performance ring buffer queue. - // When enabled, uses a pre-allocated ring buffer with lock-free enqueue - // instead of the slice-based queue. This provides: - // - 6x faster enqueue operations - // - 100% reduction in allocations during enqueue - // - Better performance under high concurrency - // Default: true (enabled) - UseRingBuffer bool - - // RingBufferSize is the size of the ring buffer queue. - // Only used when UseRingBuffer is true. - // Must be a power of 2 for optimal performance (will be rounded up if not). - // Default: 1024 - RingBufferSize int - // MaxFlushDelay is the maximum delay after flushing before checking for more commands. // A small delay (e.g., 100μs) can significantly reduce CPU usage by allowing // more commands to batch together, at the cost of slightly higher latency. @@ -56,19 +39,16 @@ type AutoPipelineConfig struct { func DefaultAutoPipelineConfig() *AutoPipelineConfig { return &AutoPipelineConfig{ MaxBatchSize: 50, - FlushInterval: time.Millisecond, MaxConcurrentBatches: 10, - UseRingBuffer: true, // Enable ring buffer by default - RingBufferSize: 1024, MaxFlushDelay: 0, // No delay by default (lowest latency) } } -// pipelinerClient is an interface for clients that support pipelining. +// cmdableClient is an interface for clients that support pipelining. // Both Client and ClusterClient implement this interface. 
-type pipelinerClient interface { +type cmdableClient interface { + Cmdable Process(ctx context.Context, cmd Cmder) error - Pipeline() Pipeliner } // queuedCmd wraps a command with a done channel for completion notification @@ -77,6 +57,91 @@ type queuedCmd struct { done chan struct{} } +// doneChanPool is a sync.Pool for done channels to reduce allocations +// We use buffered channels so we can signal completion without blocking +var doneChanPool = sync.Pool{ + New: func() interface{} { + return make(chan struct{}, 1) + }, +} + +// getDoneChan gets a done channel from the pool +func getDoneChan() chan struct{} { + ch := doneChanPool.Get().(chan struct{}) + // Make sure the channel is empty + select { + case <-ch: + default: + } + return ch +} + +// putDoneChan returns a done channel to the pool after draining it +func putDoneChan(ch chan struct{}) { + // Drain the channel completely + for { + select { + case <-ch: + default: + doneChanPool.Put(ch) + return + } + } +} + +// queuedCmdPool is a sync.Pool for queuedCmd to reduce allocations +var queuedCmdPool = sync.Pool{ + New: func() interface{} { + return &queuedCmd{} + }, +} + +// getQueuedCmd gets a queuedCmd from the pool and initializes it +func getQueuedCmd(cmd Cmder) *queuedCmd { + qc := queuedCmdPool.Get().(*queuedCmd) + qc.cmd = cmd + qc.done = getDoneChan() + return qc +} + +// putQueuedCmd returns a queuedCmd to the pool after clearing it +func putQueuedCmd(qc *queuedCmd) { + qc.cmd = nil + if qc.done != nil { + putDoneChan(qc.done) + qc.done = nil + } + queuedCmdPool.Put(qc) +} + +// queueSlicePool is a sync.Pool for queue slices to reduce allocations +var queueSlicePool = sync.Pool{ + New: func() interface{} { + // Create a slice with capacity for typical batch size + return make([]*queuedCmd, 0, 100) + }, +} + +// getQueueSlice gets a queue slice from the pool +func getQueueSlice(capacity int) []*queuedCmd { + slice := queueSlicePool.Get().([]*queuedCmd) + // Clear the slice but keep capacity + slice = slice[:0] + // If the capacity is too small, allocate a new one + if cap(slice) < capacity { + return make([]*queuedCmd, 0, capacity) + } + return slice +} + +// putQueueSlice returns a queue slice to the pool +func putQueueSlice(slice []*queuedCmd) { + // Only pool slices that aren't too large (avoid memory bloat) + if cap(slice) <= 1000 { + queueSlicePool.Put(slice) + } +} + // autoPipelineCmd wraps a command and blocks on result access until execution completes. type autoPipelineCmd struct { Cmder @@ -100,7 +165,6 @@ func (c *autoPipelineCmd) String() string { // 1. Collecting commands from multiple goroutines into a shared queue // 2. Automatically flushing the queue when: // - The batch size reaches MaxBatchSize -// - The flush interval (FlushInterval) expires // // 3. 
Executing batched commands using Redis pipelining // @@ -116,56 +180,59 @@ func (c *autoPipelineCmd) String() string { type AutoPipeliner struct { cmdable // Embed cmdable to get all Redis command methods - pipeliner pipelinerClient + pipeliner cmdableClient config *AutoPipelineConfig // Command queue - either slice-based or ring buffer mu sync.Mutex - queue []*queuedCmd // Slice-based queue (legacy) - ring *autoPipelineRing // Ring buffer queue (high-performance) - queueLen atomic.Int32 // Fast path check without lock + queue []*queuedCmd // Slice-based queue (legacy) + queueLen atomic.Int32 // Fast path check without lock // Flush control flushCh chan struct{} // Signal to flush immediately // Concurrency control - sem chan struct{} // Semaphore for concurrent batch limit + sem *internal.FastSemaphore // Semaphore for concurrent batch limit // Lifecycle - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - closed atomic.Bool - cachedFlushInterval atomic.Int64 + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + closed atomic.Bool } // NewAutoPipeliner creates a new autopipeliner for the given client. // The client can be either *Client or *ClusterClient. -func NewAutoPipeliner(pipeliner pipelinerClient, config *AutoPipelineConfig) *AutoPipeliner { +func NewAutoPipeliner(pipeliner cmdableClient, config *AutoPipelineConfig) *AutoPipeliner { if config == nil { config = DefaultAutoPipelineConfig() } + // Apply defaults for zero values + if config.MaxBatchSize <= 0 { + config.MaxBatchSize = 50 + } + + if config.MaxConcurrentBatches <= 0 { + config.MaxConcurrentBatches = 10 + } + ctx, cancel := context.WithCancel(context.Background()) ap := &AutoPipeliner{ pipeliner: pipeliner, config: config, flushCh: make(chan struct{}, 1), - sem: make(chan struct{}, config.MaxConcurrentBatches), + sem: internal.NewFastSemaphore(int32(config.MaxConcurrentBatches)), ctx: ctx, cancel: cancel, } - // Initialize cmdable to route all commands through Process - ap.cmdable = ap.Process + // Initialize cmdable to route all commands through processAndBlock + ap.cmdable = ap.processAndBlock // Initialize queue based on configuration - if config.UseRingBuffer { - ap.ring = newAutoPipelineRing(config.RingBufferSize) - } else { - ap.queue = make([]*queuedCmd, 0, config.MaxBatchSize) - } + ap.queue = getQueueSlice(config.MaxBatchSize) // Start background flusher ap.wg.Add(1) @@ -223,28 +290,41 @@ func (ap *AutoPipeliner) Process(ctx context.Context, cmd Cmder) error { return nil } +// processAndBlock is used by the cmdable interface. +// It queues the command and blocks until execution completes. +// This allows typed methods like Get(), Set(), etc. to work correctly with autopipelining. +func (ap *AutoPipeliner) processAndBlock(ctx context.Context, cmd Cmder) error { + // Check if this is a blocking command (has read timeout set) + // Blocking commands like BLPOP, BRPOP, BZMPOP should not be autopipelined + if cmd.readTimeout() != nil { + // Execute blocking commands directly without autopipelining + return ap.pipeliner.Process(ctx, cmd) + } + + done := ap.process(ctx, cmd) + + // Block until the command is executed + <-done + + return cmd.Err() +} + +// closedChan is a reusable closed channel for error cases +var closedChan = func() chan struct{} { + ch := make(chan struct{}) + close(ch) + return ch +}() + // process is the internal method that queues a command and returns its done channel. 
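// The returned done channel is buffered (capacity 1) and pooled: completion is
// signaled by sending a value rather than closing the channel, which lets
// putDoneChan drain it and return it to doneChanPool for reuse.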
func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{} { if ap.closed.Load() { cmd.SetErr(ErrClosed) - closedCh := make(chan struct{}) - close(closedCh) - return closedCh + return closedChan } - // Use ring buffer if enabled - if ap.config.UseRingBuffer { - done := ap.ring.putOne(cmd) - // putOne will signal the flusher via condition variable if needed - return done - } - - // Legacy slice-based queue - // Create queued command with done channel - qc := &queuedCmd{ - cmd: cmd, - done: make(chan struct{}), - } + // Get queued command from pool + qc := getQueuedCmd(cmd) // Fast path: try to acquire lock without blocking if ap.mu.TryLock() { @@ -268,12 +348,11 @@ func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{} ap.queueLen.Store(int32(queueLen)) ap.mu.Unlock() - // Always signal the flusher (non-blocking) + // always signal the flusher (non-blocking) select { case ap.flushCh <- struct{}{}: default: } - return qc.done } @@ -308,11 +387,6 @@ func (ap *AutoPipeliner) Close() error { // Cancel context to stop flusher ap.cancel() - // Wake up flusher if it's waiting - if ap.config.UseRingBuffer { - ap.ring.wakeAll() - } - // Wait for flusher to finish ap.wg.Wait() @@ -322,106 +396,8 @@ func (ap *AutoPipeliner) Close() error { // flusher is the background goroutine that flushes batches. func (ap *AutoPipeliner) flusher() { defer ap.wg.Done() - - if !ap.config.UseRingBuffer { - // Legacy slice-based flusher - ap.flusherSlice() - return - } - - // Ring buffer flusher - var ( - cmds = make([]Cmder, 0, ap.config.MaxBatchSize) - doneChans = make([]chan struct{}, 0, ap.config.MaxBatchSize) - ) - - for { - // Try to get next command (non-blocking) - cmd, done := ap.ring.nextWriteCmd() - - if cmd == nil { - // No command available - // If we have buffered commands, execute them first - if len(cmds) > 0 { - ap.executeBatch(cmds, doneChans) - cmds = cmds[:0] - doneChans = doneChans[:0] - } - - // Check for shutdown before blocking - select { - case <-ap.ctx.Done(): - return - default: - } - - // Wait for next command (blocking) - // This will be woken up by wakeAll() during shutdown - cmd, done = ap.ring.waitForWrite() - - // If nil, ring is closed - if cmd == nil { - return - } - } - - // Add command to batch - cmds = append(cmds, cmd) - doneChans = append(doneChans, done) - - // Execute batch if full - if len(cmds) >= ap.config.MaxBatchSize { - ap.executeBatch(cmds, doneChans) - cmds = cmds[:0] - doneChans = doneChans[:0] - } - } -} - -// executeBatch executes a batch of commands. 
-func (ap *AutoPipeliner) executeBatch(cmds []Cmder, doneChans []chan struct{}) { - if len(cmds) == 0 { - return - } - - // Acquire semaphore (limit concurrent batches) - select { - case ap.sem <- struct{}{}: - defer func() { - <-ap.sem - }() - case <-ap.ctx.Done(): - // Context cancelled, set error on all commands and notify - for i, cmd := range cmds { - cmd.SetErr(ErrClosed) - doneChans[i] <- struct{}{} // Send signal instead of close - ap.ring.finishCmd() - } - return - } - - // Fast path for single command - if len(cmds) == 1 { - _ = ap.pipeliner.Process(context.Background(), cmds[0]) - doneChans[0] <- struct{}{} // Send signal instead of close - ap.ring.finishCmd() - return - } - - // Execute pipeline for multiple commands - pipe := ap.pipeliner.Pipeline() - for _, cmd := range cmds { - _ = pipe.Process(context.Background(), cmd) - } - - // Execute and wait for completion - _, _ = pipe.Exec(context.Background()) - - // Notify completion and finish slots - for _, done := range doneChans { - done <- struct{}{} // Send signal instead of close - ap.ring.finishCmd() - } + ap.flusherSlice() + return } // flusherSlice is the legacy slice-based flusher. @@ -441,6 +417,9 @@ func (ap *AutoPipeliner) flusherSlice() { for { select { case <-ap.flushCh: + if ap.Len() >= ap.config.MaxBatchSize { + goto drained + } default: goto drained } @@ -465,8 +444,6 @@ func (ap *AutoPipeliner) flusherSlice() { } } - - // flushBatchSlice flushes commands from the slice-based queue (legacy). func (ap *AutoPipeliner) flushBatchSlice() { // Get commands from queue @@ -479,33 +456,43 @@ func (ap *AutoPipeliner) flushBatchSlice() { // Take ownership of current queue queuedCmds := ap.queue - ap.queue = make([]*queuedCmd, 0, ap.config.MaxBatchSize) + ap.queue = getQueueSlice(ap.config.MaxBatchSize) ap.queueLen.Store(0) ap.mu.Unlock() // Acquire semaphore (limit concurrent batches) - select { - case ap.sem <- struct{}{}: - defer func() { - <-ap.sem - }() - case <-ap.ctx.Done(): - // Context cancelled, set error on all commands and notify - for _, qc := range queuedCmds { - qc.cmd.SetErr(ErrClosed) - close(qc.done) + // Try fast path first + if !ap.sem.TryAcquire() { + // Fast path failed, need to wait + // essentially, this is a blocking call + err := ap.sem.Acquire(ap.ctx, 5*time.Second, context.DeadlineExceeded) + if err != nil { + // Context cancelled, set error on all commands and notify + for _, qc := range queuedCmds { + qc.cmd.SetErr(ErrClosed) + // Signal completion by sending to buffered channel + qc.done <- struct{}{} + putQueuedCmd(qc) + } + putQueueSlice(queuedCmds) + return } - return } if len(queuedCmds) == 0 { + ap.sem.Release() return } // Fast path for single command if len(queuedCmds) == 1 { - _ = ap.pipeliner.Process(context.Background(), queuedCmds[0].cmd) - close(queuedCmds[0].done) + qc := queuedCmds[0] + _ = ap.pipeliner.Process(context.Background(), qc.cmd) + // Signal completion by sending to buffered channel + qc.done <- struct{}{} + ap.sem.Release() + putQueuedCmd(qc) + putQueueSlice(queuedCmds) return } @@ -521,15 +508,16 @@ func (ap *AutoPipeliner) flushBatchSlice() { // IMPORTANT: Only notify after pipeline execution is complete // This ensures command results are fully populated before waiters proceed for _, qc := range queuedCmds { - close(qc.done) + // Signal completion by sending to buffered channel + qc.done <- struct{}{} + putQueuedCmd(qc) } + ap.sem.Release() + putQueueSlice(queuedCmds) } // Len returns the current number of queued commands. 
func (ap *AutoPipeliner) Len() int { - if ap.config.UseRingBuffer { - return ap.ring.len() - } return int(ap.queueLen.Load()) } @@ -559,17 +547,7 @@ func (ap *AutoPipeliner) Pipelined(ctx context.Context, fn func(Pipeliner) error // Note: This uses the underlying client's TxPipeline if available (Client, Ring, ClusterClient). // For other clients, this will panic. func (ap *AutoPipeliner) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { - // Try to get TxPipeline from the underlying client - // This works for Client, Ring, and ClusterClient - type txPipeliner interface { - TxPipeline() Pipeliner - } - - if txp, ok := ap.pipeliner.(txPipeliner); ok { - return txp.TxPipeline().Pipelined(ctx, fn) - } - - panic("redis: TxPipelined not supported by this client type") + return ap.pipeliner.TxPipeline().Pipelined(ctx, fn) } // TxPipeline returns a new transaction pipeline that uses the underlying pipeliner. @@ -578,15 +556,8 @@ func (ap *AutoPipeliner) TxPipelined(ctx context.Context, fn func(Pipeliner) err // Note: This uses the underlying client's TxPipeline if available (Client, Ring, ClusterClient). // For other clients, this will panic. func (ap *AutoPipeliner) TxPipeline() Pipeliner { - // Try to get TxPipeline from the underlying client - // This works for Client, Ring, and ClusterClient - type txPipeliner interface { - TxPipeline() Pipeliner - } - - if txp, ok := ap.pipeliner.(txPipeliner); ok { - return txp.TxPipeline() - } - - panic("redis: TxPipeline not supported by this client type") + return ap.pipeliner.TxPipeline() } + +// validate AutoPipeliner implements Cmdable +var _ Cmdable = (*AutoPipeliner)(nil) diff --git a/autopipeline_bench_test.go b/autopipeline_bench_test.go index a3909659..ad92be85 100644 --- a/autopipeline_bench_test.go +++ b/autopipeline_bench_test.go @@ -69,7 +69,7 @@ func BenchmarkAutoPipeline(b *testing.B) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 100, - FlushInterval: 10 * time.Millisecond, + MaxFlushDelay: 10 * time.Millisecond, MaxConcurrentBatches: 10, }, }) @@ -155,7 +155,7 @@ func BenchmarkConcurrentAutoPipeline(b *testing.B) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 100, - FlushInterval: 10 * time.Millisecond, + MaxFlushDelay: 10 * time.Millisecond, MaxConcurrentBatches: 10, }, }) @@ -201,7 +201,7 @@ func BenchmarkAutoPipelineBatchSizes(b *testing.B) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: batchSize, - FlushInterval: 10 * time.Millisecond, + MaxFlushDelay: 10 * time.Millisecond, MaxConcurrentBatches: 10, }, }) @@ -222,23 +222,23 @@ func BenchmarkAutoPipelineBatchSizes(b *testing.B) { } } -// BenchmarkAutoPipelineFlushIntervals tests different flush intervals -func BenchmarkAutoPipelineFlushIntervals(b *testing.B) { - intervals := []time.Duration{ +// BenchmarkAutoPipelineMaxFlushDelays tests different max flush delays +func BenchmarkAutoPipelineMaxFlushDelays(b *testing.B) { + delays := []time.Duration{ 1 * time.Millisecond, 5 * time.Millisecond, 10 * time.Millisecond, 50 * time.Millisecond, } - for _, interval := range intervals { - b.Run(fmt.Sprintf("interval=%s", interval), func(b *testing.B) { + for _, delay := range delays { + b.Run(fmt.Sprintf("delay=%s", delay), func(b *testing.B) { ctx := context.Background() client := redis.NewClient(&redis.Options{ Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 100, - FlushInterval: interval, + MaxFlushDelay: delay, MaxConcurrentBatches: 10, }, }) 
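The reworked flushBatchSlice above limits concurrent batches with internal.FastSemaphore, whose implementation is not part of this diff. The sketch below is a minimal, assumed stand-in built on a buffered channel, covering only the methods the call sites use (NewFastSemaphore, TryAcquire, Acquire with a timeout and a caller-supplied timeout error, Release); it illustrates the acquire pattern, not the library's actual implementation.

```go
package internal

import (
	"context"
	"time"
)

// FastSemaphore (sketch): a counting semaphore backed by a buffered channel.
// API assumed from the autopipeline call sites in this diff.
type FastSemaphore struct {
	tokens chan struct{}
}

func NewFastSemaphore(n int32) *FastSemaphore {
	return &FastSemaphore{tokens: make(chan struct{}, n)}
}

// TryAcquire grabs a slot without blocking; returns false if none is free.
func (s *FastSemaphore) TryAcquire() bool {
	select {
	case s.tokens <- struct{}{}:
		return true
	default:
		return false
	}
}

// Acquire blocks until a slot is free, the context is cancelled,
// or the timeout elapses (in which case timeoutErr is returned).
func (s *FastSemaphore) Acquire(ctx context.Context, timeout time.Duration, timeoutErr error) error {
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case s.tokens <- struct{}{}:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-t.C:
		return timeoutErr
	}
}

// Release frees a previously acquired slot.
func (s *FastSemaphore) Release() {
	<-s.tokens
}
```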
@@ -349,10 +349,8 @@ func BenchmarkRingBufferVsSliceQueue(b *testing.B) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 50, - FlushInterval: time.Millisecond, + MaxFlushDelay: time.Millisecond, MaxConcurrentBatches: 10, - UseRingBuffer: true, - RingBufferSize: 1024, }, }) defer client.Close() @@ -378,9 +376,8 @@ func BenchmarkRingBufferVsSliceQueue(b *testing.B) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 50, - FlushInterval: time.Millisecond, + MaxFlushDelay: time.Millisecond, MaxConcurrentBatches: 10, - UseRingBuffer: false, // Use legacy slice queue }, }) defer client.Close() @@ -417,11 +414,8 @@ func BenchmarkMaxFlushDelay(b *testing.B) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 50, - FlushInterval: time.Millisecond, - MaxConcurrentBatches: 10, - UseRingBuffer: true, - RingBufferSize: 1024, MaxFlushDelay: delay, + MaxConcurrentBatches: 10, }, }) defer client.Close() @@ -462,10 +456,8 @@ func BenchmarkBufferSizes(b *testing.B) { WriteBufferSize: size, AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 50, - FlushInterval: time.Millisecond, + MaxFlushDelay: time.Millisecond, MaxConcurrentBatches: 10, - UseRingBuffer: true, - RingBufferSize: 1024, }, }) defer client.Close() @@ -487,27 +479,25 @@ func BenchmarkBufferSizes(b *testing.B) { } } -// BenchmarkRingBufferSizes benchmarks different ring buffer sizes -func BenchmarkRingBufferSizes(b *testing.B) { - ringSizes := []int{ - 256, - 512, - 1024, // default - 2048, - 4096, +// BenchmarkAutoPipelineMaxBatchSizes benchmarks different max batch sizes +func BenchmarkAutoPipelineMaxBatchSizes(b *testing.B) { + batchSizes := []int{ + 10, + 50, // default + 100, + 200, + 500, } - for _, size := range ringSizes { - b.Run(fmt.Sprintf("ring_%d", size), func(b *testing.B) { + for _, size := range batchSizes { + b.Run(fmt.Sprintf("batch_%d", size), func(b *testing.B) { ctx := context.Background() client := redis.NewClient(&redis.Options{ Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ - MaxBatchSize: 50, - FlushInterval: time.Millisecond, + MaxBatchSize: size, + MaxFlushDelay: time.Millisecond, MaxConcurrentBatches: 10, - UseRingBuffer: true, - RingBufferSize: size, }, }) defer client.Close() diff --git a/autopipeline_ring.go b/autopipeline_ring.go deleted file mode 100644 index 0a99036a..00000000 --- a/autopipeline_ring.go +++ /dev/null @@ -1,236 +0,0 @@ -package redis - -import ( - "math/bits" - "sync" - "sync/atomic" -) - -// autoPipelineRing is a pre-allocated ring buffer queue for autopipelining. -// It provides lock-free enqueue and FIFO ordering guarantees. 
-// -// Ring buffer architecture: -// - Pre-allocated slots (no allocations during enqueue) -// - Per-slot channels for request-response matching -// - Atomic write pointer for lock-free enqueue -// - Separate read pointers for write and read goroutines -// -// The ring buffer uses three pointers: -// - write: Where app goroutines add commands (atomic increment) -// - read1: Where flush goroutine reads commands to send -// - read2: Where result goroutine matches responses (currently unused, for future optimization) -type autoPipelineRing struct { - store []autoPipelineSlot // Pre-allocated slots - mask uint32 // Size - 1 (for fast modulo via bitwise AND) - write uint32 // Write position (atomic, incremented by app goroutines) - read1 uint32 // Read position for flush goroutine - read2 uint32 // Read position for result matching (reserved for future use) - cmds []Cmder // Persistent buffer for collecting commands (reused, no allocations) - doneChans []chan struct{} // Persistent buffer for collecting done channels (reused, no allocations) -} - -// autoPipelineSlot represents a single command slot in the ring buffer. -type autoPipelineSlot struct { - c1 *sync.Cond // Condition variable for write synchronization (shared mutex with c2) - c2 *sync.Cond // Condition variable for wait/signal (shared mutex with c1) - cmd Cmder // The command to execute - done chan struct{} // Completion notification channel (pre-allocated, reused) - mark uint32 // State: 0=empty, 1=queued, 2=sent (atomic) - slept bool // Whether writer goroutine is sleeping on this slot -} - -// State constants for autoPipelineSlot.mark -const ( - apSlotEmpty uint32 = 0 // Slot is empty and available - apSlotQueued uint32 = 1 // Command queued, ready to be sent - apSlotSent uint32 = 2 // Command sent, waiting for response - apSlotClosed uint32 = 3 // Ring is closed, stop waiting -) - -// newAutoPipelineRing creates a new ring buffer with the specified size. -// Size will be rounded up to the next power of 2 for efficient modulo operations. -func newAutoPipelineRing(size int) *autoPipelineRing { - // Round up to power of 2 for fast modulo via bitwise AND - if size <= 0 { - size = 1024 // Default size - } - if size&(size-1) != 0 { - // Not a power of 2, round up - size = 1 << (32 - bits.LeadingZeros32(uint32(size))) - } - - r := &autoPipelineRing{ - store: make([]autoPipelineSlot, size), - mask: uint32(size - 1), - cmds: make([]Cmder, 0, size), // Persistent buffer, reused - doneChans: make([]chan struct{}, 0, size), // Persistent buffer, reused - } - - // Initialize each slot with condition variables and pre-allocated channel - for i := range r.store { - m := &sync.Mutex{} - r.store[i].c1 = sync.NewCond(m) - r.store[i].c2 = sync.NewCond(m) // Share the same mutex - r.store[i].done = make(chan struct{}, 1) // Buffered channel for signal (not close) - } - - return r -} - -// putOne enqueues a command into the ring buffer. -// Returns the done channel that will be signaled when the command completes. 
-// -// Ring buffer enqueue implementation: -// - Atomic increment for write position -// - Wait on condition variable if slot is full -// - Signal reader if it's sleeping -func (r *autoPipelineRing) putOne(cmd Cmder) <-chan struct{} { - // Atomic increment to get next slot - slot := &r.store[atomic.AddUint32(&r.write, 1)&r.mask] - - // Lock the slot - slot.c1.L.Lock() - - // Wait if slot is not empty (mark != 0) - for slot.mark != 0 { - slot.c1.Wait() - } - - // Store command and mark as queued - slot.cmd = cmd - slot.mark = 1 - s := slot.slept - - slot.c1.L.Unlock() - - // If reader is sleeping, wake it up - if s { - slot.c2.Broadcast() - } - - return slot.done -} - -// nextWriteCmd tries to get the next command (non-blocking). -// Returns nil if no command is available. -// Should only be called by the flush goroutine. -func (r *autoPipelineRing) nextWriteCmd() (Cmder, chan struct{}) { - r.read1++ - p := r.read1 & r.mask - slot := &r.store[p] - - slot.c1.L.Lock() - if slot.mark == 1 { - cmd := slot.cmd - done := slot.done - slot.mark = 2 - slot.c1.L.Unlock() - return cmd, done - } - // No command available, rollback read position - r.read1-- - slot.c1.L.Unlock() - return nil, nil -} - -// waitForWrite waits for the next command (blocking). -// Should only be called by the flush goroutine. -// Returns nil if the ring is closed. -func (r *autoPipelineRing) waitForWrite() (Cmder, chan struct{}) { - r.read1++ - p := r.read1 & r.mask - slot := &r.store[p] - - slot.c1.L.Lock() - // Wait until command is available (mark == 1) or closed (mark == 3) - for slot.mark != 1 && slot.mark != apSlotClosed { - slot.slept = true - slot.c2.Wait() // c1 and c2 share the same mutex - slot.slept = false - } - - // Check if closed - if slot.mark == apSlotClosed { - r.read1-- // Rollback read position - slot.c1.L.Unlock() - return nil, nil - } - - cmd := slot.cmd - done := slot.done - slot.mark = 2 - slot.c1.L.Unlock() - return cmd, done -} - -// finishCmd marks a command as completed and clears the slot. -// Should only be called by the flush goroutine. -func (r *autoPipelineRing) finishCmd() { - r.read2++ - p := r.read2 & r.mask - slot := &r.store[p] - - slot.c1.L.Lock() - if slot.mark == 2 { - // Drain the done channel before reusing - select { - case <-slot.done: - default: - } - - // Clear slot for reuse - slot.cmd = nil - slot.mark = 0 - } - slot.c1.L.Unlock() - slot.c1.Signal() // Wake up any writer waiting on this slot -} - -// len returns the approximate number of queued commands. -// This is an estimate and may not be exact due to concurrent access. -func (r *autoPipelineRing) len() int { - write := atomic.LoadUint32(&r.write) - read := atomic.LoadUint32(&r.read1) - - // Handle wrap-around - if write >= read { - return int(write - read) - } - // Wrapped around - return int(write + (^uint32(0) - read) + 1) -} - -// cap returns the capacity of the ring buffer. -func (r *autoPipelineRing) cap() int { - return len(r.store) -} - -// reset resets the ring buffer to empty state. -// This should only be called when no goroutines are accessing the ring. -func (r *autoPipelineRing) reset() { - atomic.StoreUint32(&r.write, 0) - atomic.StoreUint32(&r.read1, 0) - atomic.StoreUint32(&r.read2, 0) - - for i := range r.store { - r.store[i].c1.L.Lock() - r.store[i].cmd = nil - r.store[i].mark = 0 - r.store[i].slept = false - r.store[i].c1.L.Unlock() - } -} - -// wakeAll wakes up all waiting goroutines. -// This is used during shutdown to unblock the flusher. 
-func (r *autoPipelineRing) wakeAll() { - for i := range r.store { - r.store[i].c1.L.Lock() - if r.store[i].mark == 0 { - r.store[i].mark = apSlotClosed - } - r.store[i].c1.L.Unlock() - r.store[i].c2.Broadcast() - } -} - diff --git a/autopipeline_sequential_test.go b/autopipeline_sequential_test.go index 01fba668..d45a3a3b 100644 --- a/autopipeline_sequential_test.go +++ b/autopipeline_sequential_test.go @@ -16,7 +16,7 @@ func TestAutoPipelineSequential(t *testing.T) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 10, - FlushInterval: 50 * time.Millisecond, + MaxFlushDelay: 50 * time.Millisecond, MaxConcurrentBatches: 5, }, }) @@ -64,7 +64,7 @@ func TestAutoPipelineSequentialSmallBatches(t *testing.T) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 1000, // Large batch size - FlushInterval: 20 * time.Millisecond, // Rely on timer + MaxFlushDelay: 20 * time.Millisecond, // Rely on timer MaxConcurrentBatches: 5, }, }) @@ -111,7 +111,7 @@ func TestAutoPipelineSequentialMixed(t *testing.T) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 5, - FlushInterval: 50 * time.Millisecond, + MaxFlushDelay: 50 * time.Millisecond, MaxConcurrentBatches: 5, }, }) diff --git a/autopipeline_test.go b/autopipeline_test.go index cb411dd6..7f8af73f 100644 --- a/autopipeline_test.go +++ b/autopipeline_test.go @@ -24,7 +24,7 @@ var _ = Describe("AutoPipeline", func() { Addr: redisAddr, AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 10, - FlushInterval: 50 * time.Millisecond, + MaxFlushDelay: 50 * time.Millisecond, MaxConcurrentBatches: 5, }, }) @@ -235,7 +235,7 @@ var _ = Describe("AutoPipeline", func() { Addr: redisAddr, AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 5, - FlushInterval: 10 * time.Millisecond, + MaxFlushDelay: 10 * time.Millisecond, MaxConcurrentBatches: 2, }, }) @@ -296,7 +296,7 @@ func TestAutoPipelineBasic(t *testing.T) { Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 10, - FlushInterval: 50 * time.Millisecond, + MaxFlushDelay: 50 * time.Millisecond, MaxConcurrentBatches: 5, }, }) @@ -338,14 +338,14 @@ func TestAutoPipelineBasic(t *testing.T) { } } -func TestAutoPipelineFlushInterval(t *testing.T) { +func TestAutoPipelineMaxFlushDelay(t *testing.T) { ctx := context.Background() client := redis.NewClient(&redis.Options{ Addr: ":6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 1000, // Large batch size so only timer triggers flush - FlushInterval: 50 * time.Millisecond, + MaxFlushDelay: 50 * time.Millisecond, MaxConcurrentBatches: 5, }, }) diff --git a/autopipeline_typed_test.go b/autopipeline_typed_test.go new file mode 100644 index 00000000..da03d242 --- /dev/null +++ b/autopipeline_typed_test.go @@ -0,0 +1,126 @@ +package redis_test + +import ( + "context" + "testing" + + "github.com/redis/go-redis/v9" +) + +// TestAutoPipelineTypedCommands tests that typed commands like Get() and Set() +// work correctly with autopipelining and block until execution. 
+func TestAutoPipelineTypedCommands(t *testing.T) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + // Skip if Redis is not available + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Test Set and Get + setResult, setErr := ap.Set(ctx, "test_key", "test_value", 0).Result() + if setErr != nil { + t.Fatalf("Set failed: %v", setErr) + } + if setResult != "OK" { + t.Errorf("Expected 'OK', got '%s'", setResult) + } + + getResult, getErr := ap.Get(ctx, "test_key").Result() + if getErr != nil { + t.Fatalf("Get failed: %v", getErr) + } + if getResult != "test_value" { + t.Errorf("Expected 'test_value', got '%s'", getResult) + } + + // Clean up + client.Del(ctx, "test_key") +} + +// TestAutoPipelineTypedCommandsMultiple tests multiple typed commands +func TestAutoPipelineTypedCommandsMultiple(t *testing.T) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + // Skip if Redis is not available + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Queue multiple Set commands + ap.Set(ctx, "key1", "value1", 0) + ap.Set(ctx, "key2", "value2", 0) + ap.Set(ctx, "key3", "value3", 0) + + // Get the values - should block until execution + val1, err1 := ap.Get(ctx, "key1").Result() + if err1 != nil { + t.Fatalf("Get key1 failed: %v", err1) + } + if val1 != "value1" { + t.Errorf("Expected 'value1', got '%s'", val1) + } + + val2, err2 := ap.Get(ctx, "key2").Result() + if err2 != nil { + t.Fatalf("Get key2 failed: %v", err2) + } + if val2 != "value2" { + t.Errorf("Expected 'value2', got '%s'", val2) + } + + val3, err3 := ap.Get(ctx, "key3").Result() + if err3 != nil { + t.Fatalf("Get key3 failed: %v", err3) + } + if val3 != "value3" { + t.Errorf("Expected 'value3', got '%s'", val3) + } + + // Clean up + client.Del(ctx, "key1", "key2", "key3") +} + +// TestAutoPipelineTypedCommandsVal tests the Val() method +func TestAutoPipelineTypedCommandsVal(t *testing.T) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + // Skip if Redis is not available + if err := client.Ping(ctx).Err(); err != nil { + t.Skipf("Redis not available: %v", err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Set a value + ap.Set(ctx, "test_val_key", "test_val_value", 0) + + // Get using Val() - should block until execution + val := ap.Get(ctx, "test_val_key").Val() + if val != "test_val_value" { + t.Errorf("Expected 'test_val_value', got '%s'", val) + } + + // Clean up + client.Del(ctx, "test_val_key") +} + diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 57fbfe17..71e71a8d 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -18,43 +18,6 @@ import ( var noDeadline = time.Time{} -// Global time cache updated every 100ms by background goroutine. -// This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout. -// Max staleness: 100ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds). 
-var globalTimeCache struct { - nowNs atomic.Int64 -} - -func init() { - // Initialize immediately - globalTimeCache.nowNs.Store(time.Now().UnixNano()) - - // Start background updater - go func() { - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - - for range ticker.C { - globalTimeCache.nowNs.Store(time.Now().UnixNano()) - } - }() -} - -// getCachedTimeNs returns the current time in nanoseconds from the global cache. -// This is updated every 100ms by a background goroutine, avoiding expensive syscalls. -// Max staleness: 100ms. -func getCachedTimeNs() int64 { - return globalTimeCache.nowNs.Load() -} - -// GetCachedTimeNs returns the current time in nanoseconds from the global cache. -// This is updated every 100ms by a background goroutine, avoiding expensive syscalls. -// Max staleness: 100ms. -// Exported for use by other packages that need fast time access. -func GetCachedTimeNs() int64 { - return getCachedTimeNs() -} - // Global time cache updated every 50ms by background goroutine. // This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout. // Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds). diff --git a/internal/proto/writer_bench_test.go b/internal/proto/writer_bench_test.go new file mode 100644 index 00000000..cb099d47 --- /dev/null +++ b/internal/proto/writer_bench_test.go @@ -0,0 +1,106 @@ +package proto + +import ( + "bytes" + "testing" +) + +// BenchmarkWriteArgs benchmarks writing command arguments +func BenchmarkWriteArgs(b *testing.B) { + testCases := []struct { + name string + args []interface{} + }{ + { + name: "simple_get", + args: []interface{}{"GET", "key"}, + }, + { + name: "simple_set", + args: []interface{}{"SET", "key", "value"}, + }, + { + name: "set_with_expiry", + args: []interface{}{"SET", "key", "value", "EX", 3600}, + }, + { + name: "zadd", + args: []interface{}{"ZADD", "myset", 1.0, "member1", 2.0, "member2", 3.0, "member3"}, + }, + { + name: "large_command", + args: make([]interface{}, 100), + }, + } + + // Initialize large command + for i := range testCases[len(testCases)-1].args { + testCases[len(testCases)-1].args[i] = "arg" + } + + for _, tc := range testCases { + b.Run(tc.name, func(b *testing.B) { + buf := &bytes.Buffer{} + wr := NewWriter(buf) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + buf.Reset() + if err := wr.WriteArgs(tc.args); err != nil { + b.Fatal(err) + } + } + }) + } +} + +// BenchmarkWriteArgsParallel benchmarks concurrent writes +func BenchmarkWriteArgsParallel(b *testing.B) { + args := []interface{}{"SET", "key", "value", "EX", 3600} + + b.ReportAllocs() + b.RunParallel(func(pb *testing.PB) { + buf := &bytes.Buffer{} + wr := NewWriter(buf) + + for pb.Next() { + buf.Reset() + if err := wr.WriteArgs(args); err != nil { + b.Fatal(err) + } + } + }) +} + +// BenchmarkWriteCmds benchmarks writing multiple commands (pipeline) +func BenchmarkWriteCmds(b *testing.B) { + sizes := []int{1, 10, 50, 100, 500} + + for _, size := range sizes { + b.Run(string(rune('0'+size/10)), func(b *testing.B) { + // Create commands + cmds := make([][]interface{}, size) + for i := range cmds { + cmds[i] = []interface{}{"SET", "key", i} + } + + buf := &bytes.Buffer{} + wr := NewWriter(buf) + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + buf.Reset() + for _, args := range cmds { + if err := wr.WriteArgs(args); err != nil { + b.Fatal(err) + } + } + } + }) + } +} + diff --git a/options.go b/options.go index 
ea5f4fa5..7b8a5493 100644 --- a/options.go +++ b/options.go @@ -281,6 +281,9 @@ type Options struct { // to reduce network round-trips and improve throughput. // If nil, autopipelining is disabled. AutoPipelineConfig *AutoPipelineConfig + + // AutoPipelineEnabled enables automatic pipelining of commands. + AutoPipelineEnabled bool } func (opt *Options) init() { diff --git a/osscluster.go b/osscluster.go index 8b288bf1..fe0fd9f1 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1020,6 +1020,7 @@ type ClusterClient struct { nodes *clusterNodes state *clusterStateHolder cmdsInfoCache *cmdsInfoCache + autopipeliner *AutoPipeliner cmdable hooksMixin } @@ -1051,6 +1052,14 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient { return c } +func (c *ClusterClient) WithAutoPipeline() AutoPipelinedClient { + if c.autopipeliner != nil && !c.autopipeliner.closed.Load() { + return c.autopipeliner + } + c.autopipeliner = c.AutoPipeline() + return c.autopipeliner +} + // Options returns read-only Options that were used to create the client. func (c *ClusterClient) Options() *ClusterOptions { return c.opt diff --git a/osscluster_autopipeline_test.go b/osscluster_autopipeline_test.go index a90267b4..a57d5a8d 100644 --- a/osscluster_autopipeline_test.go +++ b/osscluster_autopipeline_test.go @@ -17,7 +17,7 @@ func TestClusterAutoPipelineBasic(t *testing.T) { Addrs: []string{":7000", ":7001", ":7002"}, AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 10, - FlushInterval: 50 * time.Millisecond, + MaxFlushDelay: 50 * time.Millisecond, MaxConcurrentBatches: 5, }, }) @@ -66,7 +66,7 @@ func TestClusterAutoPipelineConcurrency(t *testing.T) { Addrs: []string{":7000", ":7001", ":7002"}, AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 50, - FlushInterval: 10 * time.Millisecond, + MaxFlushDelay: 10 * time.Millisecond, MaxConcurrentBatches: 10, }, }) @@ -121,7 +121,7 @@ func TestClusterAutoPipelineCrossSlot(t *testing.T) { Addrs: []string{":7000", ":7001", ":7002"}, AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 20, - FlushInterval: 10 * time.Millisecond, + MaxFlushDelay: 10 * time.Millisecond, MaxConcurrentBatches: 5, }, }) diff --git a/redis.go b/redis.go index ac97c2ca..e16eccd4 100644 --- a/redis.go +++ b/redis.go @@ -208,6 +208,12 @@ func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) e return hs.current.txPipeline(ctx, cmds) } +// ------------------------------------------------------------------------------ +type AutoPipelinedClient interface { + Cmdable + Close() error +} + //------------------------------------------------------------------------------ type baseClient struct { @@ -1007,6 +1013,7 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd type Client struct { *baseClient cmdable + autopipeliner *AutoPipeliner } // NewClient returns a client to the Redis Server specified by Options. @@ -1083,6 +1090,16 @@ func (c *Client) init() { }) } +func (c *Client) WithAutoPipeline() AutoPipelinedClient { + if c.autopipeliner != nil { + if !c.autopipeliner.closed.Load() { + return c.autopipeliner + } + } + c.autopipeliner = c.AutoPipeline() + return c.autopipeliner +} + func (c *Client) WithTimeout(timeout time.Duration) *Client { clone := *c clone.baseClient = c.baseClient.withTimeout(timeout) @@ -1165,6 +1182,37 @@ func (c *Client) Pipeline() Pipeliner { return &pipe } +// AutoPipeline creates a new autopipeliner that automatically batches commands. 
+// Commands are automatically flushed based on batch size and time interval. +// The autopipeliner must be closed when done to flush pending commands. +// +// Example: +// +// ap := client.AutoPipeline() +// defer ap.Close() +// +// var wg sync.WaitGroup +// for i := 0; i < 1000; i++ { +// wg.Add(1) +// go func(idx int) { +// defer wg.Done() +// ap.Do(ctx, "SET", fmt.Sprintf("key%d", idx), idx) +// }(i) +// } +// wg.Wait() +// +// Note: AutoPipeline requires AutoPipelineConfig to be set in Options. +// If not set, default configuration will be used. +func (c *Client) AutoPipeline() *AutoPipeliner { + if c.autopipeliner != nil { + return c.autopipeliner + } + if c.opt.AutoPipelineConfig == nil { + c.opt.AutoPipelineConfig = DefaultAutoPipelineConfig() + } + return NewAutoPipeliner(c, c.opt.AutoPipelineConfig) +} + func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { return c.TxPipeline().Pipelined(ctx, fn) }
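For reference, a minimal usage sketch of the API introduced in this diff: typed commands issued on the autopipeliner are queued, batched with other in-flight commands, and block until their pipeline executes (via processAndBlock). The address and key names below are illustrative and assume a local Redis server.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	ctx := context.Background()
	client := redis.NewClient(&redis.Options{Addr: ":6379"}) // assumes a local Redis
	defer client.Close()

	// WithAutoPipeline returns a cached AutoPipelinedClient (Cmdable + Close).
	ap := client.WithAutoPipeline()
	defer ap.Close()

	// Typed commands are batched transparently and block until executed.
	if err := ap.Set(ctx, "greeting", "hello", 0).Err(); err != nil {
		panic(err)
	}
	val, err := ap.Get(ctx, "greeting").Result()
	if err != nil {
		panic(err)
	}
	fmt.Println(val) // hello
}
```

The typed helpers block because cmdable is wired to processAndBlock, while blocking commands (those with a read timeout set, e.g. BLPOP) bypass the queue and are executed directly against the underlying client.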