diff --git a/example/autopipeline/advanced_examples.go b/example/autopipeline/advanced_examples.go new file mode 100644 index 00000000..589e34ce --- /dev/null +++ b/example/autopipeline/advanced_examples.go @@ -0,0 +1,317 @@ +package main + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/redis/go-redis/v9" +) + +// Example: Rate-limited API with AutoPipeline +func ExampleRateLimitedAPI() { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + MaxConcurrentBatches: 20, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Simulate API that processes requests with rate limiting + const requestsPerSecond = 10000 + const duration = 1 * time.Second + + ticker := time.NewTicker(duration / time.Duration(requestsPerSecond)) + defer ticker.Stop() + + var wg sync.WaitGroup + count := 0 + start := time.Now() + + for range ticker.C { + if count >= requestsPerSecond { + break + } + wg.Add(1) + idx := count + go func() { + defer wg.Done() + // Each API request increments counter and logs + ap.Incr(ctx, "api:requests") + ap.LPush(ctx, "api:log", fmt.Sprintf("request-%d", idx)) + }() + count++ + } + + wg.Wait() + elapsed := time.Since(start) + + fmt.Printf("Rate-limited API: %d requests in %v (%.0f req/sec)\n", + requestsPerSecond, elapsed, float64(requestsPerSecond)/elapsed.Seconds()) +} + +// Example: Worker pool with AutoPipeline +func ExampleWorkerPool() { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 200, + MaxConcurrentBatches: 50, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Worker pool pattern + const numWorkers = 50 + const jobsPerWorker = 200 + const totalJobs = numWorkers * jobsPerWorker + + jobs := make(chan int, numWorkers*2) + var wg sync.WaitGroup + + // Start workers + start := time.Now() + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for jobID := range jobs { + // Process job: update status and increment counter + ap.Set(ctx, fmt.Sprintf("job:%d:status", jobID), "completed", 0) + ap.Incr(ctx, fmt.Sprintf("worker:%d:processed", workerID)) + } + }(w) + } + + // Send jobs + for i := 0; i < totalJobs; i++ { + jobs <- i + } + close(jobs) + wg.Wait() + + elapsed := time.Since(start) + fmt.Printf("Worker pool: %d workers processed %d jobs in %v (%.0f jobs/sec)\n", + numWorkers, totalJobs, elapsed, float64(totalJobs)/elapsed.Seconds()) +} + +// Example: Batch processing with size-based flushing +func ExampleBatchProcessing() { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 500, // Flush every 500 commands + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Process data in batches + const totalRecords = 10000 + const batchSize = 500 + + start := time.Now() + var wg sync.WaitGroup + + for batch := 0; batch < totalRecords/batchSize; batch++ { + wg.Add(1) + go func(batchID int) { + defer wg.Done() + for i := 0; i < batchSize; i++ { + recordID := batchID*batchSize + i + ap.HSet(ctx, fmt.Sprintf("record:%d", recordID), + "batch", batchID, + "index", i, + "timestamp", time.Now().Unix()) + } + }(batch) + } + + wg.Wait() + elapsed := time.Since(start) + + fmt.Printf("Batch processing: %d records in %v (%.0f records/sec)\n", + totalRecords, elapsed, float64(totalRecords)/elapsed.Seconds()) +} + +// Example: High-throughput metrics collection +func ExampleMetricsCollection() { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 30, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 1000, + MaxConcurrentBatches: 100, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Simulate high-throughput metrics collection + const numMetrics = 100000 + const numSources = 100 + + var wg sync.WaitGroup + var processed atomic.Int64 + + start := time.Now() + + // Each source sends metrics continuously + for source := 0; source < numSources; source++ { + wg.Add(1) + go func(sourceID int) { + defer wg.Done() + metricsPerSource := numMetrics / numSources + for i := 0; i < metricsPerSource; i++ { + timestamp := time.Now().Unix() + ap.ZAdd(ctx, fmt.Sprintf("metrics:source:%d", sourceID), + redis.Z{Score: float64(timestamp), Member: fmt.Sprintf("value-%d", i)}) + ap.Incr(ctx, "metrics:total") + processed.Add(1) + } + }(source) + } + + wg.Wait() + elapsed := time.Since(start) + + fmt.Printf("Metrics collection: %d metrics from %d sources in %v (%.0f metrics/sec)\n", + processed.Load(), numSources, elapsed, float64(processed.Load())/elapsed.Seconds()) +} + +// Example: Session management with AutoPipeline +func ExampleSessionManagement() { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + MaxConcurrentBatches: 50, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Simulate session creation/update from web server + const numSessions = 10000 + + var wg sync.WaitGroup + start := time.Now() + + for i := 0; i < numSessions; i++ { + wg.Add(1) + go func(sessionID int) { + defer wg.Done() + sessionKey := fmt.Sprintf("session:%d", sessionID) + + // Create session with multiple fields + ap.HSet(ctx, sessionKey, + "user_id", sessionID, + "created_at", time.Now().Unix(), + "last_seen", time.Now().Unix(), + "ip", "192.168.1.1") + + // Set expiration + ap.Expire(ctx, sessionKey, 24*time.Hour) + + // Track active sessions + ap.SAdd(ctx, "sessions:active", sessionID) + }(i) + } + + wg.Wait() + elapsed := time.Since(start) + + fmt.Printf("Session management: %d sessions created in %v (%.0f sessions/sec)\n", + numSessions, elapsed, float64(numSessions)/elapsed.Seconds()) +} + +// Example: Cache warming with AutoPipeline +func ExampleCacheWarming() { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 50, + PipelineReadBufferSize: 512 * 1024, + PipelineWriteBufferSize: 512 * 1024, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 1000, + MaxConcurrentBatches: 50, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Warm cache with product data + const numProducts = 50000 + + var wg sync.WaitGroup + start := time.Now() + + // Use worker pool for cache warming + const numWorkers = 100 + products := make(chan int, numWorkers*2) + + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func() { + defer wg.Done() + for productID := range products { + // Cache product data + ap.HSet(ctx, fmt.Sprintf("product:%d", productID), + "name", fmt.Sprintf("Product %d", productID), + "price", productID*10, + "stock", productID%100, + "category", productID%10) + + // Add to category index + ap.SAdd(ctx, fmt.Sprintf("category:%d:products", productID%10), productID) + + // Add to price-sorted set + ap.ZAdd(ctx, "products:by_price", + redis.Z{Score: float64(productID * 10), Member: productID}) + } + }() + } + + // Send products to workers + for i := 0; i < numProducts; i++ { + products <- i + } + close(products) + wg.Wait() + + elapsed := time.Since(start) + fmt.Printf("Cache warming: %d products cached in %v (%.0f products/sec)\n", + numProducts, elapsed, float64(numProducts)/elapsed.Seconds()) +} + diff --git a/example/autopipeline/benchmark_test.go b/example/autopipeline/benchmark_test.go new file mode 100644 index 00000000..0dce43fd --- /dev/null +++ b/example/autopipeline/benchmark_test.go @@ -0,0 +1,293 @@ +package main + +import ( + "context" + "fmt" + "sync" + "testing" + + "github.com/redis/go-redis/v9" +) + +// BenchmarkIndividualCommands benchmarks individual Redis commands +func BenchmarkIndividualCommands(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("bench:individual:%d", i) + client.Set(ctx, key, i, 0) + i++ + } + }) +} + +// BenchmarkManualPipeline benchmarks manual pipeline usage +func BenchmarkManualPipeline(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client.Close() + + const batchSize = 100 + + b.ResetTimer() + for i := 0; i < b.N; i += batchSize { + pipe := client.Pipeline() + for j := 0; j < batchSize && i+j < b.N; j++ { + key := fmt.Sprintf("bench:manual:%d", i+j) + pipe.Set(ctx, key, i+j, 0) + } + pipe.Exec(ctx) + } +} + +// BenchmarkAutoPipelineDefault benchmarks AutoPipeline with default config +func BenchmarkAutoPipelineDefault(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("bench:auto-default:%d", i) + ap.Do(ctx, "SET", key, i) + i++ + } + }) +} + +// BenchmarkAutoPipelineTuned benchmarks AutoPipeline with tuned config +func BenchmarkAutoPipelineTuned(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 50, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 500, + MaxConcurrentBatches: 100, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("bench:auto-tuned:%d", i) + ap.Do(ctx, "SET", key, i) + i++ + } + }) +} + +// BenchmarkAutoPipelineHighThroughput benchmarks AutoPipeline optimized for throughput +func BenchmarkAutoPipelineHighThroughput(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 100, + PipelineReadBufferSize: 1024 * 1024, + PipelineWriteBufferSize: 1024 * 1024, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 2000, + MaxConcurrentBatches: 200, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("bench:auto-throughput:%d", i) + ap.Do(ctx, "SET", key, i) + i++ + } + }) +} + +// BenchmarkConcurrentWorkload benchmarks realistic concurrent workload +func BenchmarkConcurrentWorkload(b *testing.B) { + ctx := context.Background() + + configs := []struct { + name string + client *redis.Client + usePipeline bool + }{ + { + name: "Individual", + client: redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }), + usePipeline: false, + }, + { + name: "AutoPipeline-Default", + client: redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }), + usePipeline: true, + }, + { + name: "AutoPipeline-Tuned", + client: redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 50, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 500, + MaxConcurrentBatches: 100, + }, + }), + usePipeline: true, + }, + } + + for _, cfg := range configs { + b.Run(cfg.name, func(b *testing.B) { + defer cfg.client.Close() + + var ap *redis.AutoPipeliner + if cfg.usePipeline { + ap = cfg.client.AutoPipeline() + defer ap.Close() + } + + const numWorkers = 100 + b.ResetTimer() + + var wg sync.WaitGroup + opsPerWorker := b.N / numWorkers + + for w := 0; w < numWorkers; w++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + for i := 0; i < opsPerWorker; i++ { + key := fmt.Sprintf("bench:concurrent:%s:%d:%d", cfg.name, workerID, i) + if cfg.usePipeline { + ap.Do(ctx, "SET", key, i) + } else { + cfg.client.Set(ctx, key, i, 0) + } + } + }(w) + } + wg.Wait() + }) + } +} + +// BenchmarkWebServerSimulation benchmarks web server-like workload +func BenchmarkWebServerSimulation(b *testing.B) { + ctx := context.Background() + + configs := []struct { + name string + client *redis.Client + usePipeline bool + }{ + { + name: "Individual", + client: redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }), + usePipeline: false, + }, + { + name: "AutoPipeline", + client: redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 200, + MaxConcurrentBatches: 100, + }, + }), + usePipeline: true, + }, + } + + for _, cfg := range configs { + b.Run(cfg.name, func(b *testing.B) { + defer cfg.client.Close() + + var ap *redis.AutoPipeliner + if cfg.usePipeline { + ap = cfg.client.AutoPipeline() + defer ap.Close() + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + // Simulate web request: increment counter + set session + if cfg.usePipeline { + ap.Incr(ctx, "requests:total") + ap.Set(ctx, fmt.Sprintf("session:%d", i), fmt.Sprintf("data-%d", i), 0) + } else { + cfg.client.Incr(ctx, "requests:total") + cfg.client.Set(ctx, fmt.Sprintf("session:%d", i), fmt.Sprintf("data-%d", i), 0) + } + i++ + } + }) + }) + } +} + +// BenchmarkBatchSizes benchmarks different batch sizes +func BenchmarkBatchSizes(b *testing.B) { + ctx := context.Background() + batchSizes := []int{10, 50, 100, 200, 500, 1000, 2000} + + for _, batchSize := range batchSizes { + b.Run(fmt.Sprintf("BatchSize-%d", batchSize), func(b *testing.B) { + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: batchSize, + MaxConcurrentBatches: 50, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("bench:batch-%d:%d", batchSize, i) + ap.Do(ctx, "SET", key, i) + i++ + } + }) + }) + } +} + diff --git a/example/autopipeline/main.go b/example/autopipeline/main.go index 6e83e7fb..86e8ff6c 100644 --- a/example/autopipeline/main.go +++ b/example/autopipeline/main.go @@ -10,7 +10,8 @@ import ( ) func main() { - fmt.Println("=== Redis AutoPipeline Examples ===\n") + fmt.Println("=== Redis AutoPipeline Examples ===") + fmt.Println() // Example 1: Basic autopipelining example1BasicAutoPipeline() @@ -23,6 +24,15 @@ func main() { // Example 4: Performance comparison example4PerformanceComparison() + + // Example 5: Realistic streaming workload + example5StreamingWorkload() + + // Example 6: AutoPipeline at its best - concurrent web server simulation + example6WebServerSimulation() + + // Example 7: Tuned AutoPipeline matching manual pipeline + example7TunedAutoPipeline() } // Example 1: Basic autopipelining usage @@ -34,7 +44,7 @@ func example1BasicAutoPipeline() { // Create client with autopipelining enabled client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", + Addr: "localhost:6379", AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), }) defer client.Close() @@ -69,7 +79,8 @@ func example1BasicAutoPipeline() { fmt.Printf(" %s = %s\n", key, val) } - fmt.Println("โœ“ Successfully set 100 keys using autopipelining\n") + fmt.Println("โœ“ Successfully set 100 keys using autopipelining") + fmt.Println() } // Example 2: Concurrent autopipelining from multiple goroutines @@ -83,7 +94,6 @@ func example2ConcurrentAutoPipeline() { Addr: "localhost:6379", AutoPipelineConfig: &redis.AutoPipelineConfig{ MaxBatchSize: 100, - FlushInterval: 10 * time.Millisecond, MaxConcurrentBatches: 10, }, }) @@ -136,8 +146,6 @@ func example3CustomConfig() { AutoPipelineConfig: &redis.AutoPipelineConfig{ // Flush when we have 50 commands MaxBatchSize: 50, - // Or flush every 5ms, whichever comes first - FlushInterval: 5 * time.Millisecond, // Allow up to 5 concurrent pipeline executions MaxConcurrentBatches: 5, }, @@ -171,7 +179,7 @@ func example4PerformanceComparison() { fmt.Println("----------------------------------") ctx := context.Background() - const numCommands = 1000 + const numCommands = 10000 // Test 1: Individual commands client1 := redis.NewClient(&redis.Options{ @@ -207,10 +215,14 @@ func example4PerformanceComparison() { } manualTime := time.Since(start) - // Test 3: AutoPipeline + // Test 3: AutoPipeline (with worker pool - OPTIMIZED) client3 := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + Addr: "localhost:6379", + PipelinePoolSize: 30, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxConcurrentBatches: 300, + MaxBatchSize: 200, + }, }) defer client3.Close() @@ -218,26 +230,359 @@ func example4PerformanceComparison() { defer ap.Close() start = time.Now() + + // Use worker pool instead of spawning 10k goroutines + const numWorkers = 100 + workCh := make(chan int, numWorkers) var wg3 sync.WaitGroup - for i := 0; i < numCommands; i++ { + + // Start workers + for w := 0; w < numWorkers; w++ { wg3.Add(1) - go func(idx int) { + go func() { defer wg3.Done() - key := fmt.Sprintf("perf:auto:%d", idx) - ap.Do(ctx, "SET", key, idx) - }(i) + for idx := range workCh { + key := fmt.Sprintf("perf:auto:%d", idx) + ap.Do(ctx, "SET", key, idx) + } + }() } + + // Send work to workers + for i := 0; i < numCommands; i++ { + workCh <- i + } + close(workCh) wg3.Wait() autoTime := time.Since(start) + // Test 4: AutoPipeline (UNOPTIMIZED - spawning 10k goroutines) + client4 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 30, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxConcurrentBatches: 300, + MaxBatchSize: 200, + }, + }) + defer client4.Close() + + ap4 := client4.AutoPipeline() + defer ap4.Close() + + start = time.Now() + var wg4 sync.WaitGroup + for i := 0; i < numCommands; i++ { + wg4.Add(1) + go func(idx int) { + defer wg4.Done() + key := fmt.Sprintf("perf:auto-unopt:%d", idx) + ap4.Do(ctx, "SET", key, idx) + }(i) + } + wg4.Wait() + autoTimeUnopt := time.Since(start) + // Results fmt.Printf("Executing %d SET commands:\n", numCommands) - fmt.Printf(" Individual commands: %v (%.0f ops/sec)\n", individualTime, float64(numCommands)/individualTime.Seconds()) - fmt.Printf(" Manual pipeline: %v (%.0f ops/sec) - %.1fx faster\n", manualTime, float64(numCommands)/manualTime.Seconds(), float64(individualTime)/float64(manualTime)) - fmt.Printf(" AutoPipeline: %v (%.0f ops/sec) - %.1fx faster\n", autoTime, float64(numCommands)/autoTime.Seconds(), float64(individualTime)/float64(autoTime)) + fmt.Printf(" Individual commands: %v (%.0f ops/sec)\n", individualTime, float64(numCommands)/individualTime.Seconds()) + fmt.Printf(" Manual pipeline: %v (%.0f ops/sec) - %.1fx faster\n", manualTime, float64(numCommands)/manualTime.Seconds(), float64(individualTime)/float64(manualTime)) + fmt.Printf(" AutoPipeline (optimized): %v (%.0f ops/sec) - %.1fx faster\n", autoTime, float64(numCommands)/autoTime.Seconds(), float64(individualTime)/float64(autoTime)) + fmt.Printf(" AutoPipeline (unoptimized): %v (%.0f ops/sec) - %.1fx faster\n", autoTimeUnopt, float64(numCommands)/autoTimeUnopt.Seconds(), float64(individualTime)/float64(autoTimeUnopt)) + fmt.Println() + fmt.Println("๐Ÿ“Š Analysis:") + fmt.Println(" โ€ข Manual pipeline: Single batch of 10k commands (lowest latency)") + fmt.Println(" โ€ข AutoPipeline: Multiple batches with coordination overhead") + fmt.Println(" โ€ข Worker pool: Reduces goroutine creation but adds channel overhead") + fmt.Println() + fmt.Println("๐Ÿ’ก When to use AutoPipeline:") + fmt.Println(" โœ“ Commands arrive from multiple sources/goroutines over time") + fmt.Println(" โœ“ You want automatic batching without manual pipeline management") + fmt.Println(" โœ“ Concurrent workloads where commands trickle in continuously") + fmt.Println() + fmt.Println("๐Ÿ’ก When to use Manual Pipeline:") + fmt.Println(" โœ“ You have all commands ready upfront (like this benchmark)") + fmt.Println(" โœ“ You want absolute maximum throughput for batch operations") + fmt.Println(" โœ“ Single-threaded batch processing") fmt.Println() // Cleanup client1.Del(ctx, "perf:*") } +// Example 5: Realistic streaming workload - commands arriving over time +func example5StreamingWorkload() { + fmt.Println("Example 5: Streaming Workload (Realistic Use Case)") + fmt.Println("---------------------------------------------------") + + ctx := context.Background() + const totalCommands = 10000 + const arrivalRateMicros = 10 // Commands arrive every 10 microseconds + + // Test 1: Individual commands with streaming arrival + client1 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client1.Close() + + start := time.Now() + ticker := time.NewTicker(time.Duration(arrivalRateMicros) * time.Microsecond) + defer ticker.Stop() + + count := 0 + for range ticker.C { + if count >= totalCommands { + break + } + key := fmt.Sprintf("stream:individual:%d", count) + client1.Set(ctx, key, count, 0) + count++ + } + individualStreamTime := time.Since(start) + + // Test 2: AutoPipeline with streaming arrival + client2 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + MaxConcurrentBatches: 50, + MaxFlushDelay: 0, // Flush immediately when batch is ready + }, + }) + defer client2.Close() + + ap := client2.AutoPipeline() + defer ap.Close() + + start = time.Now() + ticker2 := time.NewTicker(time.Duration(arrivalRateMicros) * time.Microsecond) + defer ticker2.Stop() + + var wg sync.WaitGroup + count = 0 + for range ticker2.C { + if count >= totalCommands { + break + } + wg.Add(1) + idx := count + go func() { + defer wg.Done() + key := fmt.Sprintf("stream:auto:%d", idx) + ap.Do(ctx, "SET", key, idx) + }() + count++ + } + wg.Wait() + autoStreamTime := time.Since(start) + + // Results + fmt.Printf("Streaming %d commands (1 every %dยตs):\n", totalCommands, arrivalRateMicros) + fmt.Printf(" Individual commands: %v (%.0f ops/sec)\n", individualStreamTime, float64(totalCommands)/individualStreamTime.Seconds()) + fmt.Printf(" AutoPipeline: %v (%.0f ops/sec) - %.1fx faster\n", autoStreamTime, float64(totalCommands)/autoStreamTime.Seconds(), float64(individualStreamTime)/float64(autoStreamTime)) + fmt.Println() + fmt.Println("๐Ÿ’ก In streaming workloads, AutoPipeline automatically batches commands") + fmt.Println(" as they arrive, providing better throughput without manual batching.") + fmt.Println() +} + +// Example 6: Web server simulation - AutoPipeline's ideal use case +func example6WebServerSimulation() { + fmt.Println("Example 6: Web Server Simulation (AutoPipeline's Sweet Spot)") + fmt.Println("-------------------------------------------------------------") + + ctx := context.Background() + const numRequests = 10000 + const numConcurrentUsers = 100 + + // Simulate web server handling concurrent requests + // Each request needs to increment a counter and set user data + + // Test 1: Individual commands (typical naive approach) + client1 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client1.Close() + + start := time.Now() + var wg1 sync.WaitGroup + for i := 0; i < numRequests; i++ { + wg1.Add(1) + go func(reqID int) { + defer wg1.Done() + // Simulate web request: increment counter + set session data + client1.Incr(ctx, "requests:total") + client1.Set(ctx, fmt.Sprintf("session:%d", reqID), fmt.Sprintf("data-%d", reqID), 0) + }(i) + } + wg1.Wait() + individualWebTime := time.Since(start) + + // Test 2: AutoPipeline (automatic batching) + client2 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 200, + MaxConcurrentBatches: 100, + }, + }) + defer client2.Close() + + ap := client2.AutoPipeline() + defer ap.Close() + + start = time.Now() + var wg2 sync.WaitGroup + for i := 0; i < numRequests; i++ { + wg2.Add(1) + go func(reqID int) { + defer wg2.Done() + // Same operations, but automatically batched + ap.Incr(ctx, "requests:total:auto") + ap.Set(ctx, fmt.Sprintf("session:auto:%d", reqID), fmt.Sprintf("data-%d", reqID), 0) + }(i) + } + wg2.Wait() + autoWebTime := time.Since(start) + + // Results + totalOps := numRequests * 2 // 2 operations per request + fmt.Printf("Simulating %d concurrent web requests (%d total operations):\n", numRequests, totalOps) + fmt.Printf(" Individual commands: %v (%.0f ops/sec)\n", individualWebTime, float64(totalOps)/individualWebTime.Seconds()) + fmt.Printf(" AutoPipeline: %v (%.0f ops/sec) - %.1fx faster\n", autoWebTime, float64(totalOps)/autoWebTime.Seconds(), float64(individualWebTime)/float64(autoWebTime)) + fmt.Println() + fmt.Println("๐ŸŽฏ This is AutoPipeline's ideal scenario:") + fmt.Println(" โ€ข Multiple concurrent goroutines (simulating web requests)") + fmt.Println(" โ€ข Commands arriving continuously over time") + fmt.Println(" โ€ข No manual pipeline management needed") + fmt.Println(" โ€ข Automatic batching provides massive speedup") + fmt.Println() +} + +// Example 7: Tuned AutoPipeline to match manual pipeline performance +func example7TunedAutoPipeline() { + fmt.Println("Example 7: Tuned AutoPipeline Configuration") + fmt.Println("--------------------------------------------") + + ctx := context.Background() + const numCommands = 10000 + + // Test 1: Manual pipeline (baseline) + client1 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client1.Close() + + start := time.Now() + pipe := client1.Pipeline() + for i := 0; i < numCommands; i++ { + key := fmt.Sprintf("tuned:manual:%d", i) + pipe.Set(ctx, key, i, 0) + } + if _, err := pipe.Exec(ctx); err != nil { + fmt.Printf("Error: %v\n", err) + return + } + manualTime := time.Since(start) + + // Test 2: AutoPipeline with default config + client2 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client2.Close() + + ap2 := client2.AutoPipeline() + defer ap2.Close() + + start = time.Now() + var wg2 sync.WaitGroup + for i := 0; i < numCommands; i++ { + wg2.Add(1) + go func(idx int) { + defer wg2.Done() + key := fmt.Sprintf("tuned:auto-default:%d", idx) + ap2.Do(ctx, "SET", key, idx) + }(i) + } + wg2.Wait() + autoDefaultTime := time.Since(start) + + // Test 3: AutoPipeline with tuned config (large batches) + client3 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 50, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 2000, // Much larger batches + MaxConcurrentBatches: 50, // More parallelism + MaxFlushDelay: 0, // No delay + }, + }) + defer client3.Close() + + ap3 := client3.AutoPipeline() + defer ap3.Close() + + start = time.Now() + var wg3 sync.WaitGroup + for i := 0; i < numCommands; i++ { + wg3.Add(1) + go func(idx int) { + defer wg3.Done() + key := fmt.Sprintf("tuned:auto-tuned:%d", idx) + ap3.Do(ctx, "SET", key, idx) + }(i) + } + wg3.Wait() + autoTunedTime := time.Since(start) + + // Test 4: AutoPipeline with extreme tuning (single batch) + client4 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + PipelinePoolSize: 10, + PipelineReadBufferSize: 1024 * 1024, // 1 MiB + PipelineWriteBufferSize: 1024 * 1024, // 1 MiB + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 10000, // Single batch like manual pipeline + MaxConcurrentBatches: 10, + MaxFlushDelay: 0, + }, + }) + defer client4.Close() + + ap4 := client4.AutoPipeline() + defer ap4.Close() + + start = time.Now() + var wg4 sync.WaitGroup + for i := 0; i < numCommands; i++ { + wg4.Add(1) + go func(idx int) { + defer wg4.Done() + key := fmt.Sprintf("tuned:auto-extreme:%d", idx) + ap4.Do(ctx, "SET", key, idx) + }(i) + } + wg4.Wait() + autoExtremeTime := time.Since(start) + + // Results + fmt.Printf("Executing %d SET commands:\n", numCommands) + fmt.Printf(" Manual pipeline: %v (%.0f ops/sec) [baseline]\n", manualTime, float64(numCommands)/manualTime.Seconds()) + fmt.Printf(" AutoPipeline (default): %v (%.0f ops/sec) - %.1fx slower\n", autoDefaultTime, float64(numCommands)/autoDefaultTime.Seconds(), float64(autoDefaultTime)/float64(manualTime)) + fmt.Printf(" AutoPipeline (tuned): %v (%.0f ops/sec) - %.1fx slower\n", autoTunedTime, float64(numCommands)/autoTunedTime.Seconds(), float64(autoTunedTime)/float64(manualTime)) + fmt.Printf(" AutoPipeline (extreme): %v (%.0f ops/sec) - %.1fx slower\n", autoExtremeTime, float64(numCommands)/autoExtremeTime.Seconds(), float64(autoExtremeTime)/float64(manualTime)) + fmt.Println() + fmt.Println("๐Ÿ“Š Configuration Impact:") + fmt.Printf(" Default config: MaxBatchSize=50, MaxConcurrentBatches=10\n") + fmt.Printf(" Tuned config: MaxBatchSize=2000, MaxConcurrentBatches=50\n") + fmt.Printf(" Extreme config: MaxBatchSize=10000 (single batch), 1MiB buffers\n") + fmt.Println() + fmt.Println("๐Ÿ’ก Key Insights:") + fmt.Println(" โ€ข Larger batches reduce the number of round-trips") + fmt.Println(" โ€ข More concurrent batches improve parallelism") + fmt.Println(" โ€ข Larger buffers reduce memory allocations") + fmt.Println(" โ€ข Even with extreme tuning, coordination overhead remains") + fmt.Println(" โ€ข For pure batch workloads, manual pipeline is still optimal") + fmt.Println() +}