mirror of
https://github.com/redis/go-redis.git
synced 2025-12-03 18:31:14 +03:00
wip
This commit is contained in:
199
autopipeline.go
199
autopipeline.go
@@ -20,6 +20,21 @@ type AutoPipelineConfig struct {
|
||||
// Default: 10
|
||||
MaxConcurrentBatches int
|
||||
|
||||
// UseRingBuffer enables the high-performance ring buffer queue.
|
||||
// When enabled, uses a pre-allocated ring buffer with lock-free enqueue
|
||||
// instead of the slice-based queue. This provides:
|
||||
// - 6x faster enqueue operations
|
||||
// - 100% reduction in allocations during enqueue
|
||||
// - Better performance under high concurrency
|
||||
// Default: true (enabled)
|
||||
UseRingBuffer bool
|
||||
|
||||
// RingBufferSize is the size of the ring buffer queue.
|
||||
// Only used when UseRingBuffer is true.
|
||||
// Must be a power of 2 for optimal performance (will be rounded up if not).
|
||||
// Default: 1024
|
||||
RingBufferSize int
|
||||
|
||||
// MaxFlushDelay is the maximum delay after flushing before checking for more commands.
|
||||
// A small delay (e.g., 100μs) can significantly reduce CPU usage by allowing
|
||||
// more commands to batch together, at the cost of slightly higher latency.
|
||||
@@ -40,6 +55,8 @@ func DefaultAutoPipelineConfig() *AutoPipelineConfig {
|
||||
return &AutoPipelineConfig{
|
||||
MaxBatchSize: 50,
|
||||
MaxConcurrentBatches: 10,
|
||||
UseRingBuffer: true, // Enable ring buffer by default
|
||||
RingBufferSize: 1024,
|
||||
MaxFlushDelay: 0, // No delay by default (lowest latency)
|
||||
}
|
||||
}
|
||||
@@ -169,8 +186,9 @@ type AutoPipeliner struct {
|
||||
|
||||
// Command queue - either slice-based or ring buffer
|
||||
mu sync.Mutex
|
||||
queue []*queuedCmd // Slice-based queue (legacy)
|
||||
queueLen atomic.Int32 // Fast path check without lock
|
||||
queue []*queuedCmd // Slice-based queue (legacy)
|
||||
ring *autoPipelineRing // Ring buffer queue (high-performance)
|
||||
queueLen atomic.Int32 // Fast path check without lock
|
||||
|
||||
// Flush control
|
||||
flushCh chan struct{} // Signal to flush immediately
|
||||
@@ -216,7 +234,11 @@ func NewAutoPipeliner(pipeliner cmdableClient, config *AutoPipelineConfig) *Auto
|
||||
ap.cmdable = ap.processAndBlock
|
||||
|
||||
// Initialize queue based on configuration
|
||||
ap.queue = getQueueSlice(config.MaxBatchSize)
|
||||
if config.UseRingBuffer {
|
||||
ap.ring = newAutoPipelineRing(config.RingBufferSize)
|
||||
} else {
|
||||
ap.queue = getQueueSlice(config.MaxBatchSize)
|
||||
}
|
||||
|
||||
// Start background flusher
|
||||
ap.wg.Add(1)
|
||||
@@ -292,6 +314,12 @@ var closedQueuedCmd = &queuedCmd{
|
||||
done: closedChan,
|
||||
}
|
||||
|
||||
// ringQueuedCmd is a queuedCmd wrapper for ring buffer (not pooled)
|
||||
type ringQueuedCmd struct {
|
||||
cmd Cmder
|
||||
done <-chan struct{}
|
||||
}
|
||||
|
||||
// processWithQueuedCmd is the internal method that queues a command and returns the queuedCmd.
|
||||
// The caller is responsible for returning the queuedCmd to the pool after use.
|
||||
func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *queuedCmd {
|
||||
@@ -300,6 +328,19 @@ func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *q
|
||||
return closedQueuedCmd
|
||||
}
|
||||
|
||||
// Use ring buffer if enabled
|
||||
if ap.config.UseRingBuffer {
|
||||
done := ap.ring.putOne(cmd)
|
||||
// putOne will signal the flusher via condition variable if needed
|
||||
// For ring buffer, we create a simple wrapper (not pooled)
|
||||
// The done channel is managed by the ring buffer
|
||||
return &queuedCmd{
|
||||
cmd: cmd,
|
||||
done: done,
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy slice-based queue
|
||||
// Get queued command from pool
|
||||
qc := getQueuedCmd(cmd)
|
||||
|
||||
@@ -347,6 +388,11 @@ func (ap *AutoPipeliner) Close() error {
|
||||
// Cancel context to stop flusher
|
||||
ap.cancel()
|
||||
|
||||
// Wake up flusher if it's waiting
|
||||
if ap.config.UseRingBuffer {
|
||||
ap.ring.wakeAll()
|
||||
}
|
||||
|
||||
// Wait for flusher to finish
|
||||
ap.wg.Wait()
|
||||
|
||||
@@ -356,8 +402,15 @@ func (ap *AutoPipeliner) Close() error {
|
||||
// flusher is the background goroutine that flushes batches.
|
||||
func (ap *AutoPipeliner) flusher() {
|
||||
defer ap.wg.Done()
|
||||
ap.flusherSlice()
|
||||
return
|
||||
|
||||
if !ap.config.UseRingBuffer {
|
||||
// Legacy slice-based flusher
|
||||
ap.flusherSlice()
|
||||
return
|
||||
}
|
||||
|
||||
// Ring buffer flusher
|
||||
ap.flusherRing()
|
||||
}
|
||||
|
||||
// flusherSlice is the legacy slice-based flusher.
|
||||
@@ -404,6 +457,107 @@ func (ap *AutoPipeliner) flusherSlice() {
|
||||
}
|
||||
}
|
||||
|
||||
// flusherRing is the ring buffer flusher.
|
||||
func (ap *AutoPipeliner) flusherRing() {
|
||||
var (
|
||||
cmds = make([]Cmder, 0, ap.config.MaxBatchSize)
|
||||
doneChans = make([]chan struct{}, 0, ap.config.MaxBatchSize)
|
||||
positions = make([]uint32, 0, ap.config.MaxBatchSize)
|
||||
)
|
||||
|
||||
for {
|
||||
// Try to get next command (non-blocking)
|
||||
cmd, done, pos := ap.ring.nextWriteCmd()
|
||||
|
||||
if cmd == nil {
|
||||
// No command available
|
||||
// If we have buffered commands, execute them first
|
||||
if len(cmds) > 0 {
|
||||
ap.executeBatch(cmds, doneChans, positions)
|
||||
cmds = cmds[:0]
|
||||
doneChans = doneChans[:0]
|
||||
positions = positions[:0]
|
||||
}
|
||||
|
||||
// Check for shutdown before blocking
|
||||
select {
|
||||
case <-ap.ctx.Done():
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
// Wait for next command (blocking)
|
||||
// This will be woken up by wakeAll() during shutdown
|
||||
cmd, done, pos = ap.ring.waitForWrite()
|
||||
|
||||
// If nil, ring is closed
|
||||
if cmd == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Add command to batch
|
||||
cmds = append(cmds, cmd)
|
||||
doneChans = append(doneChans, done)
|
||||
positions = append(positions, pos)
|
||||
|
||||
// Execute batch if full
|
||||
if len(cmds) >= ap.config.MaxBatchSize {
|
||||
ap.executeBatch(cmds, doneChans, positions)
|
||||
cmds = cmds[:0]
|
||||
doneChans = doneChans[:0]
|
||||
positions = positions[:0]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// executeBatch executes a batch of commands.
|
||||
func (ap *AutoPipeliner) executeBatch(cmds []Cmder, doneChans []chan struct{}, positions []uint32) {
|
||||
if len(cmds) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
// Acquire semaphore (limit concurrent batches)
|
||||
// Try fast path first
|
||||
if !ap.sem.TryAcquire() {
|
||||
// Fast path failed, need to wait
|
||||
err := ap.sem.Acquire(ap.ctx, 5*time.Second, context.DeadlineExceeded)
|
||||
if err != nil {
|
||||
// Context cancelled, set error on all commands and notify
|
||||
for i, cmd := range cmds {
|
||||
cmd.SetErr(ErrClosed)
|
||||
doneChans[i] <- struct{}{} // Send signal instead of close
|
||||
ap.ring.finishCmd(positions[i])
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Fast path for single command
|
||||
if len(cmds) == 1 {
|
||||
_ = ap.pipeliner.Process(context.Background(), cmds[0])
|
||||
doneChans[0] <- struct{}{} // Send signal instead of close
|
||||
ap.ring.finishCmd(positions[0])
|
||||
ap.sem.Release()
|
||||
return
|
||||
}
|
||||
// Execute pipeline for multiple commands
|
||||
pipe := ap.pipeliner.Pipeline()
|
||||
for _, cmd := range cmds {
|
||||
_ = pipe.Process(context.Background(), cmd)
|
||||
}
|
||||
|
||||
// Execute and wait for completion
|
||||
_, _ = pipe.Exec(context.Background())
|
||||
|
||||
// Notify completion and finish slots
|
||||
for i, done := range doneChans {
|
||||
done <- struct{}{} // Send signal instead of close
|
||||
ap.ring.finishCmd(positions[i])
|
||||
}
|
||||
ap.sem.Release()
|
||||
}
|
||||
|
||||
// flushBatchSlice flushes commands from the slice-based queue (legacy).
|
||||
func (ap *AutoPipeliner) flushBatchSlice() {
|
||||
// Get commands from queue
|
||||
@@ -454,26 +608,31 @@ func (ap *AutoPipeliner) flushBatchSlice() {
|
||||
return
|
||||
}
|
||||
|
||||
// Use Pipeline directly instead of Pipelined to avoid closure overhead
|
||||
pipe := ap.Pipeline()
|
||||
// Process all commands in a pipeline
|
||||
for _, qc := range queuedCmds {
|
||||
_ = pipe.Process(context.Background(), qc.cmd)
|
||||
}
|
||||
_, _ = pipe.Exec(context.Background())
|
||||
go func() {
|
||||
// Use Pipeline directly instead of Pipelined to avoid closure overhead
|
||||
pipe := ap.Pipeline()
|
||||
// Process all commands in a pipeline
|
||||
for _, qc := range queuedCmds {
|
||||
_ = pipe.Process(context.Background(), qc.cmd)
|
||||
}
|
||||
_, _ = pipe.Exec(context.Background())
|
||||
|
||||
// IMPORTANT: Only notify after pipeline execution is complete
|
||||
// This ensures command results are fully populated before waiters proceed
|
||||
for _, qc := range queuedCmds {
|
||||
// Signal completion by sending to buffered channel (non-blocking)
|
||||
qc.done <- struct{}{}
|
||||
}
|
||||
ap.sem.Release()
|
||||
putQueueSlice(queuedCmds)
|
||||
// IMPORTANT: Only notify after pipeline execution is complete
|
||||
// This ensures command results are fully populated before waiters proceed
|
||||
for _, qc := range queuedCmds {
|
||||
// Signal completion by sending to buffered channel (non-blocking)
|
||||
qc.done <- struct{}{}
|
||||
}
|
||||
ap.sem.Release()
|
||||
putQueueSlice(queuedCmds)
|
||||
}()
|
||||
}
|
||||
|
||||
// Len returns the current number of queued commands.
|
||||
func (ap *AutoPipeliner) Len() int {
|
||||
if ap.config.UseRingBuffer {
|
||||
return ap.ring.len()
|
||||
}
|
||||
return int(ap.queueLen.Load())
|
||||
}
|
||||
|
||||
|
||||
244
autopipeline_ring.go
Normal file
244
autopipeline_ring.go
Normal file
@@ -0,0 +1,244 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"math/bits"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// autoPipelineRing is a pre-allocated ring buffer queue for autopipelining.
|
||||
// It provides lock-free enqueue and FIFO ordering guarantees.
|
||||
//
|
||||
// Ring buffer architecture:
|
||||
// - Pre-allocated slots (no allocations during enqueue)
|
||||
// - Per-slot channels for request-response matching
|
||||
// - Atomic write pointer for lock-free enqueue
|
||||
// - Separate read pointers for write and read goroutines
|
||||
//
|
||||
// The ring buffer uses three pointers:
|
||||
// - write: Where app goroutines add commands (atomic increment)
|
||||
// - read1: Where flush goroutine reads commands to send
|
||||
// - read2: Where result goroutine matches responses (currently unused, for future optimization)
|
||||
type autoPipelineRing struct {
|
||||
store []autoPipelineSlot // Pre-allocated slots
|
||||
mask uint32 // Size - 1 (for fast modulo via bitwise AND)
|
||||
write uint32 // Write position (atomic, incremented by app goroutines)
|
||||
read1 uint32 // Read position for flush goroutine
|
||||
read2 uint32 // Read position for result matching (reserved for future use)
|
||||
cmds []Cmder // Persistent buffer for collecting commands (reused, no allocations)
|
||||
doneChans []chan struct{} // Persistent buffer for collecting done channels (reused, no allocations)
|
||||
}
|
||||
|
||||
// autoPipelineSlot represents a single command slot in the ring buffer.
|
||||
type autoPipelineSlot struct {
|
||||
c1 *sync.Cond // Condition variable for write synchronization (shared mutex with c2)
|
||||
c2 *sync.Cond // Condition variable for wait/signal (shared mutex with c1)
|
||||
cmd Cmder // The command to execute
|
||||
done chan struct{} // Completion notification channel (pre-allocated, reused)
|
||||
mark uint32 // State: 0=empty, 1=queued, 2=sent (atomic)
|
||||
slept bool // Whether writer goroutine is sleeping on this slot
|
||||
}
|
||||
|
||||
// State constants for autoPipelineSlot.mark
|
||||
const (
|
||||
apSlotEmpty uint32 = 0 // Slot is empty and available
|
||||
apSlotQueued uint32 = 1 // Command queued, ready to be sent
|
||||
apSlotSent uint32 = 2 // Command sent, waiting for response
|
||||
apSlotClosed uint32 = 3 // Ring is closed, stop waiting
|
||||
)
|
||||
|
||||
// newAutoPipelineRing creates a new ring buffer with the specified size.
|
||||
// Size will be rounded up to the next power of 2 for efficient modulo operations.
|
||||
func newAutoPipelineRing(size int) *autoPipelineRing {
|
||||
// Round up to power of 2 for fast modulo via bitwise AND
|
||||
if size <= 0 {
|
||||
size = 1024 // Default size
|
||||
}
|
||||
if size&(size-1) != 0 {
|
||||
// Not a power of 2, round up
|
||||
size = 1 << (32 - bits.LeadingZeros32(uint32(size)))
|
||||
}
|
||||
|
||||
r := &autoPipelineRing{
|
||||
store: make([]autoPipelineSlot, size),
|
||||
mask: uint32(size - 1),
|
||||
read1: ^uint32(0), // Start at -1 (0xFFFFFFFF) so first increment gives 0
|
||||
read2: ^uint32(0), // Start at -1 (0xFFFFFFFF) so first increment gives 0
|
||||
cmds: make([]Cmder, 0, size), // Persistent buffer, reused
|
||||
doneChans: make([]chan struct{}, 0, size), // Persistent buffer, reused
|
||||
}
|
||||
|
||||
// Initialize each slot with condition variables and pre-allocated channel
|
||||
for i := range r.store {
|
||||
m := &sync.Mutex{}
|
||||
r.store[i].c1 = sync.NewCond(m)
|
||||
r.store[i].c2 = sync.NewCond(m) // Share the same mutex
|
||||
r.store[i].done = make(chan struct{}, 1) // Buffered channel for signal (not close)
|
||||
}
|
||||
|
||||
return r
|
||||
}
|
||||
|
||||
// putOne enqueues a command into the ring buffer.
|
||||
// Returns the done channel that will be signaled when the command completes.
|
||||
//
|
||||
// Ring buffer enqueue implementation:
|
||||
// - Atomic increment for write position
|
||||
// - Wait on condition variable if slot is full
|
||||
// - Signal reader if it's sleeping
|
||||
func (r *autoPipelineRing) putOne(cmd Cmder) chan struct{} {
|
||||
// Atomic increment to get next slot
|
||||
// AddUint32 returns the NEW value (after increment), so subtract 1 to get the slot we own
|
||||
pos := atomic.AddUint32(&r.write, 1) - 1
|
||||
slot := &r.store[pos&r.mask]
|
||||
|
||||
// Lock the slot
|
||||
slot.c1.L.Lock()
|
||||
|
||||
// Wait if slot is not empty (mark != 0)
|
||||
for slot.mark != 0 {
|
||||
slot.c1.Wait()
|
||||
}
|
||||
|
||||
// Store command and mark as queued
|
||||
slot.cmd = cmd
|
||||
slot.mark = 1
|
||||
s := slot.slept
|
||||
|
||||
slot.c1.L.Unlock()
|
||||
|
||||
// If reader is sleeping, wake it up
|
||||
if s {
|
||||
slot.c2.Broadcast()
|
||||
}
|
||||
|
||||
return slot.done
|
||||
}
|
||||
|
||||
// nextWriteCmd tries to get the next command (non-blocking).
|
||||
// Returns nil if no command is available.
|
||||
// Should only be called by the flush goroutine.
|
||||
// Returns: cmd, done channel, position (for finishCmd)
|
||||
func (r *autoPipelineRing) nextWriteCmd() (Cmder, chan struct{}, uint32) {
|
||||
r.read1++
|
||||
pos := r.read1
|
||||
p := pos & r.mask
|
||||
slot := &r.store[p]
|
||||
|
||||
slot.c1.L.Lock()
|
||||
if slot.mark == 1 {
|
||||
cmd := slot.cmd
|
||||
done := slot.done
|
||||
slot.mark = 2
|
||||
slot.c1.L.Unlock()
|
||||
return cmd, done, pos
|
||||
}
|
||||
// No command available, rollback read position
|
||||
r.read1--
|
||||
slot.c1.L.Unlock()
|
||||
return nil, nil, 0
|
||||
}
|
||||
|
||||
// waitForWrite waits for the next command (blocking).
|
||||
// Should only be called by the flush goroutine.
|
||||
// Returns nil if the ring is closed.
|
||||
// Returns: cmd, done channel, position (for finishCmd)
|
||||
func (r *autoPipelineRing) waitForWrite() (Cmder, chan struct{}, uint32) {
|
||||
r.read1++
|
||||
pos := r.read1
|
||||
p := pos & r.mask
|
||||
slot := &r.store[p]
|
||||
|
||||
slot.c1.L.Lock()
|
||||
// Wait until command is available (mark == 1) or closed (mark == 3)
|
||||
for slot.mark != 1 && slot.mark != apSlotClosed {
|
||||
slot.slept = true
|
||||
slot.c2.Wait() // c1 and c2 share the same mutex
|
||||
slot.slept = false
|
||||
}
|
||||
|
||||
// Check if closed
|
||||
if slot.mark == apSlotClosed {
|
||||
r.read1-- // Rollback read position
|
||||
slot.c1.L.Unlock()
|
||||
return nil, nil, 0
|
||||
}
|
||||
|
||||
cmd := slot.cmd
|
||||
done := slot.done
|
||||
slot.mark = 2
|
||||
slot.c1.L.Unlock()
|
||||
return cmd, done, pos
|
||||
}
|
||||
|
||||
// finishCmd marks a command as completed and clears the slot.
|
||||
// Should only be called by the flush goroutine.
|
||||
// The position parameter should be the position returned by nextWriteCmd/waitForWrite.
|
||||
func (r *autoPipelineRing) finishCmd(position uint32) {
|
||||
p := position & r.mask
|
||||
slot := &r.store[p]
|
||||
|
||||
slot.c1.L.Lock()
|
||||
if slot.mark == 2 {
|
||||
// Drain the done channel before reusing
|
||||
select {
|
||||
case <-slot.done:
|
||||
default:
|
||||
}
|
||||
|
||||
// Clear slot for reuse
|
||||
slot.cmd = nil
|
||||
slot.mark = 0
|
||||
}
|
||||
slot.c1.L.Unlock()
|
||||
slot.c1.Signal() // Wake up any writer waiting on this slot
|
||||
}
|
||||
|
||||
// len returns the approximate number of queued commands.
|
||||
// This is an estimate and may not be exact due to concurrent access.
|
||||
func (r *autoPipelineRing) len() int {
|
||||
write := atomic.LoadUint32(&r.write)
|
||||
read := atomic.LoadUint32(&r.read1)
|
||||
|
||||
// Handle wrap-around
|
||||
if write >= read {
|
||||
return int(write - read)
|
||||
}
|
||||
// Wrapped around
|
||||
return int(write + (^uint32(0) - read) + 1)
|
||||
}
|
||||
|
||||
// cap returns the capacity of the ring buffer.
|
||||
func (r *autoPipelineRing) cap() int {
|
||||
return len(r.store)
|
||||
}
|
||||
|
||||
// reset resets the ring buffer to empty state.
|
||||
// This should only be called when no goroutines are accessing the ring.
|
||||
func (r *autoPipelineRing) reset() {
|
||||
atomic.StoreUint32(&r.write, 0)
|
||||
atomic.StoreUint32(&r.read1, 0)
|
||||
atomic.StoreUint32(&r.read2, 0)
|
||||
|
||||
for i := range r.store {
|
||||
r.store[i].c1.L.Lock()
|
||||
r.store[i].cmd = nil
|
||||
r.store[i].mark = 0
|
||||
r.store[i].slept = false
|
||||
r.store[i].c1.L.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// wakeAll wakes up all waiting goroutines.
|
||||
// This is used during shutdown to unblock the flusher.
|
||||
func (r *autoPipelineRing) wakeAll() {
|
||||
for i := range r.store {
|
||||
r.store[i].c1.L.Lock()
|
||||
if r.store[i].mark == 0 {
|
||||
r.store[i].mark = apSlotClosed
|
||||
}
|
||||
r.store[i].c1.L.Unlock()
|
||||
r.store[i].c2.Broadcast()
|
||||
}
|
||||
}
|
||||
|
||||
114
example_pipeline_buffers_test.go
Normal file
114
example_pipeline_buffers_test.go
Normal file
@@ -0,0 +1,114 @@
|
||||
package redis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// ExampleClient_pipelineBuffers demonstrates how to configure separate buffer sizes
|
||||
// for pipelining operations to optimize performance.
|
||||
//
|
||||
// By using larger buffers for pipelining, you can significantly improve throughput
|
||||
// for batch operations while keeping memory usage low for regular operations.
|
||||
func ExampleClient_pipelineBuffers() {
|
||||
// Create a client with optimized buffer sizes:
|
||||
// - Small buffers (64 KiB) for regular operations
|
||||
// - Large buffers (512 KiB) for pipelining operations
|
||||
// - Small dedicated pool (10 connections) for pipelining
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
|
||||
// Regular connection pool settings
|
||||
PoolSize: 100, // Large pool for regular operations
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB read buffer
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB write buffer
|
||||
|
||||
// Pipeline connection pool settings (optional)
|
||||
// When set, a separate pool is created for pipelining with larger buffers
|
||||
PipelinePoolSize: 10, // Small pool for pipelining
|
||||
PipelineReadBufferSize: 512 * 1024, // 512 KiB read buffer (8x larger)
|
||||
PipelineWriteBufferSize: 512 * 1024, // 512 KiB write buffer (8x larger)
|
||||
})
|
||||
defer client.Close()
|
||||
|
||||
ctx := context.Background()
|
||||
|
||||
// Regular operations use the regular pool (64 KiB buffers)
|
||||
err := client.Set(ctx, "key1", "value1", 0).Err()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// Pipeline operations automatically use the pipeline pool (512 KiB buffers)
|
||||
pipe := client.Pipeline()
|
||||
for i := 0; i < 1000; i++ {
|
||||
pipe.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0)
|
||||
}
|
||||
_, err = pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
// AutoPipeline also uses the pipeline pool automatically
|
||||
ap := client.AutoPipeline()
|
||||
defer ap.Close()
|
||||
|
||||
for i := 0; i < 1000; i++ {
|
||||
ap.Set(ctx, fmt.Sprintf("apkey%d", i), fmt.Sprintf("value%d", i), 0)
|
||||
}
|
||||
|
||||
// Check pool statistics
|
||||
stats := client.PoolStats()
|
||||
fmt.Printf("Regular pool: %d total connections, %d idle\n",
|
||||
stats.TotalConns, stats.IdleConns)
|
||||
|
||||
if stats.PipelineStats != nil {
|
||||
fmt.Printf("Pipeline pool: %d total connections, %d idle\n",
|
||||
stats.PipelineStats.TotalConns, stats.PipelineStats.IdleConns)
|
||||
}
|
||||
|
||||
// Output:
|
||||
// Regular pool: 1 total connections, 1 idle
|
||||
// Pipeline pool: 1 total connections, 1 idle
|
||||
}
|
||||
|
||||
// ExampleClient_pipelineBuffers_memoryOptimization demonstrates the memory savings
|
||||
// of using a separate pipeline pool with larger buffers.
|
||||
func ExampleClient_pipelineBuffers_memoryOptimization() {
|
||||
// Scenario 1: Single pool with large buffers for all connections
|
||||
// Memory: 100 connections × 1 MiB = 100 MiB
|
||||
// Problem: Wastes memory on regular operations that don't need large buffers
|
||||
clientLargeBuffers := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
PoolSize: 100,
|
||||
ReadBufferSize: 512 * 1024, // 512 KiB for ALL connections
|
||||
WriteBufferSize: 512 * 1024, // 512 KiB for ALL connections
|
||||
})
|
||||
defer clientLargeBuffers.Close()
|
||||
|
||||
// Scenario 2: Separate pools with optimized buffer sizes (RECOMMENDED)
|
||||
// Memory: (100 × 128 KiB) + (10 × 1 MiB) = 12.8 MiB + 10 MiB = 22.8 MiB
|
||||
// Savings: 77.2 MiB (77% reduction!)
|
||||
clientOptimized := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
PoolSize: 100, // Large pool for regular operations
|
||||
|
||||
// Small buffers for regular operations
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB
|
||||
|
||||
// Large buffers only for pipelining
|
||||
PipelinePoolSize: 10, // Small pool
|
||||
PipelineReadBufferSize: 512 * 1024, // 512 KiB
|
||||
PipelineWriteBufferSize: 512 * 1024, // 512 KiB
|
||||
})
|
||||
defer clientOptimized.Close()
|
||||
|
||||
fmt.Println("Optimized configuration saves 77% memory!")
|
||||
|
||||
// Output:
|
||||
// Optimized configuration saves 77% memory!
|
||||
}
|
||||
|
||||
78
internal/pool/conn_relaxed_timeout_bench_test.go
Normal file
78
internal/pool/conn_relaxed_timeout_bench_test.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// BenchmarkGetEffectiveReadTimeout_NoRelaxedTimeout benchmarks the fast path (no relaxed timeout set)
|
||||
func BenchmarkGetEffectiveReadTimeout_NoRelaxedTimeout(b *testing.B) {
|
||||
netConn := &net.TCPConn{}
|
||||
cn := NewConn(netConn)
|
||||
defer cn.Close()
|
||||
|
||||
normalTimeout := 5 * time.Second
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = cn.getEffectiveReadTimeout(normalTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkGetEffectiveReadTimeout_WithRelaxedTimeout benchmarks when relaxed timeout is set
|
||||
func BenchmarkGetEffectiveReadTimeout_WithRelaxedTimeout(b *testing.B) {
|
||||
netConn := &net.TCPConn{}
|
||||
cn := NewConn(netConn)
|
||||
defer cn.Close()
|
||||
|
||||
// Set relaxed timeout with a deadline far in the future
|
||||
cn.SetRelaxedTimeoutWithDeadline(10*time.Second, 10*time.Second, time.Now().Add(1*time.Hour))
|
||||
|
||||
normalTimeout := 5 * time.Second
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = cn.getEffectiveReadTimeout(normalTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkGetEffectiveWriteTimeout_NoRelaxedTimeout benchmarks the fast path (no relaxed timeout set)
|
||||
func BenchmarkGetEffectiveWriteTimeout_NoRelaxedTimeout(b *testing.B) {
|
||||
netConn := &net.TCPConn{}
|
||||
cn := NewConn(netConn)
|
||||
defer cn.Close()
|
||||
|
||||
normalTimeout := 5 * time.Second
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = cn.getEffectiveWriteTimeout(normalTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkGetEffectiveWriteTimeout_WithRelaxedTimeout benchmarks when relaxed timeout is set
|
||||
func BenchmarkGetEffectiveWriteTimeout_WithRelaxedTimeout(b *testing.B) {
|
||||
netConn := &net.TCPConn{}
|
||||
cn := NewConn(netConn)
|
||||
defer cn.Close()
|
||||
|
||||
// Set relaxed timeout with a deadline far in the future
|
||||
cn.SetRelaxedTimeoutWithDeadline(10*time.Second, 10*time.Second, time.Now().Add(1*time.Hour))
|
||||
|
||||
normalTimeout := 5 * time.Second
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = cn.getEffectiveWriteTimeout(normalTimeout)
|
||||
}
|
||||
}
|
||||
|
||||
96
internal/pool/goroutine_overhead_test.go
Normal file
96
internal/pool/goroutine_overhead_test.go
Normal file
@@ -0,0 +1,96 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestBackgroundGoroutineOverhead measures the overhead of the background time updater
|
||||
func TestBackgroundGoroutineOverhead(t *testing.T) {
|
||||
// Measure baseline goroutines
|
||||
runtime.GC()
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
baselineGoroutines := runtime.NumGoroutine()
|
||||
|
||||
t.Logf("Baseline goroutines: %d", baselineGoroutines)
|
||||
|
||||
// The background goroutine is already running from init()
|
||||
// Let's verify it's updating the cache
|
||||
time1 := getCachedTimeNs()
|
||||
time.Sleep(100 * time.Millisecond) // Wait for at least 2 updates (50ms each)
|
||||
time2 := getCachedTimeNs()
|
||||
|
||||
if time2 <= time1 {
|
||||
t.Errorf("Time cache not updating: time1=%d, time2=%d", time1, time2)
|
||||
}
|
||||
|
||||
diff := time2 - time1
|
||||
expectedDiff := int64(100 * time.Millisecond)
|
||||
|
||||
// Allow 10% tolerance
|
||||
if diff < expectedDiff*9/10 || diff > expectedDiff*11/10 {
|
||||
t.Logf("Warning: Time diff %dns not close to expected %dns (within 10%%)", diff, expectedDiff)
|
||||
} else {
|
||||
t.Logf("Time cache updating correctly: diff=%dns, expected~%dns", diff, expectedDiff)
|
||||
}
|
||||
|
||||
// Check goroutine count (should be baseline + 1 for our updater)
|
||||
currentGoroutines := runtime.NumGoroutine()
|
||||
t.Logf("Current goroutines: %d (expected: %d)", currentGoroutines, baselineGoroutines)
|
||||
}
|
||||
|
||||
// BenchmarkBackgroundGoroutineImpact measures if the background goroutine impacts performance
|
||||
func BenchmarkBackgroundGoroutineImpact(b *testing.B) {
|
||||
// This benchmark runs alongside the background goroutine
|
||||
// to see if there's any measurable impact on other operations
|
||||
|
||||
var counter atomic.Int64
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
counter.Add(1)
|
||||
_ = getCachedTimeNs()
|
||||
}
|
||||
})
|
||||
|
||||
b.Logf("Total operations: %d", counter.Load())
|
||||
}
|
||||
|
||||
// BenchmarkTickerOverhead measures the overhead of time.Ticker
|
||||
func BenchmarkTickerOverhead(b *testing.B) {
|
||||
// Create a ticker to measure its overhead
|
||||
ticker := time.NewTicker(50 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
|
||||
var updates atomic.Int64
|
||||
done := make(chan struct{})
|
||||
|
||||
// Background goroutine that receives from ticker
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
updates.Add(1)
|
||||
case <-done:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Run benchmark for 3 seconds
|
||||
time.Sleep(3 * time.Second)
|
||||
close(done)
|
||||
|
||||
totalUpdates := updates.Load()
|
||||
expectedUpdates := int64(3000 / 50) // 3000ms / 50ms = 60 updates
|
||||
|
||||
b.Logf("Ticker fired %d times in 3s (expected ~%d)", totalUpdates, expectedUpdates)
|
||||
|
||||
if totalUpdates < expectedUpdates*9/10 || totalUpdates > expectedUpdates*11/10 {
|
||||
b.Logf("Warning: Update count not within 10%% of expected")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -64,7 +64,8 @@ type Stats struct {
|
||||
IdleConns uint32 // number of idle connections in the pool
|
||||
StaleConns uint32 // number of stale connections removed from the pool
|
||||
|
||||
PubSubStats PubSubStats
|
||||
PubSubStats PubSubStats
|
||||
PipelineStats *Stats // stats for the separate pipeline pool (if configured)
|
||||
}
|
||||
|
||||
type Pooler interface {
|
||||
|
||||
43
internal/pool/time_benchmark_test.go
Normal file
43
internal/pool/time_benchmark_test.go
Normal file
@@ -0,0 +1,43 @@
|
||||
package pool
|
||||
|
||||
import (
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// BenchmarkTimeNow measures the cost of time.Now()
|
||||
func BenchmarkTimeNow(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkTimeNowUnixNano measures the cost of time.Now().UnixNano()
|
||||
func BenchmarkTimeNowUnixNano(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = time.Now().UnixNano()
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkAtomicLoad measures the cost of atomic load
|
||||
func BenchmarkAtomicLoad(b *testing.B) {
|
||||
var val atomic.Int64
|
||||
val.Store(12345)
|
||||
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = val.Load()
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkGetCachedTimeNs measures the cost of our cached time function
|
||||
func BenchmarkGetCachedTimeNs(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
for i := 0; i < b.N; i++ {
|
||||
_ = getCachedTimeNs()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -15,12 +15,19 @@ func BenchmarkReadStringReply(b *testing.B) {
|
||||
// Create a RESP bulk string reply
|
||||
value := bytes.Repeat([]byte("x"), size)
|
||||
reply := fmt.Sprintf("$%d\r\n%s\r\n", size, value)
|
||||
data := []byte(reply)
|
||||
|
||||
// Reuse reader (realistic usage pattern)
|
||||
reader := bytes.NewReader(data)
|
||||
r := NewReader(reader)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
r := NewReader(bytes.NewReader([]byte(reply)))
|
||||
reader.Reset(data)
|
||||
r.Reset(reader)
|
||||
|
||||
line, err := r.readLine()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
@@ -49,10 +56,19 @@ func BenchmarkReadString(b *testing.B) {
|
||||
|
||||
for _, tc := range testCases {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
// Create reader once and reuse it (realistic usage pattern)
|
||||
data := []byte(tc.reply)
|
||||
reader := bytes.NewReader(data)
|
||||
r := NewReader(reader)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
r := NewReader(bytes.NewReader([]byte(tc.reply)))
|
||||
// Reset reader to beginning for each iteration
|
||||
reader.Reset(data)
|
||||
r.Reset(reader)
|
||||
|
||||
_, err := r.ReadString()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
@@ -65,11 +81,18 @@ func BenchmarkReadString(b *testing.B) {
|
||||
// BenchmarkReadStringParallel benchmarks concurrent ReadString calls
|
||||
func BenchmarkReadStringParallel(b *testing.B) {
|
||||
reply := "$100\r\n" + string(bytes.Repeat([]byte("x"), 100)) + "\r\n"
|
||||
data := []byte(reply)
|
||||
|
||||
b.ReportAllocs()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
// Each goroutine gets its own reader (realistic usage)
|
||||
reader := bytes.NewReader(data)
|
||||
r := NewReader(reader)
|
||||
|
||||
for pb.Next() {
|
||||
r := NewReader(bytes.NewReader([]byte(reply)))
|
||||
reader.Reset(data)
|
||||
r.Reset(reader)
|
||||
|
||||
_, err := r.ReadString()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
@@ -78,3 +101,68 @@ func BenchmarkReadStringParallel(b *testing.B) {
|
||||
})
|
||||
}
|
||||
|
||||
// BenchmarkReadInt benchmarks integer parsing
|
||||
func BenchmarkReadInt(b *testing.B) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
reply string
|
||||
}{
|
||||
{"small_int", ":42\r\n"},
|
||||
{"large_int", ":9223372036854775807\r\n"},
|
||||
{"negative_int", ":-12345\r\n"},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
data := []byte(tc.reply)
|
||||
reader := bytes.NewReader(data)
|
||||
r := NewReader(reader)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
reader.Reset(data)
|
||||
r.Reset(reader)
|
||||
|
||||
_, err := r.ReadInt()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkReadArrayLen benchmarks array length parsing
|
||||
func BenchmarkReadArrayLen(b *testing.B) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
reply string
|
||||
}{
|
||||
{"small_array", "*3\r\n"},
|
||||
{"large_array", "*1000\r\n"},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
b.Run(tc.name, func(b *testing.B) {
|
||||
data := []byte(tc.reply)
|
||||
reader := bytes.NewReader(data)
|
||||
r := NewReader(reader)
|
||||
|
||||
b.ResetTimer()
|
||||
b.ReportAllocs()
|
||||
|
||||
for i := 0; i < b.N; i++ {
|
||||
reader.Reset(data)
|
||||
r.Reset(reader)
|
||||
|
||||
_, err := r.ReadArrayLen()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
54
options.go
54
options.go
@@ -165,6 +165,60 @@ type Options struct {
|
||||
// default: 64 KiB (65536 bytes)
|
||||
WriteBufferSize int
|
||||
|
||||
// PipelineReadBufferSize is the size of the bufio.Reader buffer for pipeline connections.
|
||||
// If set to a value > 0, a separate connection pool will be created specifically for
|
||||
// pipelining operations (Pipeline() and AutoPipeline()) with this buffer size.
|
||||
//
|
||||
// This allows you to use large buffers for pipelining (to reduce syscalls and improve
|
||||
// throughput) while keeping regular command buffers small (to save memory).
|
||||
//
|
||||
// If not set (0), pipeline operations will use the regular connection pool with
|
||||
// ReadBufferSize buffers.
|
||||
//
|
||||
// Recommended: 512 KiB for high-throughput pipelining workloads.
|
||||
//
|
||||
// Example:
|
||||
// client := redis.NewClient(&redis.Options{
|
||||
// Addr: "localhost:6379",
|
||||
// ReadBufferSize: 64 * 1024, // 64 KiB for regular commands
|
||||
// PipelineReadBufferSize: 512 * 1024, // 512 KiB for pipelining
|
||||
// PipelineWriteBufferSize: 512 * 1024,
|
||||
// })
|
||||
//
|
||||
// Memory impact: With PoolSize=100 and PipelinePoolSize=10:
|
||||
// - Without pipeline pool: 100 conns × 1 MiB = 100 MB (if all use 512 KiB buffers)
|
||||
// - With pipeline pool: (100 × 128 KiB) + (10 × 1 MiB) = 22.8 MB (77% savings)
|
||||
//
|
||||
// default: 0 (use ReadBufferSize)
|
||||
PipelineReadBufferSize int
|
||||
|
||||
// PipelineWriteBufferSize is the size of the bufio.Writer buffer for pipeline connections.
|
||||
// If set to a value > 0, a separate connection pool will be created specifically for
|
||||
// pipelining operations (Pipeline() and AutoPipeline()) with this buffer size.
|
||||
//
|
||||
// This allows you to use large buffers for pipelining (to reduce syscalls and improve
|
||||
// throughput) while keeping regular command buffers small (to save memory).
|
||||
//
|
||||
// If not set (0), pipeline operations will use the regular connection pool with
|
||||
// WriteBufferSize buffers.
|
||||
//
|
||||
// Recommended: 512 KiB for high-throughput pipelining workloads.
|
||||
//
|
||||
// default: 0 (use WriteBufferSize)
|
||||
PipelineWriteBufferSize int
|
||||
|
||||
// PipelinePoolSize is the pool size for the separate pipeline connection pool.
|
||||
// Only used if PipelineReadBufferSize or PipelineWriteBufferSize is set.
|
||||
//
|
||||
// Pipelining typically needs fewer connections than regular operations because
|
||||
// batching reduces connection contention. A smaller pool saves memory while
|
||||
// maintaining high throughput.
|
||||
//
|
||||
// If not set (0), defaults to 10 connections.
|
||||
//
|
||||
// default: 10
|
||||
PipelinePoolSize int
|
||||
|
||||
// PoolFIFO type of connection pool.
|
||||
//
|
||||
// - true for FIFO pool
|
||||
|
||||
234
pipeline_buffer_test.go
Normal file
234
pipeline_buffer_test.go
Normal file
@@ -0,0 +1,234 @@
|
||||
package redis_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
// TestPipelineBufferSizes verifies that pipeline pool is created with custom buffer sizes
|
||||
func TestPipelineBufferSizes(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create client with custom pipeline buffer sizes
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB for regular connections
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB for regular connections
|
||||
PipelineReadBufferSize: 512 * 1024, // 512 KiB for pipeline connections
|
||||
PipelineWriteBufferSize: 512 * 1024, // 512 KiB for pipeline connections
|
||||
PipelinePoolSize: 5, // Small pool for pipelining
|
||||
})
|
||||
defer client.Close()
|
||||
|
||||
// Test that regular commands work
|
||||
err := client.Set(ctx, "test_key", "test_value", 0).Err()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set key: %v", err)
|
||||
}
|
||||
|
||||
val, err := client.Get(ctx, "test_key").Result()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get key: %v", err)
|
||||
}
|
||||
if val != "test_value" {
|
||||
t.Fatalf("Expected 'test_value', got '%s'", val)
|
||||
}
|
||||
|
||||
// Test that pipeline works
|
||||
pipe := client.Pipeline()
|
||||
pipe.Set(ctx, "pipe_key1", "value1", 0)
|
||||
pipe.Set(ctx, "pipe_key2", "value2", 0)
|
||||
pipe.Get(ctx, "pipe_key1")
|
||||
pipe.Get(ctx, "pipe_key2")
|
||||
|
||||
cmds, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Pipeline execution failed: %v", err)
|
||||
}
|
||||
|
||||
if len(cmds) != 4 {
|
||||
t.Fatalf("Expected 4 commands, got %d", len(cmds))
|
||||
}
|
||||
|
||||
// Verify results
|
||||
if cmds[2].(*redis.StringCmd).Val() != "value1" {
|
||||
t.Fatalf("Expected 'value1', got '%s'", cmds[2].(*redis.StringCmd).Val())
|
||||
}
|
||||
if cmds[3].(*redis.StringCmd).Val() != "value2" {
|
||||
t.Fatalf("Expected 'value2', got '%s'", cmds[3].(*redis.StringCmd).Val())
|
||||
}
|
||||
|
||||
t.Log("Pipeline with custom buffer sizes works correctly")
|
||||
}
|
||||
|
||||
// TestPipelineBufferSizesWithAutoPipeline verifies that autopipeline uses the pipeline pool
|
||||
func TestPipelineBufferSizesWithAutoPipeline(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create client with custom pipeline buffer sizes
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB for regular connections
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB for regular connections
|
||||
PipelineReadBufferSize: 512 * 1024, // 512 KiB for pipeline connections
|
||||
PipelineWriteBufferSize: 512 * 1024, // 512 KiB for pipeline connections
|
||||
PipelinePoolSize: 5, // Small pool for pipelining
|
||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||
MaxBatchSize: 10,
|
||||
MaxConcurrentBatches: 2,
|
||||
UseRingBuffer: true,
|
||||
RingBufferSize: 64,
|
||||
},
|
||||
})
|
||||
defer client.Close()
|
||||
|
||||
// Test autopipeline
|
||||
ap := client.AutoPipeline()
|
||||
defer ap.Close()
|
||||
|
||||
// Send multiple commands
|
||||
for i := 0; i < 20; i++ {
|
||||
key := "ap_key_" + string(rune('0'+i%10))
|
||||
val := "value_" + string(rune('0'+i%10))
|
||||
ap.Set(ctx, key, val, 0).Err()
|
||||
}
|
||||
|
||||
// Verify some values
|
||||
val, err := client.Get(ctx, "ap_key_0").Result()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to get autopipelined key: %v", err)
|
||||
}
|
||||
if val != "value_0" {
|
||||
t.Fatalf("Expected 'value_0', got '%s'", val)
|
||||
}
|
||||
|
||||
t.Log("AutoPipeline with custom buffer sizes works correctly")
|
||||
}
|
||||
|
||||
// TestNoPipelinePool verifies that client works without pipeline pool (backward compatibility)
|
||||
func TestNoPipelinePool(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create client WITHOUT custom pipeline buffer sizes
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB for all connections
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB for all connections
|
||||
// No PipelineReadBufferSize or PipelineWriteBufferSize
|
||||
})
|
||||
defer client.Close()
|
||||
|
||||
// Test that pipeline still works (using regular pool)
|
||||
pipe := client.Pipeline()
|
||||
pipe.Set(ctx, "no_pipe_pool_key1", "value1", 0)
|
||||
pipe.Set(ctx, "no_pipe_pool_key2", "value2", 0)
|
||||
pipe.Get(ctx, "no_pipe_pool_key1")
|
||||
pipe.Get(ctx, "no_pipe_pool_key2")
|
||||
|
||||
cmds, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Pipeline execution failed: %v", err)
|
||||
}
|
||||
|
||||
if len(cmds) != 4 {
|
||||
t.Fatalf("Expected 4 commands, got %d", len(cmds))
|
||||
}
|
||||
|
||||
// Verify results
|
||||
if cmds[2].(*redis.StringCmd).Val() != "value1" {
|
||||
t.Fatalf("Expected 'value1', got '%s'", cmds[2].(*redis.StringCmd).Val())
|
||||
}
|
||||
if cmds[3].(*redis.StringCmd).Val() != "value2" {
|
||||
t.Fatalf("Expected 'value2', got '%s'", cmds[3].(*redis.StringCmd).Val())
|
||||
}
|
||||
|
||||
t.Log("Pipeline without custom buffer sizes (backward compatibility) works correctly")
|
||||
}
|
||||
|
||||
// TestPipelinePoolStats verifies that PoolStats includes pipeline pool stats
|
||||
func TestPipelinePoolStats(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create client with custom pipeline buffer sizes
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB for regular connections
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB for regular connections
|
||||
PipelineReadBufferSize: 512 * 1024, // 512 KiB for pipeline connections
|
||||
PipelineWriteBufferSize: 512 * 1024, // 512 KiB for pipeline connections
|
||||
PipelinePoolSize: 5, // Small pool for pipelining
|
||||
})
|
||||
defer client.Close()
|
||||
|
||||
// Execute some pipeline commands
|
||||
pipe := client.Pipeline()
|
||||
for i := 0; i < 10; i++ {
|
||||
pipe.Set(ctx, "stats_key", "value", 0)
|
||||
}
|
||||
_, err := pipe.Exec(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Pipeline execution failed: %v", err)
|
||||
}
|
||||
|
||||
// Get pool stats
|
||||
stats := client.PoolStats()
|
||||
if stats == nil {
|
||||
t.Fatal("PoolStats returned nil")
|
||||
}
|
||||
|
||||
// Verify pipeline stats are included
|
||||
if stats.PipelineStats == nil {
|
||||
t.Fatal("PipelineStats is nil - pipeline pool stats not included")
|
||||
}
|
||||
|
||||
t.Logf("Regular pool stats: TotalConns=%d, IdleConns=%d, Hits=%d, Misses=%d",
|
||||
stats.TotalConns, stats.IdleConns, stats.Hits, stats.Misses)
|
||||
t.Logf("Pipeline pool stats: TotalConns=%d, IdleConns=%d, Hits=%d, Misses=%d",
|
||||
stats.PipelineStats.TotalConns, stats.PipelineStats.IdleConns,
|
||||
stats.PipelineStats.Hits, stats.PipelineStats.Misses)
|
||||
|
||||
// Verify pipeline pool has connections
|
||||
if stats.PipelineStats.TotalConns == 0 {
|
||||
t.Error("Pipeline pool has no connections")
|
||||
}
|
||||
|
||||
t.Log("PoolStats includes pipeline pool stats correctly")
|
||||
}
|
||||
|
||||
// TestNoPipelinePoolStats verifies that PoolStats works without pipeline pool
|
||||
func TestNoPipelinePoolStats(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Create client WITHOUT custom pipeline buffer sizes
|
||||
client := redis.NewClient(&redis.Options{
|
||||
Addr: "localhost:6379",
|
||||
ReadBufferSize: 64 * 1024, // 64 KiB for all connections
|
||||
WriteBufferSize: 64 * 1024, // 64 KiB for all connections
|
||||
})
|
||||
defer client.Close()
|
||||
|
||||
// Execute some commands
|
||||
err := client.Set(ctx, "test_key", "test_value", 0).Err()
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to set key: %v", err)
|
||||
}
|
||||
|
||||
// Get pool stats
|
||||
stats := client.PoolStats()
|
||||
if stats == nil {
|
||||
t.Fatal("PoolStats returned nil")
|
||||
}
|
||||
|
||||
// Verify pipeline stats are nil (no pipeline pool)
|
||||
if stats.PipelineStats != nil {
|
||||
t.Error("PipelineStats should be nil when no pipeline pool is configured")
|
||||
}
|
||||
|
||||
t.Logf("Regular pool stats: TotalConns=%d, IdleConns=%d, Hits=%d, Misses=%d",
|
||||
stats.TotalConns, stats.IdleConns, stats.Hits, stats.Misses)
|
||||
|
||||
t.Log("PoolStats works correctly without pipeline pool")
|
||||
}
|
||||
|
||||
139
redis.go
139
redis.go
@@ -223,6 +223,11 @@ type baseClient struct {
|
||||
pubSubPool *pool.PubSubPool
|
||||
hooksMixin
|
||||
|
||||
// pipelinePool is a separate connection pool for pipelining operations.
|
||||
// Created only if PipelineReadBufferSize or PipelineWriteBufferSize is set in Options.
|
||||
// This allows using large buffers for pipelining while keeping regular buffers small.
|
||||
pipelinePool pool.Pooler
|
||||
|
||||
onClose func() error // hook called when client is closed
|
||||
|
||||
// Push notification processing
|
||||
@@ -244,6 +249,7 @@ func (c *baseClient) clone() *baseClient {
|
||||
clone := &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: c.connPool,
|
||||
pipelinePool: c.pipelinePool,
|
||||
onClose: c.onClose,
|
||||
pushProcessor: c.pushProcessor,
|
||||
maintNotificationsManager: maintNotificationsManager,
|
||||
@@ -641,6 +647,55 @@ func (c *baseClient) withConn(
|
||||
return fnErr
|
||||
}
|
||||
|
||||
// withPipelineConn executes fn with a connection from the pipeline pool (if available),
|
||||
// otherwise from the regular pool.
|
||||
func (c *baseClient) withPipelineConn(
|
||||
ctx context.Context, fn func(context.Context, *pool.Conn) error,
|
||||
) error {
|
||||
// Use pipeline pool if available, otherwise fall back to regular pool
|
||||
if c.pipelinePool != nil {
|
||||
cn, err := c.pipelinePool.Get(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Initialize connection if needed
|
||||
if !cn.IsInited() {
|
||||
if err := c.initConn(ctx, cn); err != nil {
|
||||
c.pipelinePool.Remove(ctx, cn, err)
|
||||
if err := errors.Unwrap(err); err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// initConn will transition to IDLE state, so we need to acquire it
|
||||
if !cn.TryAcquire() {
|
||||
return fmt.Errorf("redis: connection is not usable")
|
||||
}
|
||||
}
|
||||
|
||||
var fnErr error
|
||||
defer func() {
|
||||
if isBadConn(fnErr, false, c.opt.Addr) {
|
||||
c.pipelinePool.Remove(ctx, cn, fnErr)
|
||||
} else {
|
||||
// process any pending push notifications before returning the connection to the pool
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", err)
|
||||
}
|
||||
c.pipelinePool.Put(ctx, cn)
|
||||
}
|
||||
}()
|
||||
|
||||
fnErr = fn(ctx, cn)
|
||||
return fnErr
|
||||
}
|
||||
|
||||
// Fall back to regular pool
|
||||
return c.withConn(ctx, fn)
|
||||
}
|
||||
|
||||
func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return c.opt.Dialer(ctx, network, addr)
|
||||
}
|
||||
@@ -824,6 +879,12 @@ func (c *baseClient) Close() error {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
// Close pipeline pool if it exists
|
||||
if c.pipelinePool != nil {
|
||||
if err := c.pipelinePool.Close(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
|
||||
@@ -832,6 +893,14 @@ func (c *baseClient) getAddr() string {
|
||||
}
|
||||
|
||||
func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
|
||||
// Use pipeline pool if available
|
||||
if c.pipelinePool != nil {
|
||||
if err := c.generalProcessPipelineWithPool(ctx, cmds, c.pipelineProcessCmds); err != nil {
|
||||
return err
|
||||
}
|
||||
return cmdsFirstErr(cmds)
|
||||
}
|
||||
// Fall back to regular pool
|
||||
if err := c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -881,6 +950,41 @@ func (c *baseClient) generalProcessPipeline(
|
||||
return lastErr
|
||||
}
|
||||
|
||||
// generalProcessPipelineWithPool is like generalProcessPipeline but uses the pipeline pool.
|
||||
func (c *baseClient) generalProcessPipelineWithPool(
|
||||
ctx context.Context, cmds []Cmder, p pipelineProcessor,
|
||||
) error {
|
||||
var lastErr error
|
||||
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
||||
if attempt > 0 {
|
||||
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// Enable retries by default to retry dial errors returned by withPipelineConn.
|
||||
canRetry := true
|
||||
lastErr = c.withPipelineConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
||||
// Process any pending push notifications before executing the pipeline
|
||||
if err := c.processPushNotifications(ctx, cn); err != nil {
|
||||
internal.Logger.Printf(ctx, "push: error processing pending notifications before processing pipeline: %v", err)
|
||||
}
|
||||
var err error
|
||||
canRetry, err = p(ctx, cn, cmds)
|
||||
return err
|
||||
})
|
||||
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
|
||||
// The error should be set here only when failing to obtain the conn.
|
||||
if !isRedisError(lastErr) {
|
||||
setCmdsErr(cmds, lastErr)
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
}
|
||||
return lastErr
|
||||
}
|
||||
|
||||
func (c *baseClient) pipelineProcessCmds(
|
||||
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
||||
) (bool, error) {
|
||||
@@ -1056,6 +1160,37 @@ func NewClient(opt *Options) *Client {
|
||||
c.connPool.AddPoolHook(c.streamingCredentialsManager.PoolHook())
|
||||
}
|
||||
|
||||
// Create separate pipeline pool if custom buffer sizes are configured
|
||||
// This must be done after streamingCredentialsManager is created
|
||||
if opt.PipelineReadBufferSize > 0 || opt.PipelineWriteBufferSize > 0 {
|
||||
pipelineOpt := opt.clone()
|
||||
|
||||
// Use pipeline buffer sizes (fall back to regular sizes if not set)
|
||||
if opt.PipelineReadBufferSize > 0 {
|
||||
pipelineOpt.ReadBufferSize = opt.PipelineReadBufferSize
|
||||
}
|
||||
if opt.PipelineWriteBufferSize > 0 {
|
||||
pipelineOpt.WriteBufferSize = opt.PipelineWriteBufferSize
|
||||
}
|
||||
|
||||
// Use pipeline pool size (default: 10)
|
||||
if opt.PipelinePoolSize > 0 {
|
||||
pipelineOpt.PoolSize = opt.PipelinePoolSize
|
||||
} else {
|
||||
pipelineOpt.PoolSize = 10 // Default smaller pool for pipelining
|
||||
}
|
||||
|
||||
c.pipelinePool, err = newConnPool(pipelineOpt, c.dialHook)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("redis: failed to create pipeline pool: %w", err))
|
||||
}
|
||||
|
||||
// Add streaming credentials hook to pipeline pool if configured
|
||||
if c.streamingCredentialsManager != nil {
|
||||
c.pipelinePool.AddPoolHook(c.streamingCredentialsManager.PoolHook())
|
||||
}
|
||||
}
|
||||
|
||||
// Initialize maintnotifications first if enabled and protocol is RESP3
|
||||
if opt.MaintNotificationsConfig != nil && opt.MaintNotificationsConfig.Mode != maintnotifications.ModeDisabled && opt.Protocol == 3 {
|
||||
err := c.enableMaintNotificationsUpgrades()
|
||||
@@ -1167,6 +1302,10 @@ type PoolStats pool.Stats
|
||||
func (c *Client) PoolStats() *PoolStats {
|
||||
stats := c.connPool.Stats()
|
||||
stats.PubSubStats = *(c.pubSubPool.Stats())
|
||||
// Include pipeline pool stats if available
|
||||
if c.pipelinePool != nil {
|
||||
stats.PipelineStats = c.pipelinePool.Stats()
|
||||
}
|
||||
return (*PoolStats)(stats)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user