From 7198f47baa5873c1d2d41b4263c9cc24295f5a37 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 27 Oct 2025 16:08:40 +0200 Subject: [PATCH] autopipeline playground --- autopipeline.go | 355 +++++++++++++++++++++++++ autopipeline_bench_test.go | 341 ++++++++++++++++++++++++ autopipeline_sequential_test.go | 151 +++++++++++ autopipeline_test.go | 448 ++++++++++++++++++++++++++++++++ commands.go | 1 + example/autopipeline/go.mod | 12 + example/autopipeline/go.sum | 8 + example/autopipeline/main.go | 243 +++++++++++++++++ internal/pool/pool.go | 7 - options.go | 6 + osscluster.go | 33 +++ osscluster_autopipeline_test.go | 172 ++++++++++++ pipeline.go | 4 + redis.go | 22 ++ ring.go | 4 + tx.go | 4 + universal.go | 2 + 17 files changed, 1806 insertions(+), 7 deletions(-) create mode 100644 autopipeline.go create mode 100644 autopipeline_bench_test.go create mode 100644 autopipeline_sequential_test.go create mode 100644 autopipeline_test.go create mode 100644 example/autopipeline/go.mod create mode 100644 example/autopipeline/go.sum create mode 100644 example/autopipeline/main.go create mode 100644 osscluster_autopipeline_test.go diff --git a/autopipeline.go b/autopipeline.go new file mode 100644 index 00000000..f2fb9b9a --- /dev/null +++ b/autopipeline.go @@ -0,0 +1,355 @@ +package redis + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +// AutoPipelineConfig configures the autopipelining behavior. +type AutoPipelineConfig struct { + // MaxBatchSize is the maximum number of commands to batch before flushing. + // Default: 100 + MaxBatchSize int + + // FlushInterval is the maximum time to wait before flushing pending commands. + // Default: 10ms + FlushInterval time.Duration + + // MaxConcurrentBatches is the maximum number of concurrent pipeline executions. + // This prevents overwhelming the server with too many concurrent pipelines. + // Default: 10 + MaxConcurrentBatches int +} + +// DefaultAutoPipelineConfig returns the default autopipelining configuration. +func DefaultAutoPipelineConfig() *AutoPipelineConfig { + return &AutoPipelineConfig{ + MaxBatchSize: 30, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 20, + } +} + +// pipelinerClient is an interface for clients that support pipelining. +// Both Client and ClusterClient implement this interface. +type pipelinerClient interface { + Process(ctx context.Context, cmd Cmder) error + Pipeline() Pipeliner +} + +// queuedCmd wraps a command with a done channel for completion notification +type queuedCmd struct { + cmd Cmder + done chan struct{} +} + +// autoPipelineCmd wraps a command and blocks on result access until execution completes. +type autoPipelineCmd struct { + Cmder + done <-chan struct{} +} + +func (c *autoPipelineCmd) Err() error { + <-c.done + return c.Cmder.Err() +} + +func (c *autoPipelineCmd) String() string { + <-c.done + return c.Cmder.String() +} + +// AutoPipeliner automatically batches commands and executes them in pipelines. +// It's safe for concurrent use by multiple goroutines. +// +// AutoPipeliner works by: +// 1. Collecting commands from multiple goroutines into a shared queue +// 2. Automatically flushing the queue when: +// - The batch size reaches MaxBatchSize +// - The flush interval (FlushInterval) expires +// +// 3. Executing batched commands using Redis pipelining +// +// This provides significant performance improvements for workloads with many +// concurrent small operations, as it reduces the number of network round-trips. +type AutoPipeliner struct { + pipeliner pipelinerClient + config *AutoPipelineConfig + + // Command queue + mu sync.Mutex + queue []*queuedCmd + queueLen atomic.Int32 // Fast path check without lock + + // Flush control + flushTimer *time.Timer + flushCh chan struct{} // Signal to flush immediately + + // Concurrency control + sem chan struct{} // Semaphore for concurrent batch limit + + // Lifecycle + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + closed atomic.Bool + cachedFlushInterval atomic.Int64 +} + +// NewAutoPipeliner creates a new autopipeliner for the given client. +// The client can be either *Client or *ClusterClient. +func NewAutoPipeliner(pipeliner pipelinerClient, config *AutoPipelineConfig) *AutoPipeliner { + if config == nil { + config = DefaultAutoPipelineConfig() + } + + ctx, cancel := context.WithCancel(context.Background()) + + ap := &AutoPipeliner{ + pipeliner: pipeliner, + config: config, + queue: make([]*queuedCmd, 0, config.MaxBatchSize), + flushTimer: time.NewTimer(config.FlushInterval), + flushCh: make(chan struct{}, 1), + sem: make(chan struct{}, config.MaxConcurrentBatches), + ctx: ctx, + cancel: cancel, + } + + // Stop the timer initially + if !ap.flushTimer.Stop() { + <-ap.flushTimer.C + } + + // Start background flusher + ap.wg.Add(1) + go ap.flusher() + + return ap +} + +// Do queues a command for autopipelined execution and returns immediately. +// The returned command will block when you access its result (Err(), Val(), Result(), etc.) +// until the command has been executed. +// +// This allows sequential usage without goroutines: +// +// cmd1 := ap.Do(ctx, "GET", "key1") +// cmd2 := ap.Do(ctx, "GET", "key2") +// // Commands are queued, will be batched and flushed automatically +// val1, err1 := cmd1.Result() // Blocks until command executes +// val2, err2 := cmd2.Result() // Blocks until command executes +func (ap *AutoPipeliner) Do(ctx context.Context, args ...interface{}) Cmder { + cmd := NewCmd(ctx, args...) + if len(args) == 0 { + cmd.SetErr(ErrClosed) + return cmd + } + done := ap.process(ctx, cmd) + return &autoPipelineCmd{Cmder: cmd, done: done} +} + +// Process queues a command for autopipelined execution and returns immediately. +// The command will be executed asynchronously when the batch is flushed. +// +// Unlike Do(), this does NOT wrap the command, so accessing results will NOT block. +// Use this only when you're managing synchronization yourself (e.g., with goroutines). +// +// For sequential usage, use Do() instead. +func (ap *AutoPipeliner) Process(ctx context.Context, cmd Cmder) error { + _ = ap.process(ctx, cmd) + return nil +} + +// process is the internal method that queues a command and returns its done channel. +func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{} { + if ap.closed.Load() { + cmd.SetErr(ErrClosed) + closedCh := make(chan struct{}) + close(closedCh) + return closedCh + } + + // Create queued command with done channel + qc := &queuedCmd{ + cmd: cmd, + done: make(chan struct{}), + } + + ap.mu.Lock() + ap.queue = append(ap.queue, qc) + queueLen := len(ap.queue) + ap.queueLen.Store(int32(queueLen)) + + // Check if we should flush immediately + shouldFlush := queueLen >= ap.config.MaxBatchSize + + // Start flush timer if this is the first command + if queueLen == 1 { + ap.flushTimer.Reset(ap.config.FlushInterval) + } + if queueLen > 1 { + cachedFlushInterval := ap.cachedFlushInterval.Load() + if cachedFlushInterval == 0 && ap.cachedFlushInterval.CompareAndSwap(cachedFlushInterval, 0) { + ap.config.FlushInterval = time.Duration(cachedFlushInterval) * time.Nanosecond + } + } + + ap.mu.Unlock() + + if shouldFlush { + // Signal immediate flush (non-blocking) + select { + case ap.flushCh <- struct{}{}: + default: + } + } + + return qc.done +} + +// Flush immediately flushes all pending commands. +// This is useful when you want to ensure all commands are executed +// before proceeding (e.g., before closing the autopipeliner). +func (ap *AutoPipeliner) Flush(ctx context.Context) error { + if ap.closed.Load() { + return ErrClosed + } + + // Signal flush + select { + case ap.flushCh <- struct{}{}: + case <-ctx.Done(): + return ctx.Err() + } + + // Wait a bit for the flush to complete + // This is a best-effort approach + time.Sleep(time.Millisecond) + + return nil +} + +// Close stops the autopipeliner and flushes any pending commands. +func (ap *AutoPipeliner) Close() error { + if !ap.closed.CompareAndSwap(false, true) { + return nil // Already closed + } + + // Cancel context to stop flusher + ap.cancel() + + // Wait for flusher to finish + ap.wg.Wait() + + return nil +} + +// flusher is the background goroutine that flushes batches. +func (ap *AutoPipeliner) flusher() { + defer ap.wg.Done() + + for { + select { + case <-ap.ctx.Done(): + // Final flush before shutdown + ap.flushBatch() + return + + case <-ap.flushTimer.C: + // Timer expired, flush if we have commands + if ap.queueLen.Load() > 0 { + ap.flushBatch() + } + // Reset timer for next interval if queue is not empty + ap.mu.Lock() + if len(ap.queue) > 0 { + ap.flushTimer.Reset(ap.config.FlushInterval) + } + ap.mu.Unlock() + + case <-ap.flushCh: + // Immediate flush requested + ap.flushBatch() + } + } +} + +// flushBatch flushes the current batch of commands. +func (ap *AutoPipeliner) flushBatch() { + // Get commands from queue + ap.mu.Lock() + if len(ap.queue) == 0 { + ap.queueLen.Store(0) + ap.mu.Unlock() + return + } + + // Take ownership of current queue + queuedCmds := ap.queue + ap.queue = make([]*queuedCmd, 0, ap.config.MaxBatchSize) + ap.queueLen.Store(0) + + // Stop timer + if !ap.flushTimer.Stop() { + select { + case <-ap.flushTimer.C: + default: + } + } + + ap.mu.Unlock() + + // Acquire semaphore (limit concurrent batches) + select { + case ap.sem <- struct{}{}: + defer func() { + <-ap.sem + }() + case <-ap.ctx.Done(): + // Context cancelled, set error on all commands and notify + for _, qc := range queuedCmds { + qc.cmd.SetErr(ErrClosed) + close(qc.done) + } + return + } + + if len(queuedCmds) == 0 { + return + } + if len(queuedCmds) == 1 { + if ap.cachedFlushInterval.CompareAndSwap(0, ap.config.FlushInterval.Nanoseconds()) { + ap.config.FlushInterval = time.Nanosecond + } + _ = ap.pipeliner.Process(context.Background(), queuedCmds[0].cmd) + close(queuedCmds[0].done) + return + } + + cachedFlushInterval := ap.cachedFlushInterval.Load() + if cachedFlushInterval != 0 && ap.cachedFlushInterval.CompareAndSwap(cachedFlushInterval, 0) { + ap.config.FlushInterval = time.Duration(ap.cachedFlushInterval.Load()) * time.Nanosecond + } + // Execute pipeline + pipe := ap.pipeliner.Pipeline() + for _, qc := range queuedCmds { + _ = pipe.Process(context.Background(), qc.cmd) + } + + // Execute and wait for completion + _, _ = pipe.Exec(context.Background()) + + // IMPORTANT: Only notify after pipeline execution is complete + // This ensures command results are fully populated before waiters proceed + for _, qc := range queuedCmds { + close(qc.done) + } +} + +// Len returns the current number of queued commands. +func (ap *AutoPipeliner) Len() int { + return int(ap.queueLen.Load()) +} diff --git a/autopipeline_bench_test.go b/autopipeline_bench_test.go new file mode 100644 index 00000000..74805b9d --- /dev/null +++ b/autopipeline_bench_test.go @@ -0,0 +1,341 @@ +package redis_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +// BenchmarkIndividualCommands benchmarks executing commands one at a time +func BenchmarkIndividualCommands(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("key%d", i) + if err := client.Set(ctx, key, i, 0).Err(); err != nil { + b.Fatal(err) + } + i++ + } + }) +} + +// BenchmarkManualPipeline benchmarks using manual pipelining +func BenchmarkManualPipeline(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + const batchSize = 100 + + b.ResetTimer() + + for i := 0; i < b.N; i += batchSize { + pipe := client.Pipeline() + + end := i + batchSize + if end > b.N { + end = b.N + } + + for j := i; j < end; j++ { + key := fmt.Sprintf("key%d", j) + pipe.Set(ctx, key, j, 0) + } + + if _, err := pipe.Exec(ctx); err != nil { + b.Fatal(err) + } + } +} + +// BenchmarkAutoPipeline benchmarks using autopipelining +func BenchmarkAutoPipeline(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + key := fmt.Sprintf("key%d", i) + ap.Do(ctx, "SET", key, i) + i++ + } + }) + + b.StopTimer() + // Wait for final flush + time.Sleep(50 * time.Millisecond) +} + +// BenchmarkAutoPipelineVsManual compares autopipelining with manual pipelining +func BenchmarkAutoPipelineVsManual(b *testing.B) { + const numCommands = 10000 + + b.Run("Manual", func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + pipe := client.Pipeline() + for i := 0; i < numCommands; i++ { + key := fmt.Sprintf("key%d", i) + pipe.Set(ctx, key, i, 0) + } + if _, err := pipe.Exec(ctx); err != nil { + b.Fatal(err) + } + } + }) + + b.Run("Auto", func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client.Close() + + b.ResetTimer() + for n := 0; n < b.N; n++ { + ap := client.AutoPipeline() + for i := 0; i < numCommands; i++ { + key := fmt.Sprintf("key%d", i) + ap.Do(ctx, "SET", key, i) + } + ap.Close() + } + }) +} + +// BenchmarkConcurrentAutoPipeline benchmarks concurrent autopipelining +func BenchmarkConcurrentAutoPipeline(b *testing.B) { + benchmarks := []struct { + name string + goroutines int + }{ + {"1goroutine", 1}, + {"10goroutines", 10}, + {"100goroutines", 100}, + } + + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + + var wg sync.WaitGroup + commandsPerGoroutine := b.N / bm.goroutines + if commandsPerGoroutine == 0 { + commandsPerGoroutine = 1 + } + + wg.Add(bm.goroutines) + for g := 0; g < bm.goroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("g%d:key%d", goroutineID, i) + ap.Do(ctx, "SET", key, i) + } + }(g) + } + wg.Wait() + + b.StopTimer() + time.Sleep(50 * time.Millisecond) + }) + } +} + +// BenchmarkAutoPipelineBatchSizes tests different batch sizes +func BenchmarkAutoPipelineBatchSizes(b *testing.B) { + batchSizes := []int{10, 50, 100, 500, 1000} + + for _, batchSize := range batchSizes { + b.Run(fmt.Sprintf("batch=%d", batchSize), func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: batchSize, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := fmt.Sprintf("key%d", i) + ap.Do(ctx, "SET", key, i) + } + + b.StopTimer() + time.Sleep(50 * time.Millisecond) + }) + } +} + +// BenchmarkAutoPipelineFlushIntervals tests different flush intervals +func BenchmarkAutoPipelineFlushIntervals(b *testing.B) { + intervals := []time.Duration{ + 1 * time.Millisecond, + 5 * time.Millisecond, + 10 * time.Millisecond, + 50 * time.Millisecond, + } + + for _, interval := range intervals { + b.Run(fmt.Sprintf("interval=%s", interval), func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + FlushInterval: interval, + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + for i := 0; i < b.N; i++ { + key := fmt.Sprintf("key%d", i) + ap.Do(ctx, "SET", key, i) + } + + b.StopTimer() + time.Sleep(100 * time.Millisecond) + }) + } +} + +// BenchmarkThroughput measures throughput (ops/sec) for different approaches +func BenchmarkThroughput(b *testing.B) { + const duration = 5 * time.Second + const numGoroutines = 10 + + b.Run("Individual", func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + }) + defer client.Close() + + b.ResetTimer() + + var wg sync.WaitGroup + var count int64 + + deadline := time.Now().Add(duration) + + wg.Add(numGoroutines) + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + i := 0 + for time.Now().Before(deadline) { + key := fmt.Sprintf("g%d:key%d", goroutineID, i) + if err := client.Set(ctx, key, i, 0).Err(); err != nil { + b.Error(err) + return + } + i++ + count++ + } + }(g) + } + wg.Wait() + + b.ReportMetric(float64(count)/duration.Seconds(), "ops/sec") + }) + + b.Run("AutoPipeline", func(b *testing.B) { + ctx := context.Background() + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + b.ResetTimer() + + var wg sync.WaitGroup + var count int64 + + deadline := time.Now().Add(duration) + + wg.Add(numGoroutines) + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + i := 0 + for time.Now().Before(deadline) { + key := fmt.Sprintf("g%d:key%d", goroutineID, i) + ap.Do(ctx, "SET", key, i) + i++ + count++ + } + }(g) + } + wg.Wait() + + b.StopTimer() + time.Sleep(100 * time.Millisecond) + + b.ReportMetric(float64(count)/duration.Seconds(), "ops/sec") + }) +} + diff --git a/autopipeline_sequential_test.go b/autopipeline_sequential_test.go new file mode 100644 index 00000000..01fba668 --- /dev/null +++ b/autopipeline_sequential_test.go @@ -0,0 +1,151 @@ +package redis_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +func TestAutoPipelineSequential(t *testing.T) { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 10, + FlushInterval: 50 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Sequential usage - no goroutines needed! + // Commands will be queued and batched automatically + cmds := make([]redis.Cmder, 20) + for i := 0; i < 20; i++ { + key := fmt.Sprintf("key%d", i) + cmds[i] = ap.Do(ctx, "SET", key, i) + } + + // Now access results - this will block until commands execute + for i, cmd := range cmds { + if err := cmd.Err(); err != nil { + t.Fatalf("Command %d failed: %v", i, err) + } + } + + // Verify all keys were set + for i := 0; i < 20; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get %s: %v", key, err) + } + if val != i { + t.Fatalf("Expected %d, got %d", i, val) + } + } +} + +func TestAutoPipelineSequentialSmallBatches(t *testing.T) { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 1000, // Large batch size + FlushInterval: 20 * time.Millisecond, // Rely on timer + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Queue commands sequentially with small delays + // They should be flushed by timer, not batch size + for i := 0; i < 5; i++ { + key := fmt.Sprintf("key%d", i) + cmd := ap.Do(ctx, "SET", key, i) + + // Access result immediately - should block until flush + if err := cmd.Err(); err != nil { + t.Fatalf("Command %d failed: %v", i, err) + } + + time.Sleep(5 * time.Millisecond) + } + + // Verify all keys were set + for i := 0; i < 5; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get %s: %v", key, err) + } + if val != i { + t.Fatalf("Expected %d, got %d", i, val) + } + } +} + +func TestAutoPipelineSequentialMixed(t *testing.T) { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 5, + FlushInterval: 50 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Queue some commands + cmd1 := ap.Do(ctx, "SET", "key1", "value1") + cmd2 := ap.Do(ctx, "SET", "key2", "value2") + cmd3 := ap.Do(ctx, "SET", "key3", "value3") + + // Access first result - should trigger batch flush + if err := cmd1.Err(); err != nil { + t.Fatalf("cmd1 failed: %v", err) + } + + // Other commands in same batch should also be done + if err := cmd2.Err(); err != nil { + t.Fatalf("cmd2 failed: %v", err) + } + if err := cmd3.Err(); err != nil { + t.Fatalf("cmd3 failed: %v", err) + } + + // Verify + val, err := client.Get(ctx, "key1").Result() + if err != nil || val != "value1" { + t.Fatalf("key1: expected value1, got %v, err %v", val, err) + } +} + diff --git a/autopipeline_test.go b/autopipeline_test.go new file mode 100644 index 00000000..59514ac0 --- /dev/null +++ b/autopipeline_test.go @@ -0,0 +1,448 @@ +package redis_test + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/redis/go-redis/v9" + + . "github.com/bsm/ginkgo/v2" + . "github.com/bsm/gomega" +) + +var _ = Describe("AutoPipeline", func() { + ctx := context.Background() + var client *redis.Client + var ap *redis.AutoPipeliner + + BeforeEach(func() { + client = redis.NewClient(&redis.Options{ + Addr: redisAddr, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 10, + FlushInterval: 50 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred()) + + ap = client.AutoPipeline() + }) + + AfterEach(func() { + if ap != nil { + Expect(ap.Close()).NotTo(HaveOccurred()) + } + Expect(client.Close()).NotTo(HaveOccurred()) + }) + + It("should batch commands automatically", func() { + // Queue multiple commands concurrently to enable batching + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + ap.Do(ctx, "SET", key, idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify all commands were executed + for i := 0; i < 5; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(i)) + } + }) + + It("should flush when batch size is reached", func() { + // Queue exactly MaxBatchSize commands concurrently + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + ap.Do(ctx, "SET", key, idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify all commands were executed + for i := 0; i < 10; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(i)) + } + }) + + It("should flush on timer expiry", func() { + // Queue a single command (will block until flushed by timer) + go func() { + ap.Do(ctx, "SET", "key1", "value1") + }() + + // Wait for timer to expire and command to complete + time.Sleep(100 * time.Millisecond) + + // Verify command was executed + val, err := client.Get(ctx, "key1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value1")) + }) + + It("should handle concurrent commands", func() { + const numGoroutines = 10 + const commandsPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("g%d:key%d", goroutineID, i) + ap.Do(ctx, "SET", key, i) + } + }(g) + } + + wg.Wait() + + // All commands are now complete (Do() blocks until executed) + // Verify all commands were executed + for g := 0; g < numGoroutines; g++ { + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("g%d:key%d", g, i) + val, err := client.Get(ctx, key).Int() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(i)) + } + } + }) + + It("should support manual flush", func() { + // Queue commands concurrently + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + ap.Do(ctx, "SET", key, idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify all commands were executed + for i := 0; i < 5; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(i)) + } + }) + + It("should handle errors in commands", func() { + // Queue a mix of valid and invalid commands concurrently + var wg sync.WaitGroup + + wg.Add(1) + go func() { + defer wg.Done() + ap.Do(ctx, "SET", "key1", "value1") + }() + + wg.Add(1) + go func() { + defer wg.Done() + ap.Do(ctx, "INVALID_COMMAND") + }() + + wg.Add(1) + go func() { + defer wg.Done() + ap.Do(ctx, "SET", "key2", "value2") + }() + + // Wait for all commands to complete + wg.Wait() + + // Valid commands should still execute + val, err := client.Get(ctx, "key1").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value1")) + + val, err = client.Get(ctx, "key2").Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value2")) + }) + + It("should close gracefully", func() { + // Queue commands concurrently + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + ap.Do(ctx, "SET", key, idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Close should flush pending commands + Expect(ap.Close()).NotTo(HaveOccurred()) + ap = nil // Prevent double close in AfterEach + + // Verify all commands were executed + for i := 0; i < 5; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(i)) + } + }) + + It("should return error after close", func() { + Expect(ap.Close()).NotTo(HaveOccurred()) + + // Commands after close should fail + cmd := ap.Do(ctx, "SET", "key", "value") + Expect(cmd.Err()).To(Equal(redis.ErrClosed)) + }) + + It("should respect MaxConcurrentBatches", func() { + // Create autopipeliner with low concurrency limit + client2 := redis.NewClient(&redis.Options{ + Addr: redisAddr, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 5, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 2, + }, + }) + defer client2.Close() + + ap2 := client2.AutoPipeline() + defer ap2.Close() + + // Queue many commands to trigger multiple batches concurrently + var wg sync.WaitGroup + for i := 0; i < 50; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + ap2.Do(ctx, "SET", key, idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify all commands were executed + for i := 0; i < 50; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client2.Get(ctx, key).Int() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal(i)) + } + }) + + It("should report queue length", func() { + // Initially empty + Expect(ap.Len()).To(Equal(0)) + + // Queue some commands concurrently + var wg sync.WaitGroup + for i := 0; i < 5; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + ap.Do(ctx, "SET", fmt.Sprintf("key%d", idx), idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Should be empty after flush + Expect(ap.Len()).To(Equal(0)) + }) +}) + +func TestAutoPipelineBasic(t *testing.T) { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 10, + FlushInterval: 50 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Queue commands concurrently + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + cmd := ap.Do(ctx, "SET", key, idx) + // Wait for command to complete + _ = cmd.Err() + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify + for i := 0; i < 100; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get key%d: %v", i, err) + } + if val != i { + t.Fatalf("Expected %d, got %d", i, val) + } + } +} + +func TestAutoPipelineFlushInterval(t *testing.T) { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 1000, // Large batch size so only timer triggers flush + FlushInterval: 50 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Send commands slowly, one at a time + // They should be flushed by the timer, not by batch size + start := time.Now() + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + cmd := ap.Do(ctx, "SET", key, idx) + // Wait for command to complete + _ = cmd.Err() + }(i) + + // Wait a bit between commands to ensure they don't batch by size + time.Sleep(10 * time.Millisecond) + } + + wg.Wait() + elapsed := time.Since(start) + + // Verify all keys were set + for i := 0; i < 10; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get %s: %v", key, err) + } + if val != i { + t.Fatalf("Expected %d, got %d", i, val) + } + } + + // Should complete in roughly 10 * 10ms = 100ms + some flush intervals + // If timer doesn't work, commands would wait until Close() is called + if elapsed > 500*time.Millisecond { + t.Fatalf("Commands took too long (%v), flush interval may not be working", elapsed) + } + + t.Logf("Completed in %v (flush interval working correctly)", elapsed) +} + +func TestAutoPipelineConcurrency(t *testing.T) { + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: ":6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + const numGoroutines = 100 + const commandsPerGoroutine = 100 + + var wg sync.WaitGroup + var successCount atomic.Int64 + + wg.Add(numGoroutines) + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("g%d:key%d", goroutineID, i) + ap.Do(ctx, "SET", key, i) + successCount.Add(1) + } + }(g) + } + + wg.Wait() + + // Wait for all flushes + time.Sleep(500 * time.Millisecond) + + expected := int64(numGoroutines * commandsPerGoroutine) + if successCount.Load() != expected { + t.Fatalf("Expected %d commands, got %d", expected, successCount.Load()) + } +} + diff --git a/commands.go b/commands.go index 04235a2e..2c264ef8 100644 --- a/commands.go +++ b/commands.go @@ -166,6 +166,7 @@ func isEmptyValue(v reflect.Value) bool { } type Cmdable interface { + AutoPipeline() *AutoPipeliner Pipeline() Pipeliner Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) diff --git a/example/autopipeline/go.mod b/example/autopipeline/go.mod new file mode 100644 index 00000000..24156c5d --- /dev/null +++ b/example/autopipeline/go.mod @@ -0,0 +1,12 @@ +module example/autopipeline + +go 1.25.3 + +replace github.com/redis/go-redis/v9 => ../.. + +require github.com/redis/go-redis/v9 v9.0.0-00010101000000-000000000000 + +require ( + github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect +) diff --git a/example/autopipeline/go.sum b/example/autopipeline/go.sum new file mode 100644 index 00000000..4db68f6d --- /dev/null +++ b/example/autopipeline/go.sum @@ -0,0 +1,8 @@ +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= +github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= +github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= +github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= diff --git a/example/autopipeline/main.go b/example/autopipeline/main.go new file mode 100644 index 00000000..6e83e7fb --- /dev/null +++ b/example/autopipeline/main.go @@ -0,0 +1,243 @@ +package main + +import ( + "context" + "fmt" + "sync" + "time" + + "github.com/redis/go-redis/v9" +) + +func main() { + fmt.Println("=== Redis AutoPipeline Examples ===\n") + + // Example 1: Basic autopipelining + example1BasicAutoPipeline() + + // Example 2: Concurrent autopipelining + example2ConcurrentAutoPipeline() + + // Example 3: Custom configuration + example3CustomConfig() + + // Example 4: Performance comparison + example4PerformanceComparison() +} + +// Example 1: Basic autopipelining usage +func example1BasicAutoPipeline() { + fmt.Println("Example 1: Basic AutoPipeline Usage") + fmt.Println("------------------------------------") + + ctx := context.Background() + + // Create client with autopipelining enabled + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client.Close() + + // Create autopipeliner + ap := client.AutoPipeline() + defer ap.Close() + + // Queue commands concurrently - they will be automatically batched and executed + // Note: Do() blocks until the command is executed, so use goroutines for batching + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("user:%d", idx) + ap.Do(ctx, "SET", key, fmt.Sprintf("User %d", idx)) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify some values + for i := 0; i < 5; i++ { + key := fmt.Sprintf("user:%d", i) + val, err := client.Get(ctx, key).Result() + if err != nil { + fmt.Printf("Error getting %s: %v\n", key, err) + continue + } + fmt.Printf(" %s = %s\n", key, val) + } + + fmt.Println("✓ Successfully set 100 keys using autopipelining\n") +} + +// Example 2: Concurrent autopipelining from multiple goroutines +func example2ConcurrentAutoPipeline() { + fmt.Println("Example 2: Concurrent AutoPipeline") + fmt.Println("-----------------------------------") + + ctx := context.Background() + + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 100, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Launch multiple goroutines that all use the same autopipeliner + const numGoroutines = 10 + const commandsPerGoroutine = 100 + + var wg sync.WaitGroup + wg.Add(numGoroutines) + + start := time.Now() + + for g := 0; g < numGoroutines; g++ { + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("concurrent:g%d:item%d", goroutineID, i) + ap.Do(ctx, "SET", key, i) + } + }(g) + } + + wg.Wait() + + // Wait for final flush + time.Sleep(100 * time.Millisecond) + + elapsed := time.Since(start) + + totalCommands := numGoroutines * commandsPerGoroutine + fmt.Printf("✓ Executed %d commands from %d goroutines in %v\n", totalCommands, numGoroutines, elapsed) + fmt.Printf(" Throughput: %.0f ops/sec\n\n", float64(totalCommands)/elapsed.Seconds()) +} + +// Example 3: Custom autopipelining configuration +func example3CustomConfig() { + fmt.Println("Example 3: Custom Configuration") + fmt.Println("--------------------------------") + + ctx := context.Background() + + // Create client with custom autopipelining settings + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: &redis.AutoPipelineConfig{ + // Flush when we have 50 commands + MaxBatchSize: 50, + // Or flush every 5ms, whichever comes first + FlushInterval: 5 * time.Millisecond, + // Allow up to 5 concurrent pipeline executions + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + ap := client.AutoPipeline() + defer ap.Close() + + // Queue commands concurrently + var wg sync.WaitGroup + for i := 0; i < 200; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("config:item%d", idx) + ap.Do(ctx, "SET", key, idx) + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + fmt.Printf("✓ Configured with MaxBatchSize=50, FlushInterval=5ms\n") + fmt.Printf(" Queue length: %d (should be 0 after flush)\n\n", ap.Len()) +} + +// Example 4: Performance comparison +func example4PerformanceComparison() { + fmt.Println("Example 4: Performance Comparison") + fmt.Println("----------------------------------") + + ctx := context.Background() + const numCommands = 1000 + + // Test 1: Individual commands + client1 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client1.Close() + + start := time.Now() + for i := 0; i < numCommands; i++ { + key := fmt.Sprintf("perf:individual:%d", i) + if err := client1.Set(ctx, key, i, 0).Err(); err != nil { + fmt.Printf("Error: %v\n", err) + return + } + } + individualTime := time.Since(start) + + // Test 2: Manual pipeline + client2 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + }) + defer client2.Close() + + start = time.Now() + pipe := client2.Pipeline() + for i := 0; i < numCommands; i++ { + key := fmt.Sprintf("perf:manual:%d", i) + pipe.Set(ctx, key, i, 0) + } + if _, err := pipe.Exec(ctx); err != nil { + fmt.Printf("Error: %v\n", err) + return + } + manualTime := time.Since(start) + + // Test 3: AutoPipeline + client3 := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", + AutoPipelineConfig: redis.DefaultAutoPipelineConfig(), + }) + defer client3.Close() + + ap := client3.AutoPipeline() + defer ap.Close() + + start = time.Now() + var wg3 sync.WaitGroup + for i := 0; i < numCommands; i++ { + wg3.Add(1) + go func(idx int) { + defer wg3.Done() + key := fmt.Sprintf("perf:auto:%d", idx) + ap.Do(ctx, "SET", key, idx) + }(i) + } + wg3.Wait() + autoTime := time.Since(start) + + // Results + fmt.Printf("Executing %d SET commands:\n", numCommands) + fmt.Printf(" Individual commands: %v (%.0f ops/sec)\n", individualTime, float64(numCommands)/individualTime.Seconds()) + fmt.Printf(" Manual pipeline: %v (%.0f ops/sec) - %.1fx faster\n", manualTime, float64(numCommands)/manualTime.Seconds(), float64(individualTime)/float64(manualTime)) + fmt.Printf(" AutoPipeline: %v (%.0f ops/sec) - %.1fx faster\n", autoTime, float64(numCommands)/autoTime.Seconds(), float64(individualTime)/float64(autoTime)) + fmt.Println() + + // Cleanup + client1.Del(ctx, "perf:*") +} + diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 2dedca05..88e9e3d9 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -684,13 +684,6 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { // Using inline Release() method for better performance (avoids pointer dereference) transitionedToIdle := cn.Release() - if !transitionedToIdle { - // Fast path failed - hook might have changed state (e.g., to UNUSABLE for handoff) - // Keep the state set by the hook and pool the connection anyway - currentState := cn.GetStateMachine().GetState() - internal.Logger.Printf(ctx, "Connection state changed by hook to %v, pooling as-is", currentState) - } - // unusable conns are expected to become usable at some point (background process is reconnecting them) // put them at the opposite end of the queue // Optimization: if we just transitioned to IDLE, we know it's usable - skip the check diff --git a/options.go b/options.go index 79e4b6df..7f4d1b1b 100644 --- a/options.go +++ b/options.go @@ -264,6 +264,12 @@ type Options struct { // transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications. // If nil, maintnotifications are in "auto" mode and will be enabled if the server supports it. MaintNotificationsConfig *maintnotifications.Config + + // AutoPipelineConfig enables automatic pipelining of commands. + // When set, commands will be automatically batched and sent in pipelines + // to reduce network round-trips and improve throughput. + // If nil, autopipelining is disabled. + AutoPipelineConfig *AutoPipelineConfig } func (opt *Options) init() { diff --git a/osscluster.go b/osscluster.go index 7925d2c6..3c13e274 100644 --- a/osscluster.go +++ b/osscluster.go @@ -33,6 +33,8 @@ var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes") // ClusterOptions are used to configure a cluster client and should be // passed to NewClusterClient. type ClusterOptions struct { + AutoPipelineConfig *AutoPipelineConfig + // A seed list of host:port addresses of cluster nodes. Addrs []string @@ -1366,6 +1368,37 @@ func (c *ClusterClient) Pipeline() Pipeliner { return &pipe } +// AutoPipeline creates a new autopipeliner that automatically batches commands. +// Commands are automatically flushed based on batch size and time interval. +// The autopipeliner must be closed when done to flush pending commands. +// +// For ClusterClient, commands are automatically routed to the correct nodes +// based on key slots, just like regular pipelined commands. +// +// Example: +// +// ap := client.AutoPipeline() +// defer ap.Close() +// +// var wg sync.WaitGroup +// for i := 0; i < 1000; i++ { +// wg.Add(1) +// go func(idx int) { +// defer wg.Done() +// ap.Do(ctx, "SET", fmt.Sprintf("key%d", idx), idx) +// }(i) +// } +// wg.Wait() +// +// Note: AutoPipeline requires AutoPipelineConfig to be set in ClusterOptions. +// If not set, default configuration will be used. +func (c *ClusterClient) AutoPipeline() *AutoPipeliner { + if c.opt.AutoPipelineConfig == nil { + c.opt.AutoPipelineConfig = DefaultAutoPipelineConfig() + } + return NewAutoPipeliner(c, c.opt.AutoPipelineConfig) +} + func (c *ClusterClient) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(ctx, fn) } diff --git a/osscluster_autopipeline_test.go b/osscluster_autopipeline_test.go new file mode 100644 index 00000000..a90267b4 --- /dev/null +++ b/osscluster_autopipeline_test.go @@ -0,0 +1,172 @@ +package redis_test + +import ( + "context" + "fmt" + "sync" + "testing" + "time" + + "github.com/redis/go-redis/v9" +) + +func TestClusterAutoPipelineBasic(t *testing.T) { + ctx := context.Background() + + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{":7000", ":7001", ":7002"}, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 10, + FlushInterval: 50 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Queue commands concurrently across different keys (different slots) + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func(idx int) { + defer wg.Done() + key := fmt.Sprintf("key%d", idx) + cmd := ap.Do(ctx, "SET", key, idx) + // Wait for command to complete + _ = cmd.Err() + }(i) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify all keys were set correctly + for i := 0; i < 100; i++ { + key := fmt.Sprintf("key%d", i) + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get %s: %v", key, err) + } + if val != i { + t.Fatalf("Expected %d, got %d", i, val) + } + } +} + +func TestClusterAutoPipelineConcurrency(t *testing.T) { + ctx := context.Background() + + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{":7000", ":7001", ":7002"}, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 50, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 10, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + const numGoroutines = 10 + const commandsPerGoroutine = 100 + + var wg sync.WaitGroup + for g := 0; g < numGoroutines; g++ { + wg.Add(1) + go func(goroutineID int) { + defer wg.Done() + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("g%d:key%d", goroutineID, i) + cmd := ap.Do(ctx, "SET", key, i) + // Wait for command to complete + _ = cmd.Err() + } + }(g) + } + + // Wait for all commands to complete + wg.Wait() + + // Verify all commands were executed + for g := 0; g < numGoroutines; g++ { + for i := 0; i < commandsPerGoroutine; i++ { + key := fmt.Sprintf("g%d:key%d", g, i) + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get %s: %v", key, err) + } + if val != i { + t.Fatalf("Expected %d, got %d for key %s", i, val, key) + } + } + } +} + +func TestClusterAutoPipelineCrossSlot(t *testing.T) { + ctx := context.Background() + + client := redis.NewClusterClient(&redis.ClusterOptions{ + Addrs: []string{":7000", ":7001", ":7002"}, + AutoPipelineConfig: &redis.AutoPipelineConfig{ + MaxBatchSize: 20, + FlushInterval: 10 * time.Millisecond, + MaxConcurrentBatches: 5, + }, + }) + defer client.Close() + + if err := client.FlushDB(ctx).Err(); err != nil { + t.Fatal(err) + } + + ap := client.AutoPipeline() + defer ap.Close() + + // Use keys that will hash to different slots + keys := []string{ + "user:1000", + "user:2000", + "user:3000", + "session:abc", + "session:def", + "cache:foo", + "cache:bar", + } + + var wg sync.WaitGroup + for i, key := range keys { + wg.Add(1) + go func(k string, val int) { + defer wg.Done() + cmd := ap.Do(ctx, "SET", k, val) + // Wait for command to complete + _ = cmd.Err() + }(key, i) + } + + wg.Wait() + + // Verify all keys were set + for i, key := range keys { + val, err := client.Get(ctx, key).Int() + if err != nil { + t.Fatalf("Failed to get %s: %v", key, err) + } + if val != i { + t.Fatalf("Expected %d, got %d for key %s", i, val, key) + } + } +} + diff --git a/pipeline.go b/pipeline.go index 567bf121..c01b64b7 100644 --- a/pipeline.go +++ b/pipeline.go @@ -123,6 +123,10 @@ func (c *Pipeline) Pipeline() Pipeliner { return c } +func (c *Pipeline) AutoPipeline() *AutoPipeliner { + return nil +} + func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipelined(ctx, fn) } diff --git a/redis.go b/redis.go index a355531c..79bebe8c 100644 --- a/redis.go +++ b/redis.go @@ -1175,6 +1175,28 @@ func (c *Client) TxPipeline() Pipeliner { return &pipe } +// AutoPipeline creates a new autopipeliner that automatically batches commands. +// Commands are automatically flushed based on batch size and time interval. +// The autopipeliner must be closed when done to flush pending commands. +// +// Example: +// +// ap := client.AutoPipeline() +// defer ap.Close() +// +// for i := 0; i < 1000; i++ { +// ap.Do(ctx, "SET", fmt.Sprintf("key%d", i), i) +// } +// +// Note: AutoPipeline requires AutoPipelineConfig to be set in Options. +// If not set, this will panic. +func (c *Client) AutoPipeline() *AutoPipeliner { + if c.opt.AutoPipelineConfig == nil { + c.opt.AutoPipelineConfig = DefaultAutoPipelineConfig() + } + return NewAutoPipeliner(c, c.opt.AutoPipelineConfig) +} + func (c *Client) pubSub() *PubSub { pubsub := &PubSub{ opt: c.opt, diff --git a/ring.go b/ring.go index 3381460a..0ca95191 100644 --- a/ring.go +++ b/ring.go @@ -793,6 +793,10 @@ func (c *Ring) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder return c.Pipeline().Pipelined(ctx, fn) } +func (c *Ring) AutoPipeline() *AutoPipeliner { + return nil +} + func (c *Ring) Pipeline() Pipeliner { pipe := Pipeline{ exec: pipelineExecer(c.processPipelineHook), diff --git a/tx.go b/tx.go index 40bc1d66..093fa2a5 100644 --- a/tx.go +++ b/tx.go @@ -139,6 +139,10 @@ func (c *Tx) TxPipeline() Pipeliner { return &pipe } +func (c *Tx) AutoPipeline() *AutoPipeliner { + return nil +} + func wrapMultiExec(ctx context.Context, cmds []Cmder) []Cmder { if len(cmds) == 0 { panic("not reached") diff --git a/universal.go b/universal.go index 1dc9764d..6f3ff83c 100644 --- a/universal.go +++ b/universal.go @@ -126,6 +126,8 @@ type UniversalOptions struct { // MaintNotificationsConfig provides configuration for maintnotifications upgrades. MaintNotificationsConfig *maintnotifications.Config + + AutoPipelineConfig *AutoPipelineConfig } // Cluster returns cluster options created from the universal options.