diff --git a/adaptive_delay_test.go b/adaptive_delay_test.go
new file mode 100644
index 00000000..d07cf41e
--- /dev/null
+++ b/adaptive_delay_test.go
@@ -0,0 +1,293 @@
+package redis
+
+import (
+	"testing"
+	"time"
+)
+
+// TestAdaptiveDelayCalculation tests the adaptive delay calculation logic
+func TestAdaptiveDelayCalculation(t *testing.T) {
+	tests := []struct {
+		name         string
+		maxBatchSize int
+		maxDelay     time.Duration
+		adaptive     bool
+		queueLen     int
+		expected     time.Duration
+	}{
+		// Disabled adaptive delay
+		{
+			name:         "adaptive disabled, returns fixed delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     false,
+			queueLen:     50,
+			expected:     100 * time.Microsecond,
+		},
+		{
+			name:         "adaptive disabled, zero delay",
+			maxBatchSize: 100,
+			maxDelay:     0,
+			adaptive:     false,
+			queueLen:     50,
+			expected:     0,
+		},
+
+		// Enabled adaptive delay - 75% threshold
+		{
+			name:         "75% full - no delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     75,
+			expected:     0,
+		},
+		{
+			name:         "76% full - no delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     76,
+			expected:     0,
+		},
+		{
+			name:         "100% full - no delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     100,
+			expected:     0,
+		},
+
+		// Enabled adaptive delay - 50% threshold
+		{
+			name:         "50% full - 25% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     50,
+			expected:     25 * time.Microsecond,
+		},
+		{
+			name:         "60% full - 25% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     60,
+			expected:     25 * time.Microsecond,
+		},
+		{
+			name:         "74% full - 25% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     74,
+			expected:     25 * time.Microsecond,
+		},
+
+		// Enabled adaptive delay - 25% threshold
+		{
+			name:         "25% full - 50% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     25,
+			expected:     50 * time.Microsecond,
+		},
+		{
+			name:         "30% full - 50% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     30,
+			expected:     50 * time.Microsecond,
+		},
+		{
+			name:         "49% full - 50% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     49,
+			expected:     50 * time.Microsecond,
+		},
+
+		// Enabled adaptive delay - <25% threshold
+		{
+			name:         "24% full - 100% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     24,
+			expected:     100 * time.Microsecond,
+		},
+		{
+			name:         "10% full - 100% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     10,
+			expected:     100 * time.Microsecond,
+		},
+		{
+			name:         "1% full - 100% delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     1,
+			expected:     100 * time.Microsecond,
+		},
+
+		// Edge cases
+		{
+			name:         "empty queue - no delay",
+			maxBatchSize: 100,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     0,
+			expected:     0,
+		},
+		{
+			name:         "zero max delay - no delay",
+			maxBatchSize: 100,
+			maxDelay:     0,
+			adaptive:     true,
+			queueLen:     50,
+			expected:     0,
+		},
+
+		// Different batch sizes
+		{
+			name:         "small batch - 80% full (≥75% bucket)",
+			maxBatchSize: 10,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     8,
+			expected:     0,
+		},
+		{
+			name:         "small batch - 50% full",
+			maxBatchSize: 10,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     5,
+			expected:     25 * time.Microsecond,
+		},
+		{
+			name:         "large batch - 75% full",
+			maxBatchSize: 1000,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     750,
+			expected:     0,
+		},
+		{
+			name:         "large batch - 50% full",
+			maxBatchSize: 1000,
+			maxDelay:     100 * time.Microsecond,
+			adaptive:     true,
+			queueLen:     500,
+			expected:     25 * time.Microsecond,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			// Create autopipeliner with test config
+			ap := &AutoPipeliner{
+				config: &AutoPipelineConfig{
+					MaxBatchSize:  tt.maxBatchSize,
+					MaxFlushDelay: tt.maxDelay,
+					AdaptiveDelay: tt.adaptive,
+				},
+			}
+			ap.queueLen.Store(int32(tt.queueLen))
+
+			// Calculate delay
+			result := ap.calculateDelay()
+
+			// Verify result
+			if result != tt.expected {
+				t.Errorf("calculateDelay() = %v, want %v", result, tt.expected)
+			}
+		})
+	}
+}
+
+// TestAdaptiveDelayIntegerArithmetic verifies integer arithmetic correctness
+func TestAdaptiveDelayIntegerArithmetic(t *testing.T) {
+	maxBatch := 100
+	maxDelay := 100 * time.Microsecond
+
+	ap := &AutoPipeliner{
+		config: &AutoPipelineConfig{
+			MaxBatchSize:  maxBatch,
+			MaxFlushDelay: maxDelay,
+			AdaptiveDelay: true,
+		},
+	}
+
+	// Test all queue lengths from 0 to maxBatch
+	for queueLen := 0; queueLen <= maxBatch; queueLen++ {
+		ap.queueLen.Store(int32(queueLen))
+		delay := ap.calculateDelay()
+
+		// Verify delay is one of the expected values
+		switch {
+		case queueLen == 0:
+			if delay != 0 {
+				t.Errorf("queueLen=%d: expected 0, got %v", queueLen, delay)
+			}
+		case queueLen*4 >= maxBatch*3: // ≥75%
+			if delay != 0 {
+				t.Errorf("queueLen=%d (≥75%%): expected 0, got %v", queueLen, delay)
+			}
+		case queueLen*2 >= maxBatch: // ≥50%
+			if delay != maxDelay/4 {
+				t.Errorf("queueLen=%d (≥50%%): expected %v, got %v", queueLen, maxDelay/4, delay)
+			}
+		case queueLen*4 >= maxBatch: // ≥25%
+			if delay != maxDelay/2 {
+				t.Errorf("queueLen=%d (≥25%%): expected %v, got %v", queueLen, maxDelay/2, delay)
+			}
+		default: // <25%
+			if delay != maxDelay {
+				t.Errorf("queueLen=%d (<25%%): expected %v, got %v", queueLen, maxDelay, delay)
+			}
+		}
+	}
+}
+
+// BenchmarkCalculateDelay benchmarks the delay calculation
+func BenchmarkCalculateDelay(b *testing.B) {
+	ap := &AutoPipeliner{
+		config: &AutoPipelineConfig{
+			MaxBatchSize:  100,
+			MaxFlushDelay: 100 * time.Microsecond,
+			AdaptiveDelay: true,
+		},
+	}
+	ap.queueLen.Store(50)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_ = ap.calculateDelay()
+	}
+}
+
+// BenchmarkCalculateDelayDisabled benchmarks with adaptive delay disabled
+func BenchmarkCalculateDelayDisabled(b *testing.B) {
+	ap := &AutoPipeliner{
+		config: &AutoPipelineConfig{
+			MaxBatchSize:  100,
+			MaxFlushDelay: 100 * time.Microsecond,
+			AdaptiveDelay: false,
+		},
+	}
+	ap.queueLen.Store(50)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		_ = ap.calculateDelay()
+	}
+}
+
diff --git a/autopipeline.go b/autopipeline.go
index 1786f4f6..41e5bfc2 100644
--- a/autopipeline.go
+++ b/autopipeline.go
@@ -33,6 +33,18 @@ type AutoPipelineConfig struct {
 	// while adding only ~100μs average latency per command.
 	// Default: 0 (no delay)
 	MaxFlushDelay time.Duration
+
+	// AdaptiveDelay enables smart delay calculation based on queue fill level.
+	// When enabled, the delay is automatically adjusted:
+	// - Queue ≥75% full: No delay (flush immediately to prevent overflow)
+	// - Queue ≥50% full: 25% of MaxFlushDelay (queue filling up)
+	// - Queue ≥25% full: 50% of MaxFlushDelay (moderate load)
+	// - Queue <25% full: 100% of MaxFlushDelay (low load, maximize batching)
+	//
+	// This provides automatic adaptation to varying load patterns without
+	// manual tuning. Uses integer-only arithmetic for optimal performance.
+	// Default: false (use fixed MaxFlushDelay)
+	AdaptiveDelay bool
 }
 
 // DefaultAutoPipelineConfig returns the default autopipelining configuration.
@@ -179,10 +191,11 @@ type AutoPipeliner struct {
 	sem *internal.FastSemaphore // Semaphore for concurrent batch limit
 
 	// Lifecycle
-	ctx    context.Context
-	cancel context.CancelFunc
-	wg     sync.WaitGroup
-	closed atomic.Bool
+	ctx     context.Context
+	cancel  context.CancelFunc
+	wg      sync.WaitGroup // Tracks flusher goroutine
+	batchWg sync.WaitGroup // Tracks batch execution goroutines
+	closed  atomic.Bool
 }
 
 // NewAutoPipeliner creates a new autopipeliner for the given client.
@@ -350,6 +363,9 @@ func (ap *AutoPipeliner) Close() error {
 	// Wait for flusher to finish
 	ap.wg.Wait()
 
+	// Wait for all batch execution goroutines to finish
+	ap.batchWg.Wait()
+
 	return nil
 }
 
@@ -391,8 +407,12 @@ func (ap *AutoPipeliner) flusher() {
 
 		ap.flushBatchSlice()
 
-		if ap.config.MaxFlushDelay > 0 && ap.Len() > 0 {
-			time.Sleep(ap.config.MaxFlushDelay)
+		// Apply delay if configured and queue still has items
+		if ap.Len() > 0 {
+			delay := ap.calculateDelay()
+			if delay > 0 {
+				time.Sleep(delay)
+			}
 		}
 	}
 }
@@ -408,11 +428,30 @@ func (ap *AutoPipeliner) flushBatchSlice() {
 		return
 	}
 
-	// Take ownership of current queue
-	queuedCmds := ap.queue
-	ap.queue = getQueueSlice(ap.config.MaxBatchSize)
+	// Take up to MaxBatchSize commands from the queue
+	var queuedCmds []*queuedCmd
+	queueLen := len(ap.queue)
+	batchSize := ap.config.MaxBatchSize
+
+	if queueLen <= batchSize {
+		// Take all commands
+		queuedCmds = ap.queue
+		ap.queue = getQueueSlice(batchSize)
+		ap.queueLen.Store(0)
+	} else {
+		// Take only MaxBatchSize commands, leave the rest in the queue
+		// Allocate a new slice for the batch
+		queuedCmds = make([]*queuedCmd, batchSize)
+		copy(queuedCmds, ap.queue[:batchSize])
+
+		// Create a new queue with the remaining commands
+		remaining := queueLen - batchSize
+		newQueue := make([]*queuedCmd, remaining)
+		copy(newQueue, ap.queue[batchSize:])
+		ap.queue = newQueue
+		ap.queueLen.Store(int32(remaining))
+	}
 	ap.mu.Unlock()
-	ap.queueLen.Store(0)
 
 	// Acquire semaphore (limit concurrent batches)
 	// Try fast path first
@@ -448,7 +487,14 @@ func (ap *AutoPipeliner) flushBatchSlice() {
 		return
 	}
 
+	// Track this goroutine in the batchWg so Close() waits for it
+	// IMPORTANT: Add to WaitGroup AFTER semaphore is acquired to avoid deadlock
+	ap.batchWg.Add(1)
 	go func() {
+		defer ap.batchWg.Done()
+		defer ap.sem.Release()
+		defer putQueueSlice(queuedCmds)
+
 		// Use Pipeline directly instead of Pipelined to avoid closure overhead
 		pipe := ap.Pipeline()
 		// Process all commands in a pipeline
@@ -463,8 +509,6 @@ func (ap *AutoPipeliner) flushBatchSlice() {
 			// Signal completion by sending to buffered channel (non-blocking)
 			qc.done <- struct{}{}
 		}
-		ap.sem.Release()
-		putQueueSlice(queuedCmds)
 	}()
 }
 
@@ -473,6 +517,49 @@ func (ap *AutoPipeliner) Len() int {
 	return int(ap.queueLen.Load())
 }
 
+// calculateDelay calculates the delay based on current queue length.
+// Uses integer-only arithmetic for optimal performance (no float operations).
+// Returns 0 if MaxFlushDelay is 0.
+func (ap *AutoPipeliner) calculateDelay() time.Duration {
+	maxDelay := ap.config.MaxFlushDelay
+	if maxDelay == 0 {
+		return 0
+	}
+
+	// If adaptive delay is disabled, return fixed delay
+	if !ap.config.AdaptiveDelay {
+		return maxDelay
+	}
+
+	// Get current queue length
+	queueLen := ap.Len()
+	if queueLen == 0 {
+		return 0
+	}
+
+	maxBatch := ap.config.MaxBatchSize
+
+	// Use integer arithmetic to avoid float operations
+	// Calculate thresholds: 75%, 50%, 25% of maxBatch
+	// Multiply by 4 to avoid division: queueLen * 4 vs maxBatch * 3 (75%)
+	//
+	// Adaptive delay strategy:
+	// - ≥75% full: No delay (flush immediately to prevent overflow)
+	// - ≥50% full: 25% of max delay (queue filling up)
+	// - ≥25% full: 50% of max delay (moderate load)
+	// - <25% full: 100% of max delay (low load, maximize batching)
+	switch {
+	case queueLen*4 >= maxBatch*3: // queueLen >= 75% of maxBatch
+		return 0 // Flush immediately
+	case queueLen*2 >= maxBatch: // queueLen >= 50% of maxBatch
+		return maxDelay >> 2 // Divide by 4 using bit shift (faster)
+	case queueLen*4 >= maxBatch: // queueLen >= 25% of maxBatch
+		return maxDelay >> 1 // Divide by 2 using bit shift (faster)
+	default:
+		return maxDelay
+	}
+}
+
 // Pipeline returns a new pipeline that uses the underlying pipeliner.
 // This allows you to create a traditional pipeline from an autopipeliner.
 func (ap *AutoPipeliner) Pipeline() Pipeliner {
diff --git a/example_adaptive_delay_test.go b/example_adaptive_delay_test.go
new file mode 100644
index 00000000..2bc36c32
--- /dev/null
+++ b/example_adaptive_delay_test.go
@@ -0,0 +1,100 @@
+package redis_test
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/redis/go-redis/v9"
+)
+
+// ExampleAutoPipeliner_adaptiveDelay demonstrates using adaptive delay
+// to automatically adjust batching behavior based on load.
+func ExampleAutoPipeliner_adaptiveDelay() {
+	client := redis.NewClient(&redis.Options{
+		Addr: "localhost:6379",
+		AutoPipelineConfig: &redis.AutoPipelineConfig{
+			MaxBatchSize:         100,
+			MaxConcurrentBatches: 10,
+			MaxFlushDelay:        100 * time.Microsecond,
+			AdaptiveDelay:        true, // Enable adaptive delay
+		},
+	})
+	defer client.Close()
+
+	ctx := context.Background()
+
+	// The autopipeliner will automatically adjust delays:
+	// - When queue is ≥75% full: No delay (flush immediately)
+	// - When queue is ≥50% full: 25μs delay
+	// - When queue is ≥25% full: 50μs delay
+	// - When queue is <25% full: 100μs delay (maximize batching)
+
+	// Low load scenario - commands will batch with longer delays
+	for i := 0; i < 10; i++ {
+		_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
+		time.Sleep(10 * time.Millisecond) // Slow rate
+	}
+
+	// High load scenario - commands will flush immediately when queue fills
+	for i := 0; i < 200; i++ {
+		_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
+		// No sleep - high rate, queue fills up quickly
+	}
+
+	fmt.Println("Adaptive delay automatically adjusted to load patterns")
+	// Output: Adaptive delay automatically adjusted to load patterns
+}
+
+// ExampleAutoPipeliner_fixedDelay demonstrates using a fixed delay
+// for predictable batching behavior.
+func ExampleAutoPipeliner_fixedDelay() {
+	client := redis.NewClient(&redis.Options{
+		Addr: "localhost:6379",
+		AutoPipelineConfig: &redis.AutoPipelineConfig{
+			MaxBatchSize:         100,
+			MaxConcurrentBatches: 10,
+			MaxFlushDelay:        100 * time.Microsecond,
+			AdaptiveDelay:        false, // Use fixed delay
+		},
+	})
+	defer client.Close()
+
+	ctx := context.Background()
+
+	// With fixed delay, the autopipeliner always waits 100μs
+	// between flushes, regardless of queue fill level
+	for i := 0; i < 100; i++ {
+		_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
+	}
+
+	fmt.Println("Fixed delay provides predictable batching")
+	// Output: Fixed delay provides predictable batching
+}
+
+// ExampleAutoPipeliner_noDelay demonstrates zero-delay configuration
+// for lowest latency at the cost of higher CPU usage.
+func ExampleAutoPipeliner_noDelay() {
+	client := redis.NewClient(&redis.Options{
+		Addr: "localhost:6379",
+		AutoPipelineConfig: &redis.AutoPipelineConfig{
+			MaxBatchSize:         100,
+			MaxConcurrentBatches: 10,
+			MaxFlushDelay:        0, // No delay
+			AdaptiveDelay:        false,
+		},
+	})
+	defer client.Close()
+
+	ctx := context.Background()
+
+	// With zero delay, the autopipeliner flushes as fast as possible
+	// This provides lowest latency but higher CPU usage
+	for i := 0; i < 100; i++ {
+		_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
+	}
+
+	fmt.Println("Zero delay provides lowest latency")
+	// Output: Zero delay provides lowest latency
+}
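
For reviewers, here is a minimal standalone sketch (not part of the patch above) that mirrors calculateDelay's integer-only threshold arithmetic and prints the resulting delay at the boundary fill levels. The adaptiveDelay helper and its constants are illustrative assumptions, not API from this diff:

// adaptive_delay_sketch.go - standalone illustration; "adaptiveDelay" is a
// hypothetical free-function version of the patch's calculateDelay method.
package main

import (
	"fmt"
	"time"
)

// adaptiveDelay mirrors the threshold scheme from the patch: comparing
// queueLen*4 and queueLen*2 against multiples of maxBatch expresses the
// 75%/50%/25% cut-offs with integer math only, no floating-point division.
func adaptiveDelay(queueLen, maxBatch int, maxDelay time.Duration) time.Duration {
	switch {
	case queueLen == 0 || maxDelay == 0:
		return 0
	case queueLen*4 >= maxBatch*3: // >= 75% full: flush immediately
		return 0
	case queueLen*2 >= maxBatch: // >= 50% full: quarter of the max delay
		return maxDelay >> 2
	case queueLen*4 >= maxBatch: // >= 25% full: half of the max delay
		return maxDelay >> 1
	default: // < 25% full: full delay to maximize batching
		return maxDelay
	}
}

func main() {
	const maxBatch = 100
	const maxDelay = 100 * time.Microsecond

	// Walk the boundary values; the delay steps down as the queue fills:
	// 0 -> 0s, 10/24 -> 100µs, 25/49 -> 50µs, 50/74 -> 25µs, 75/100 -> 0s.
	for _, fill := range []int{0, 10, 24, 25, 49, 50, 74, 75, 100} {
		fmt.Printf("queue %3d/%d -> delay %v\n",
			fill, maxBatch, adaptiveDelay(fill, maxBatch, maxDelay))
	}
}

Note that the switch cases must be ordered from the highest threshold down: any queue that is ≥50% full is also ≥25% full, so checking ≥75% first, then ≥50%, then ≥25% is what makes each bucket exclusive.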