1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

wip, hard limit on batch

This commit is contained in:
Nedyalko Dyakov
2025-11-03 17:17:30 +02:00
parent ed43bd6dbd
commit bdd3016963
3 changed files with 492 additions and 12 deletions

293
adaptive_delay_test.go Normal file
View File

@@ -0,0 +1,293 @@
package redis
import (
"testing"
"time"
)
// TestAdaptiveDelayCalculation tests the adaptive delay calculation logic
func TestAdaptiveDelayCalculation(t *testing.T) {
tests := []struct {
name string
maxBatchSize int
maxDelay time.Duration
adaptive bool
queueLen int
expected time.Duration
}{
// Disabled adaptive delay
{
name: "adaptive disabled, returns fixed delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: false,
queueLen: 50,
expected: 100 * time.Microsecond,
},
{
name: "adaptive disabled, zero delay",
maxBatchSize: 100,
maxDelay: 0,
adaptive: false,
queueLen: 50,
expected: 0,
},
// Enabled adaptive delay - 75% threshold
{
name: "75% full - no delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 75,
expected: 0,
},
{
name: "76% full - no delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 76,
expected: 0,
},
{
name: "100% full - no delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 100,
expected: 0,
},
// Enabled adaptive delay - 50% threshold
{
name: "50% full - 25% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 50,
expected: 25 * time.Microsecond,
},
{
name: "60% full - 25% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 60,
expected: 25 * time.Microsecond,
},
{
name: "74% full - 25% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 74,
expected: 25 * time.Microsecond,
},
// Enabled adaptive delay - 25% threshold
{
name: "25% full - 50% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 25,
expected: 50 * time.Microsecond,
},
{
name: "30% full - 50% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 30,
expected: 50 * time.Microsecond,
},
{
name: "49% full - 50% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 49,
expected: 50 * time.Microsecond,
},
// Enabled adaptive delay - <25% threshold
{
name: "24% full - 100% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 24,
expected: 100 * time.Microsecond,
},
{
name: "10% full - 100% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 10,
expected: 100 * time.Microsecond,
},
{
name: "1% full - 100% delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 1,
expected: 100 * time.Microsecond,
},
// Edge cases
{
name: "empty queue - no delay",
maxBatchSize: 100,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 0,
expected: 0,
},
{
name: "zero max delay - no delay",
maxBatchSize: 100,
maxDelay: 0,
adaptive: true,
queueLen: 50,
expected: 0,
},
// Different batch sizes
{
name: "small batch - 75% full",
maxBatchSize: 10,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 8,
expected: 0,
},
{
name: "small batch - 50% full",
maxBatchSize: 10,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 5,
expected: 25 * time.Microsecond,
},
{
name: "large batch - 75% full",
maxBatchSize: 1000,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 750,
expected: 0,
},
{
name: "large batch - 50% full",
maxBatchSize: 1000,
maxDelay: 100 * time.Microsecond,
adaptive: true,
queueLen: 500,
expected: 25 * time.Microsecond,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
// Create autopipeliner with test config
ap := &AutoPipeliner{
config: &AutoPipelineConfig{
MaxBatchSize: tt.maxBatchSize,
MaxFlushDelay: tt.maxDelay,
AdaptiveDelay: tt.adaptive,
},
}
ap.queueLen.Store(int32(tt.queueLen))
// Calculate delay
result := ap.calculateDelay()
// Verify result
if result != tt.expected {
t.Errorf("calculateDelay() = %v, want %v", result, tt.expected)
}
})
}
}
// TestAdaptiveDelayIntegerArithmetic verifies integer arithmetic correctness
func TestAdaptiveDelayIntegerArithmetic(t *testing.T) {
maxBatch := 100
maxDelay := 100 * time.Microsecond
ap := &AutoPipeliner{
config: &AutoPipelineConfig{
MaxBatchSize: maxBatch,
MaxFlushDelay: maxDelay,
AdaptiveDelay: true,
},
}
// Test all queue lengths from 0 to maxBatch
for queueLen := 0; queueLen <= maxBatch; queueLen++ {
ap.queueLen.Store(int32(queueLen))
delay := ap.calculateDelay()
// Verify delay is one of the expected values
switch {
case queueLen == 0:
if delay != 0 {
t.Errorf("queueLen=%d: expected 0, got %v", queueLen, delay)
}
case queueLen*4 >= maxBatch*3: // ≥75%
if delay != 0 {
t.Errorf("queueLen=%d (≥75%%): expected 0, got %v", queueLen, delay)
}
case queueLen*2 >= maxBatch: // ≥50%
if delay != maxDelay/4 {
t.Errorf("queueLen=%d (≥50%%): expected %v, got %v", queueLen, maxDelay/4, delay)
}
case queueLen*4 >= maxBatch: // ≥25%
if delay != maxDelay/2 {
t.Errorf("queueLen=%d (≥25%%): expected %v, got %v", queueLen, maxDelay/2, delay)
}
default: // <25%
if delay != maxDelay {
t.Errorf("queueLen=%d (<25%%): expected %v, got %v", queueLen, maxDelay, delay)
}
}
}
}
// BenchmarkCalculateDelay benchmarks the delay calculation
func BenchmarkCalculateDelay(b *testing.B) {
ap := &AutoPipeliner{
config: &AutoPipelineConfig{
MaxBatchSize: 100,
MaxFlushDelay: 100 * time.Microsecond,
AdaptiveDelay: true,
},
}
ap.queueLen.Store(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = ap.calculateDelay()
}
}
// BenchmarkCalculateDelayDisabled benchmarks with adaptive delay disabled
func BenchmarkCalculateDelayDisabled(b *testing.B) {
ap := &AutoPipeliner{
config: &AutoPipelineConfig{
MaxBatchSize: 100,
MaxFlushDelay: 100 * time.Microsecond,
AdaptiveDelay: false,
},
}
ap.queueLen.Store(50)
b.ResetTimer()
for i := 0; i < b.N; i++ {
_ = ap.calculateDelay()
}
}

View File

@@ -33,6 +33,18 @@ type AutoPipelineConfig struct {
// while adding only ~100μs average latency per command.
// Default: 0 (no delay)
MaxFlushDelay time.Duration
// AdaptiveDelay enables smart delay calculation based on queue fill level.
// When enabled, the delay is automatically adjusted:
// - Queue ≥75% full: No delay (flush immediately to prevent overflow)
// - Queue ≥50% full: 25% of MaxFlushDelay (queue filling up)
// - Queue ≥25% full: 50% of MaxFlushDelay (moderate load)
// - Queue <25% full: 100% of MaxFlushDelay (low load, maximize batching)
//
// This provides automatic adaptation to varying load patterns without
// manual tuning. Uses integer-only arithmetic for optimal performance.
// Default: false (use fixed MaxFlushDelay)
AdaptiveDelay bool
}
// DefaultAutoPipelineConfig returns the default autopipelining configuration.
@@ -181,7 +193,8 @@ type AutoPipeliner struct {
// Lifecycle
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
wg sync.WaitGroup // Tracks flusher goroutine
batchWg sync.WaitGroup // Tracks batch execution goroutines
closed atomic.Bool
}
@@ -350,6 +363,9 @@ func (ap *AutoPipeliner) Close() error {
// Wait for flusher to finish
ap.wg.Wait()
// Wait for all batch execution goroutines to finish
ap.batchWg.Wait()
return nil
}
@@ -391,8 +407,12 @@ func (ap *AutoPipeliner) flusher() {
ap.flushBatchSlice()
if ap.config.MaxFlushDelay > 0 && ap.Len() > 0 {
time.Sleep(ap.config.MaxFlushDelay)
// Apply delay if configured and queue still has items
if ap.Len() > 0 {
delay := ap.calculateDelay()
if delay > 0 {
time.Sleep(delay)
}
}
}
}
@@ -408,11 +428,30 @@ func (ap *AutoPipeliner) flushBatchSlice() {
return
}
// Take ownership of current queue
queuedCmds := ap.queue
ap.queue = getQueueSlice(ap.config.MaxBatchSize)
ap.mu.Unlock()
// Take up to MaxBatchSize commands from the queue
var queuedCmds []*queuedCmd
queueLen := len(ap.queue)
batchSize := ap.config.MaxBatchSize
if queueLen <= batchSize {
// Take all commands
queuedCmds = ap.queue
ap.queue = getQueueSlice(batchSize)
ap.queueLen.Store(0)
} else {
// Take only MaxBatchSize commands, leave the rest in the queue
// Allocate a new slice for the batch
queuedCmds = make([]*queuedCmd, batchSize)
copy(queuedCmds, ap.queue[:batchSize])
// Create a new queue with the remaining commands
remaining := queueLen - batchSize
newQueue := make([]*queuedCmd, remaining)
copy(newQueue, ap.queue[batchSize:])
ap.queue = newQueue
ap.queueLen.Store(int32(remaining))
}
ap.mu.Unlock()
// Acquire semaphore (limit concurrent batches)
// Try fast path first
@@ -448,7 +487,14 @@ func (ap *AutoPipeliner) flushBatchSlice() {
return
}
// Track this goroutine in the batchWg so Close() waits for it
// IMPORTANT: Add to WaitGroup AFTER semaphore is acquired to avoid deadlock
ap.batchWg.Add(1)
go func() {
defer ap.batchWg.Done()
defer ap.sem.Release()
defer putQueueSlice(queuedCmds)
// Use Pipeline directly instead of Pipelined to avoid closure overhead
pipe := ap.Pipeline()
// Process all commands in a pipeline
@@ -463,8 +509,6 @@ func (ap *AutoPipeliner) flushBatchSlice() {
// Signal completion by sending to buffered channel (non-blocking)
qc.done <- struct{}{}
}
ap.sem.Release()
putQueueSlice(queuedCmds)
}()
}
@@ -473,6 +517,49 @@ func (ap *AutoPipeliner) Len() int {
return int(ap.queueLen.Load())
}
// calculateDelay calculates the delay based on current queue length.
// Uses integer-only arithmetic for optimal performance (no float operations).
// Returns 0 if MaxFlushDelay is 0.
func (ap *AutoPipeliner) calculateDelay() time.Duration {
maxDelay := ap.config.MaxFlushDelay
if maxDelay == 0 {
return 0
}
// If adaptive delay is disabled, return fixed delay
if !ap.config.AdaptiveDelay {
return maxDelay
}
// Get current queue length
queueLen := ap.Len()
if queueLen == 0 {
return 0
}
maxBatch := ap.config.MaxBatchSize
// Use integer arithmetic to avoid float operations
// Calculate thresholds: 75%, 50%, 25% of maxBatch
// Multiply by 4 to avoid division: queueLen * 4 vs maxBatch * 3 (75%)
//
// Adaptive delay strategy:
// - ≥75% full: No delay (flush immediately to prevent overflow)
// - ≥50% full: 25% of max delay (queue filling up)
// - ≥25% full: 50% of max delay (moderate load)
// - <25% full: 100% of max delay (low load, maximize batching)
switch {
case queueLen*4 >= maxBatch*3: // queueLen >= 75% of maxBatch
return 0 // Flush immediately
case queueLen*2 >= maxBatch: // queueLen >= 50% of maxBatch
return maxDelay >> 2 // Divide by 4 using bit shift (faster)
case queueLen*4 >= maxBatch: // queueLen >= 25% of maxBatch
return maxDelay >> 1 // Divide by 2 using bit shift (faster)
default:
return maxDelay
}
}
// Pipeline returns a new pipeline that uses the underlying pipeliner.
// This allows you to create a traditional pipeline from an autopipeliner.
func (ap *AutoPipeliner) Pipeline() Pipeliner {

View File

@@ -0,0 +1,100 @@
package redis_test
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// ExampleAutoPipeliner_adaptiveDelay demonstrates using adaptive delay
// to automatically adjust batching behavior based on load.
func ExampleAutoPipeliner_adaptiveDelay() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
AutoPipelineConfig: &redis.AutoPipelineConfig{
MaxBatchSize: 100,
MaxConcurrentBatches: 10,
MaxFlushDelay: 100 * time.Microsecond,
AdaptiveDelay: true, // Enable adaptive delay
},
})
defer client.Close()
ctx := context.Background()
// The autopipeliner will automatically adjust delays:
// - When queue is ≥75% full: No delay (flush immediately)
// - When queue is ≥50% full: 25μs delay
// - When queue is ≥25% full: 50μs delay
// - When queue is <25% full: 100μs delay (maximize batching)
// Low load scenario - commands will batch with longer delays
for i := 0; i < 10; i++ {
_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
time.Sleep(10 * time.Millisecond) // Slow rate
}
// High load scenario - commands will flush immediately when queue fills
for i := 0; i < 200; i++ {
_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
// No sleep - high rate, queue fills up quickly
}
fmt.Println("Adaptive delay automatically adjusted to load patterns")
// Output: Adaptive delay automatically adjusted to load patterns
}
// ExampleAutoPipeliner_fixedDelay demonstrates using a fixed delay
// for predictable batching behavior.
func ExampleAutoPipeliner_fixedDelay() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
AutoPipelineConfig: &redis.AutoPipelineConfig{
MaxBatchSize: 100,
MaxConcurrentBatches: 10,
MaxFlushDelay: 100 * time.Microsecond,
AdaptiveDelay: false, // Use fixed delay
},
})
defer client.Close()
ctx := context.Background()
// With fixed delay, the autopipeliner always waits 100μs
// between flushes, regardless of queue fill level
for i := 0; i < 100; i++ {
_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
}
fmt.Println("Fixed delay provides predictable batching")
// Output: Fixed delay provides predictable batching
}
// ExampleAutoPipeliner_noDelay demonstrates zero-delay configuration
// for lowest latency at the cost of higher CPU usage.
func ExampleAutoPipeliner_noDelay() {
client := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
AutoPipelineConfig: &redis.AutoPipelineConfig{
MaxBatchSize: 100,
MaxConcurrentBatches: 10,
MaxFlushDelay: 0, // No delay
AdaptiveDelay: false,
},
})
defer client.Close()
ctx := context.Background()
// With zero delay, the autopipeliner flushes as fast as possible
// This provides lowest latency but higher CPU usage
for i := 0; i < 100; i++ {
_ = client.Set(ctx, fmt.Sprintf("key:%d", i), i, 0)
}
fmt.Println("Zero delay provides lowest latency")
// Output: Zero delay provides lowest latency
}