mirror of
https://github.com/redis/go-redis.git
synced 2025-12-02 06:22:31 +03:00
wip
This commit is contained in:
391
autopipeline.go
391
autopipeline.go
@@ -5,6 +5,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9/internal"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AutoPipelineConfig configures the autopipelining behavior.
|
// AutoPipelineConfig configures the autopipelining behavior.
|
||||||
@@ -13,30 +15,11 @@ type AutoPipelineConfig struct {
|
|||||||
// Default: 100
|
// Default: 100
|
||||||
MaxBatchSize int
|
MaxBatchSize int
|
||||||
|
|
||||||
// FlushInterval is the maximum time to wait before flushing pending commands.
|
|
||||||
// Default: 10ms
|
|
||||||
FlushInterval time.Duration
|
|
||||||
|
|
||||||
// MaxConcurrentBatches is the maximum number of concurrent pipeline executions.
|
// MaxConcurrentBatches is the maximum number of concurrent pipeline executions.
|
||||||
// This prevents overwhelming the server with too many concurrent pipelines.
|
// This prevents overwhelming the server with too many concurrent pipelines.
|
||||||
// Default: 10
|
// Default: 10
|
||||||
MaxConcurrentBatches int
|
MaxConcurrentBatches int
|
||||||
|
|
||||||
// UseRingBuffer enables the high-performance ring buffer queue.
|
|
||||||
// When enabled, uses a pre-allocated ring buffer with lock-free enqueue
|
|
||||||
// instead of the slice-based queue. This provides:
|
|
||||||
// - 6x faster enqueue operations
|
|
||||||
// - 100% reduction in allocations during enqueue
|
|
||||||
// - Better performance under high concurrency
|
|
||||||
// Default: true (enabled)
|
|
||||||
UseRingBuffer bool
|
|
||||||
|
|
||||||
// RingBufferSize is the size of the ring buffer queue.
|
|
||||||
// Only used when UseRingBuffer is true.
|
|
||||||
// Must be a power of 2 for optimal performance (will be rounded up if not).
|
|
||||||
// Default: 1024
|
|
||||||
RingBufferSize int
|
|
||||||
|
|
||||||
// MaxFlushDelay is the maximum delay after flushing before checking for more commands.
|
// MaxFlushDelay is the maximum delay after flushing before checking for more commands.
|
||||||
// A small delay (e.g., 100μs) can significantly reduce CPU usage by allowing
|
// A small delay (e.g., 100μs) can significantly reduce CPU usage by allowing
|
||||||
// more commands to batch together, at the cost of slightly higher latency.
|
// more commands to batch together, at the cost of slightly higher latency.
|
||||||
@@ -56,19 +39,16 @@ type AutoPipelineConfig struct {
|
|||||||
func DefaultAutoPipelineConfig() *AutoPipelineConfig {
|
func DefaultAutoPipelineConfig() *AutoPipelineConfig {
|
||||||
return &AutoPipelineConfig{
|
return &AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: 50,
|
||||||
FlushInterval: time.Millisecond,
|
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
UseRingBuffer: true, // Enable ring buffer by default
|
|
||||||
RingBufferSize: 1024,
|
|
||||||
MaxFlushDelay: 0, // No delay by default (lowest latency)
|
MaxFlushDelay: 0, // No delay by default (lowest latency)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// pipelinerClient is an interface for clients that support pipelining.
|
// cmdableClient is an interface for clients that support pipelining.
|
||||||
// Both Client and ClusterClient implement this interface.
|
// Both Client and ClusterClient implement this interface.
|
||||||
type pipelinerClient interface {
|
type cmdableClient interface {
|
||||||
|
Cmdable
|
||||||
Process(ctx context.Context, cmd Cmder) error
|
Process(ctx context.Context, cmd Cmder) error
|
||||||
Pipeline() Pipeliner
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// queuedCmd wraps a command with a done channel for completion notification
|
// queuedCmd wraps a command with a done channel for completion notification
|
||||||
@@ -77,6 +57,91 @@ type queuedCmd struct {
|
|||||||
done chan struct{}
|
done chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// doneChanPool is a sync.Pool for done channels to reduce allocations
|
||||||
|
// We use buffered channels so we can signal completion without blocking
|
||||||
|
var doneChanPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return make(chan struct{}, 1)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// getDoneChan gets a done channel from the pool
|
||||||
|
func getDoneChan() chan struct{} {
|
||||||
|
ch := doneChanPool.Get().(chan struct{})
|
||||||
|
// Make sure the channel is empty
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// putDoneChan returns a done channel to the pool after draining it
|
||||||
|
func putDoneChan(ch chan struct{}) {
|
||||||
|
// Drain the channel completely
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ch:
|
||||||
|
default:
|
||||||
|
doneChanPool.Put(ch)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// queuedCmdPool is a sync.Pool for queuedCmd to reduce allocations
|
||||||
|
var queuedCmdPool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
return &queuedCmd{}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// getQueuedCmd gets a queuedCmd from the pool and initializes it
|
||||||
|
func getQueuedCmd(cmd Cmder) *queuedCmd {
|
||||||
|
qc := queuedCmdPool.Get().(*queuedCmd)
|
||||||
|
qc.cmd = cmd
|
||||||
|
qc.done = getDoneChan()
|
||||||
|
return qc
|
||||||
|
}
|
||||||
|
|
||||||
|
// putQueuedCmd returns a queuedCmd to the pool after clearing it
|
||||||
|
func putQueuedCmd(qc *queuedCmd) {
|
||||||
|
qc.cmd = nil
|
||||||
|
if qc.done != nil {
|
||||||
|
putDoneChan(qc.done)
|
||||||
|
qc.done = nil
|
||||||
|
}
|
||||||
|
queuedCmdPool.Put(qc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// queueSlicePool is a sync.Pool for queue slices to reduce allocations
|
||||||
|
var queueSlicePool = sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
// Create a slice with capacity for typical batch size
|
||||||
|
return make([]*queuedCmd, 0, 100)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// getQueueSlice gets a queue slice from the pool
|
||||||
|
func getQueueSlice(capacity int) []*queuedCmd {
|
||||||
|
slice := queueSlicePool.Get().([]*queuedCmd)
|
||||||
|
// Clear the slice but keep capacity
|
||||||
|
slice = slice[:0]
|
||||||
|
// If the capacity is too small, allocate a new one
|
||||||
|
if cap(slice) < capacity {
|
||||||
|
return make([]*queuedCmd, 0, capacity)
|
||||||
|
}
|
||||||
|
return slice
|
||||||
|
}
|
||||||
|
|
||||||
|
// putQueueSlice returns a queue slice to the pool
|
||||||
|
func putQueueSlice(slice []*queuedCmd) {
|
||||||
|
// Only pool slices that aren't too large (avoid memory bloat)
|
||||||
|
if cap(slice) <= 1000 {
|
||||||
|
queueSlicePool.Put(slice)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// autoPipelineCmd wraps a command and blocks on result access until execution completes.
|
// autoPipelineCmd wraps a command and blocks on result access until execution completes.
|
||||||
type autoPipelineCmd struct {
|
type autoPipelineCmd struct {
|
||||||
Cmder
|
Cmder
|
||||||
@@ -100,7 +165,6 @@ func (c *autoPipelineCmd) String() string {
|
|||||||
// 1. Collecting commands from multiple goroutines into a shared queue
|
// 1. Collecting commands from multiple goroutines into a shared queue
|
||||||
// 2. Automatically flushing the queue when:
|
// 2. Automatically flushing the queue when:
|
||||||
// - The batch size reaches MaxBatchSize
|
// - The batch size reaches MaxBatchSize
|
||||||
// - The flush interval (FlushInterval) expires
|
|
||||||
//
|
//
|
||||||
// 3. Executing batched commands using Redis pipelining
|
// 3. Executing batched commands using Redis pipelining
|
||||||
//
|
//
|
||||||
@@ -116,56 +180,59 @@ func (c *autoPipelineCmd) String() string {
|
|||||||
type AutoPipeliner struct {
|
type AutoPipeliner struct {
|
||||||
cmdable // Embed cmdable to get all Redis command methods
|
cmdable // Embed cmdable to get all Redis command methods
|
||||||
|
|
||||||
pipeliner pipelinerClient
|
pipeliner cmdableClient
|
||||||
config *AutoPipelineConfig
|
config *AutoPipelineConfig
|
||||||
|
|
||||||
// Command queue - either slice-based or ring buffer
|
// Command queue - either slice-based or ring buffer
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
queue []*queuedCmd // Slice-based queue (legacy)
|
queue []*queuedCmd // Slice-based queue (legacy)
|
||||||
ring *autoPipelineRing // Ring buffer queue (high-performance)
|
queueLen atomic.Int32 // Fast path check without lock
|
||||||
queueLen atomic.Int32 // Fast path check without lock
|
|
||||||
|
|
||||||
// Flush control
|
// Flush control
|
||||||
flushCh chan struct{} // Signal to flush immediately
|
flushCh chan struct{} // Signal to flush immediately
|
||||||
|
|
||||||
// Concurrency control
|
// Concurrency control
|
||||||
sem chan struct{} // Semaphore for concurrent batch limit
|
sem *internal.FastSemaphore // Semaphore for concurrent batch limit
|
||||||
|
|
||||||
// Lifecycle
|
// Lifecycle
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
cancel context.CancelFunc
|
cancel context.CancelFunc
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
cachedFlushInterval atomic.Int64
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewAutoPipeliner creates a new autopipeliner for the given client.
|
// NewAutoPipeliner creates a new autopipeliner for the given client.
|
||||||
// The client can be either *Client or *ClusterClient.
|
// The client can be either *Client or *ClusterClient.
|
||||||
func NewAutoPipeliner(pipeliner pipelinerClient, config *AutoPipelineConfig) *AutoPipeliner {
|
func NewAutoPipeliner(pipeliner cmdableClient, config *AutoPipelineConfig) *AutoPipeliner {
|
||||||
if config == nil {
|
if config == nil {
|
||||||
config = DefaultAutoPipelineConfig()
|
config = DefaultAutoPipelineConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply defaults for zero values
|
||||||
|
if config.MaxBatchSize <= 0 {
|
||||||
|
config.MaxBatchSize = 50
|
||||||
|
}
|
||||||
|
|
||||||
|
if config.MaxConcurrentBatches <= 0 {
|
||||||
|
config.MaxConcurrentBatches = 10
|
||||||
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
|
||||||
ap := &AutoPipeliner{
|
ap := &AutoPipeliner{
|
||||||
pipeliner: pipeliner,
|
pipeliner: pipeliner,
|
||||||
config: config,
|
config: config,
|
||||||
flushCh: make(chan struct{}, 1),
|
flushCh: make(chan struct{}, 1),
|
||||||
sem: make(chan struct{}, config.MaxConcurrentBatches),
|
sem: internal.NewFastSemaphore(int32(config.MaxConcurrentBatches)),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize cmdable to route all commands through Process
|
// Initialize cmdable to route all commands through processAndBlock
|
||||||
ap.cmdable = ap.Process
|
ap.cmdable = ap.processAndBlock
|
||||||
|
|
||||||
// Initialize queue based on configuration
|
// Initialize queue based on configuration
|
||||||
if config.UseRingBuffer {
|
ap.queue = getQueueSlice(config.MaxBatchSize)
|
||||||
ap.ring = newAutoPipelineRing(config.RingBufferSize)
|
|
||||||
} else {
|
|
||||||
ap.queue = make([]*queuedCmd, 0, config.MaxBatchSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start background flusher
|
// Start background flusher
|
||||||
ap.wg.Add(1)
|
ap.wg.Add(1)
|
||||||
@@ -223,28 +290,41 @@ func (ap *AutoPipeliner) Process(ctx context.Context, cmd Cmder) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processAndBlock is used by the cmdable interface.
|
||||||
|
// It queues the command and blocks until execution completes.
|
||||||
|
// This allows typed methods like Get(), Set(), etc. to work correctly with autopipelining.
|
||||||
|
func (ap *AutoPipeliner) processAndBlock(ctx context.Context, cmd Cmder) error {
|
||||||
|
// Check if this is a blocking command (has read timeout set)
|
||||||
|
// Blocking commands like BLPOP, BRPOP, BZMPOP should not be autopipelined
|
||||||
|
if cmd.readTimeout() != nil {
|
||||||
|
// Execute blocking commands directly without autopipelining
|
||||||
|
return ap.pipeliner.Process(ctx, cmd)
|
||||||
|
}
|
||||||
|
|
||||||
|
done := ap.process(ctx, cmd)
|
||||||
|
|
||||||
|
// Block until the command is executed
|
||||||
|
<-done
|
||||||
|
|
||||||
|
return cmd.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// closedChan is a reusable closed channel for error cases
|
||||||
|
var closedChan = func() chan struct{} {
|
||||||
|
ch := make(chan struct{})
|
||||||
|
close(ch)
|
||||||
|
return ch
|
||||||
|
}()
|
||||||
|
|
||||||
// process is the internal method that queues a command and returns its done channel.
|
// process is the internal method that queues a command and returns its done channel.
|
||||||
func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{} {
|
func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{} {
|
||||||
if ap.closed.Load() {
|
if ap.closed.Load() {
|
||||||
cmd.SetErr(ErrClosed)
|
cmd.SetErr(ErrClosed)
|
||||||
closedCh := make(chan struct{})
|
return closedChan
|
||||||
close(closedCh)
|
|
||||||
return closedCh
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Use ring buffer if enabled
|
// Get queued command from pool
|
||||||
if ap.config.UseRingBuffer {
|
qc := getQueuedCmd(cmd)
|
||||||
done := ap.ring.putOne(cmd)
|
|
||||||
// putOne will signal the flusher via condition variable if needed
|
|
||||||
return done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Legacy slice-based queue
|
|
||||||
// Create queued command with done channel
|
|
||||||
qc := &queuedCmd{
|
|
||||||
cmd: cmd,
|
|
||||||
done: make(chan struct{}),
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast path: try to acquire lock without blocking
|
// Fast path: try to acquire lock without blocking
|
||||||
if ap.mu.TryLock() {
|
if ap.mu.TryLock() {
|
||||||
@@ -268,12 +348,11 @@ func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{}
|
|||||||
ap.queueLen.Store(int32(queueLen))
|
ap.queueLen.Store(int32(queueLen))
|
||||||
ap.mu.Unlock()
|
ap.mu.Unlock()
|
||||||
|
|
||||||
// Always signal the flusher (non-blocking)
|
// always signal the flusher (non-blocking)
|
||||||
select {
|
select {
|
||||||
case ap.flushCh <- struct{}{}:
|
case ap.flushCh <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
return qc.done
|
return qc.done
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -308,11 +387,6 @@ func (ap *AutoPipeliner) Close() error {
|
|||||||
// Cancel context to stop flusher
|
// Cancel context to stop flusher
|
||||||
ap.cancel()
|
ap.cancel()
|
||||||
|
|
||||||
// Wake up flusher if it's waiting
|
|
||||||
if ap.config.UseRingBuffer {
|
|
||||||
ap.ring.wakeAll()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for flusher to finish
|
// Wait for flusher to finish
|
||||||
ap.wg.Wait()
|
ap.wg.Wait()
|
||||||
|
|
||||||
@@ -322,106 +396,8 @@ func (ap *AutoPipeliner) Close() error {
|
|||||||
// flusher is the background goroutine that flushes batches.
|
// flusher is the background goroutine that flushes batches.
|
||||||
func (ap *AutoPipeliner) flusher() {
|
func (ap *AutoPipeliner) flusher() {
|
||||||
defer ap.wg.Done()
|
defer ap.wg.Done()
|
||||||
|
ap.flusherSlice()
|
||||||
if !ap.config.UseRingBuffer {
|
return
|
||||||
// Legacy slice-based flusher
|
|
||||||
ap.flusherSlice()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ring buffer flusher
|
|
||||||
var (
|
|
||||||
cmds = make([]Cmder, 0, ap.config.MaxBatchSize)
|
|
||||||
doneChans = make([]chan struct{}, 0, ap.config.MaxBatchSize)
|
|
||||||
)
|
|
||||||
|
|
||||||
for {
|
|
||||||
// Try to get next command (non-blocking)
|
|
||||||
cmd, done := ap.ring.nextWriteCmd()
|
|
||||||
|
|
||||||
if cmd == nil {
|
|
||||||
// No command available
|
|
||||||
// If we have buffered commands, execute them first
|
|
||||||
if len(cmds) > 0 {
|
|
||||||
ap.executeBatch(cmds, doneChans)
|
|
||||||
cmds = cmds[:0]
|
|
||||||
doneChans = doneChans[:0]
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for shutdown before blocking
|
|
||||||
select {
|
|
||||||
case <-ap.ctx.Done():
|
|
||||||
return
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for next command (blocking)
|
|
||||||
// This will be woken up by wakeAll() during shutdown
|
|
||||||
cmd, done = ap.ring.waitForWrite()
|
|
||||||
|
|
||||||
// If nil, ring is closed
|
|
||||||
if cmd == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add command to batch
|
|
||||||
cmds = append(cmds, cmd)
|
|
||||||
doneChans = append(doneChans, done)
|
|
||||||
|
|
||||||
// Execute batch if full
|
|
||||||
if len(cmds) >= ap.config.MaxBatchSize {
|
|
||||||
ap.executeBatch(cmds, doneChans)
|
|
||||||
cmds = cmds[:0]
|
|
||||||
doneChans = doneChans[:0]
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// executeBatch executes a batch of commands.
|
|
||||||
func (ap *AutoPipeliner) executeBatch(cmds []Cmder, doneChans []chan struct{}) {
|
|
||||||
if len(cmds) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Acquire semaphore (limit concurrent batches)
|
|
||||||
select {
|
|
||||||
case ap.sem <- struct{}{}:
|
|
||||||
defer func() {
|
|
||||||
<-ap.sem
|
|
||||||
}()
|
|
||||||
case <-ap.ctx.Done():
|
|
||||||
// Context cancelled, set error on all commands and notify
|
|
||||||
for i, cmd := range cmds {
|
|
||||||
cmd.SetErr(ErrClosed)
|
|
||||||
doneChans[i] <- struct{}{} // Send signal instead of close
|
|
||||||
ap.ring.finishCmd()
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fast path for single command
|
|
||||||
if len(cmds) == 1 {
|
|
||||||
_ = ap.pipeliner.Process(context.Background(), cmds[0])
|
|
||||||
doneChans[0] <- struct{}{} // Send signal instead of close
|
|
||||||
ap.ring.finishCmd()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute pipeline for multiple commands
|
|
||||||
pipe := ap.pipeliner.Pipeline()
|
|
||||||
for _, cmd := range cmds {
|
|
||||||
_ = pipe.Process(context.Background(), cmd)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Execute and wait for completion
|
|
||||||
_, _ = pipe.Exec(context.Background())
|
|
||||||
|
|
||||||
// Notify completion and finish slots
|
|
||||||
for _, done := range doneChans {
|
|
||||||
done <- struct{}{} // Send signal instead of close
|
|
||||||
ap.ring.finishCmd()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// flusherSlice is the legacy slice-based flusher.
|
// flusherSlice is the legacy slice-based flusher.
|
||||||
@@ -441,6 +417,9 @@ func (ap *AutoPipeliner) flusherSlice() {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ap.flushCh:
|
case <-ap.flushCh:
|
||||||
|
if ap.Len() >= ap.config.MaxBatchSize {
|
||||||
|
goto drained
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
goto drained
|
goto drained
|
||||||
}
|
}
|
||||||
@@ -465,8 +444,6 @@ func (ap *AutoPipeliner) flusherSlice() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// flushBatchSlice flushes commands from the slice-based queue (legacy).
|
// flushBatchSlice flushes commands from the slice-based queue (legacy).
|
||||||
func (ap *AutoPipeliner) flushBatchSlice() {
|
func (ap *AutoPipeliner) flushBatchSlice() {
|
||||||
// Get commands from queue
|
// Get commands from queue
|
||||||
@@ -479,33 +456,43 @@ func (ap *AutoPipeliner) flushBatchSlice() {
|
|||||||
|
|
||||||
// Take ownership of current queue
|
// Take ownership of current queue
|
||||||
queuedCmds := ap.queue
|
queuedCmds := ap.queue
|
||||||
ap.queue = make([]*queuedCmd, 0, ap.config.MaxBatchSize)
|
ap.queue = getQueueSlice(ap.config.MaxBatchSize)
|
||||||
ap.queueLen.Store(0)
|
ap.queueLen.Store(0)
|
||||||
ap.mu.Unlock()
|
ap.mu.Unlock()
|
||||||
|
|
||||||
// Acquire semaphore (limit concurrent batches)
|
// Acquire semaphore (limit concurrent batches)
|
||||||
select {
|
// Try fast path first
|
||||||
case ap.sem <- struct{}{}:
|
if !ap.sem.TryAcquire() {
|
||||||
defer func() {
|
// Fast path failed, need to wait
|
||||||
<-ap.sem
|
// essentially, this is a blocking call
|
||||||
}()
|
err := ap.sem.Acquire(ap.ctx, 5*time.Second, context.DeadlineExceeded)
|
||||||
case <-ap.ctx.Done():
|
if err != nil {
|
||||||
// Context cancelled, set error on all commands and notify
|
// Context cancelled, set error on all commands and notify
|
||||||
for _, qc := range queuedCmds {
|
for _, qc := range queuedCmds {
|
||||||
qc.cmd.SetErr(ErrClosed)
|
qc.cmd.SetErr(ErrClosed)
|
||||||
close(qc.done)
|
// Signal completion by sending to buffered channel
|
||||||
|
qc.done <- struct{}{}
|
||||||
|
putQueuedCmd(qc)
|
||||||
|
}
|
||||||
|
putQueueSlice(queuedCmds)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(queuedCmds) == 0 {
|
if len(queuedCmds) == 0 {
|
||||||
|
ap.sem.Release()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Fast path for single command
|
// Fast path for single command
|
||||||
if len(queuedCmds) == 1 {
|
if len(queuedCmds) == 1 {
|
||||||
_ = ap.pipeliner.Process(context.Background(), queuedCmds[0].cmd)
|
qc := queuedCmds[0]
|
||||||
close(queuedCmds[0].done)
|
_ = ap.pipeliner.Process(context.Background(), qc.cmd)
|
||||||
|
// Signal completion by sending to buffered channel
|
||||||
|
qc.done <- struct{}{}
|
||||||
|
ap.sem.Release()
|
||||||
|
putQueuedCmd(qc)
|
||||||
|
putQueueSlice(queuedCmds)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -521,15 +508,16 @@ func (ap *AutoPipeliner) flushBatchSlice() {
|
|||||||
// IMPORTANT: Only notify after pipeline execution is complete
|
// IMPORTANT: Only notify after pipeline execution is complete
|
||||||
// This ensures command results are fully populated before waiters proceed
|
// This ensures command results are fully populated before waiters proceed
|
||||||
for _, qc := range queuedCmds {
|
for _, qc := range queuedCmds {
|
||||||
close(qc.done)
|
// Signal completion by sending to buffered channel
|
||||||
|
qc.done <- struct{}{}
|
||||||
|
putQueuedCmd(qc)
|
||||||
}
|
}
|
||||||
|
ap.sem.Release()
|
||||||
|
putQueueSlice(queuedCmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Len returns the current number of queued commands.
|
// Len returns the current number of queued commands.
|
||||||
func (ap *AutoPipeliner) Len() int {
|
func (ap *AutoPipeliner) Len() int {
|
||||||
if ap.config.UseRingBuffer {
|
|
||||||
return ap.ring.len()
|
|
||||||
}
|
|
||||||
return int(ap.queueLen.Load())
|
return int(ap.queueLen.Load())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -559,17 +547,7 @@ func (ap *AutoPipeliner) Pipelined(ctx context.Context, fn func(Pipeliner) error
|
|||||||
// Note: This uses the underlying client's TxPipeline if available (Client, Ring, ClusterClient).
|
// Note: This uses the underlying client's TxPipeline if available (Client, Ring, ClusterClient).
|
||||||
// For other clients, this will panic.
|
// For other clients, this will panic.
|
||||||
func (ap *AutoPipeliner) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
func (ap *AutoPipeliner) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
// Try to get TxPipeline from the underlying client
|
return ap.pipeliner.TxPipeline().Pipelined(ctx, fn)
|
||||||
// This works for Client, Ring, and ClusterClient
|
|
||||||
type txPipeliner interface {
|
|
||||||
TxPipeline() Pipeliner
|
|
||||||
}
|
|
||||||
|
|
||||||
if txp, ok := ap.pipeliner.(txPipeliner); ok {
|
|
||||||
return txp.TxPipeline().Pipelined(ctx, fn)
|
|
||||||
}
|
|
||||||
|
|
||||||
panic("redis: TxPipelined not supported by this client type")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TxPipeline returns a new transaction pipeline that uses the underlying pipeliner.
|
// TxPipeline returns a new transaction pipeline that uses the underlying pipeliner.
|
||||||
@@ -578,15 +556,8 @@ func (ap *AutoPipeliner) TxPipelined(ctx context.Context, fn func(Pipeliner) err
|
|||||||
// Note: This uses the underlying client's TxPipeline if available (Client, Ring, ClusterClient).
|
// Note: This uses the underlying client's TxPipeline if available (Client, Ring, ClusterClient).
|
||||||
// For other clients, this will panic.
|
// For other clients, this will panic.
|
||||||
func (ap *AutoPipeliner) TxPipeline() Pipeliner {
|
func (ap *AutoPipeliner) TxPipeline() Pipeliner {
|
||||||
// Try to get TxPipeline from the underlying client
|
return ap.pipeliner.TxPipeline()
|
||||||
// This works for Client, Ring, and ClusterClient
|
|
||||||
type txPipeliner interface {
|
|
||||||
TxPipeline() Pipeliner
|
|
||||||
}
|
|
||||||
|
|
||||||
if txp, ok := ap.pipeliner.(txPipeliner); ok {
|
|
||||||
return txp.TxPipeline()
|
|
||||||
}
|
|
||||||
|
|
||||||
panic("redis: TxPipeline not supported by this client type")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// validate AutoPipeliner implements Cmdable
|
||||||
|
var _ Cmdable = (*AutoPipeliner)(nil)
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ func BenchmarkAutoPipeline(b *testing.B) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 100,
|
MaxBatchSize: 100,
|
||||||
FlushInterval: 10 * time.Millisecond,
|
MaxFlushDelay: 10 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -155,7 +155,7 @@ func BenchmarkConcurrentAutoPipeline(b *testing.B) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 100,
|
MaxBatchSize: 100,
|
||||||
FlushInterval: 10 * time.Millisecond,
|
MaxFlushDelay: 10 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -201,7 +201,7 @@ func BenchmarkAutoPipelineBatchSizes(b *testing.B) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: batchSize,
|
MaxBatchSize: batchSize,
|
||||||
FlushInterval: 10 * time.Millisecond,
|
MaxFlushDelay: 10 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -222,23 +222,23 @@ func BenchmarkAutoPipelineBatchSizes(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// BenchmarkAutoPipelineFlushIntervals tests different flush intervals
|
// BenchmarkAutoPipelineMaxFlushDelays tests different max flush delays
|
||||||
func BenchmarkAutoPipelineFlushIntervals(b *testing.B) {
|
func BenchmarkAutoPipelineMaxFlushDelays(b *testing.B) {
|
||||||
intervals := []time.Duration{
|
delays := []time.Duration{
|
||||||
1 * time.Millisecond,
|
1 * time.Millisecond,
|
||||||
5 * time.Millisecond,
|
5 * time.Millisecond,
|
||||||
10 * time.Millisecond,
|
10 * time.Millisecond,
|
||||||
50 * time.Millisecond,
|
50 * time.Millisecond,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, interval := range intervals {
|
for _, delay := range delays {
|
||||||
b.Run(fmt.Sprintf("interval=%s", interval), func(b *testing.B) {
|
b.Run(fmt.Sprintf("delay=%s", delay), func(b *testing.B) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 100,
|
MaxBatchSize: 100,
|
||||||
FlushInterval: interval,
|
MaxFlushDelay: delay,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -349,10 +349,8 @@ func BenchmarkRingBufferVsSliceQueue(b *testing.B) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: 50,
|
||||||
FlushInterval: time.Millisecond,
|
MaxFlushDelay: time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
UseRingBuffer: true,
|
|
||||||
RingBufferSize: 1024,
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
@@ -378,9 +376,8 @@ func BenchmarkRingBufferVsSliceQueue(b *testing.B) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: 50,
|
||||||
FlushInterval: time.Millisecond,
|
MaxFlushDelay: time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
UseRingBuffer: false, // Use legacy slice queue
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
@@ -417,11 +414,8 @@ func BenchmarkMaxFlushDelay(b *testing.B) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: 50,
|
||||||
FlushInterval: time.Millisecond,
|
|
||||||
MaxConcurrentBatches: 10,
|
|
||||||
UseRingBuffer: true,
|
|
||||||
RingBufferSize: 1024,
|
|
||||||
MaxFlushDelay: delay,
|
MaxFlushDelay: delay,
|
||||||
|
MaxConcurrentBatches: 10,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
@@ -462,10 +456,8 @@ func BenchmarkBufferSizes(b *testing.B) {
|
|||||||
WriteBufferSize: size,
|
WriteBufferSize: size,
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: 50,
|
||||||
FlushInterval: time.Millisecond,
|
MaxFlushDelay: time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
UseRingBuffer: true,
|
|
||||||
RingBufferSize: 1024,
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
@@ -487,27 +479,25 @@ func BenchmarkBufferSizes(b *testing.B) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// BenchmarkRingBufferSizes benchmarks different ring buffer sizes
|
// BenchmarkAutoPipelineMaxBatchSizes benchmarks different max batch sizes
|
||||||
func BenchmarkRingBufferSizes(b *testing.B) {
|
func BenchmarkAutoPipelineMaxBatchSizes(b *testing.B) {
|
||||||
ringSizes := []int{
|
batchSizes := []int{
|
||||||
256,
|
10,
|
||||||
512,
|
50, // default
|
||||||
1024, // default
|
100,
|
||||||
2048,
|
200,
|
||||||
4096,
|
500,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, size := range ringSizes {
|
for _, size := range batchSizes {
|
||||||
b.Run(fmt.Sprintf("ring_%d", size), func(b *testing.B) {
|
b.Run(fmt.Sprintf("batch_%d", size), func(b *testing.B) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: size,
|
||||||
FlushInterval: time.Millisecond,
|
MaxFlushDelay: time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
UseRingBuffer: true,
|
|
||||||
RingBufferSize: size,
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|||||||
@@ -1,236 +0,0 @@
|
|||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
"math/bits"
|
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
)
|
|
||||||
|
|
||||||
// autoPipelineRing is a pre-allocated ring buffer queue for autopipelining.
|
|
||||||
// It provides lock-free enqueue and FIFO ordering guarantees.
|
|
||||||
//
|
|
||||||
// Ring buffer architecture:
|
|
||||||
// - Pre-allocated slots (no allocations during enqueue)
|
|
||||||
// - Per-slot channels for request-response matching
|
|
||||||
// - Atomic write pointer for lock-free enqueue
|
|
||||||
// - Separate read pointers for write and read goroutines
|
|
||||||
//
|
|
||||||
// The ring buffer uses three pointers:
|
|
||||||
// - write: Where app goroutines add commands (atomic increment)
|
|
||||||
// - read1: Where flush goroutine reads commands to send
|
|
||||||
// - read2: Where result goroutine matches responses (currently unused, for future optimization)
|
|
||||||
type autoPipelineRing struct {
|
|
||||||
store []autoPipelineSlot // Pre-allocated slots
|
|
||||||
mask uint32 // Size - 1 (for fast modulo via bitwise AND)
|
|
||||||
write uint32 // Write position (atomic, incremented by app goroutines)
|
|
||||||
read1 uint32 // Read position for flush goroutine
|
|
||||||
read2 uint32 // Read position for result matching (reserved for future use)
|
|
||||||
cmds []Cmder // Persistent buffer for collecting commands (reused, no allocations)
|
|
||||||
doneChans []chan struct{} // Persistent buffer for collecting done channels (reused, no allocations)
|
|
||||||
}
|
|
||||||
|
|
||||||
// autoPipelineSlot represents a single command slot in the ring buffer.
|
|
||||||
type autoPipelineSlot struct {
|
|
||||||
c1 *sync.Cond // Condition variable for write synchronization (shared mutex with c2)
|
|
||||||
c2 *sync.Cond // Condition variable for wait/signal (shared mutex with c1)
|
|
||||||
cmd Cmder // The command to execute
|
|
||||||
done chan struct{} // Completion notification channel (pre-allocated, reused)
|
|
||||||
mark uint32 // State: 0=empty, 1=queued, 2=sent (atomic)
|
|
||||||
slept bool // Whether writer goroutine is sleeping on this slot
|
|
||||||
}
|
|
||||||
|
|
||||||
// State constants for autoPipelineSlot.mark
|
|
||||||
const (
|
|
||||||
apSlotEmpty uint32 = 0 // Slot is empty and available
|
|
||||||
apSlotQueued uint32 = 1 // Command queued, ready to be sent
|
|
||||||
apSlotSent uint32 = 2 // Command sent, waiting for response
|
|
||||||
apSlotClosed uint32 = 3 // Ring is closed, stop waiting
|
|
||||||
)
|
|
||||||
|
|
||||||
// newAutoPipelineRing creates a new ring buffer with the specified size.
|
|
||||||
// Size will be rounded up to the next power of 2 for efficient modulo operations.
|
|
||||||
func newAutoPipelineRing(size int) *autoPipelineRing {
|
|
||||||
// Round up to power of 2 for fast modulo via bitwise AND
|
|
||||||
if size <= 0 {
|
|
||||||
size = 1024 // Default size
|
|
||||||
}
|
|
||||||
if size&(size-1) != 0 {
|
|
||||||
// Not a power of 2, round up
|
|
||||||
size = 1 << (32 - bits.LeadingZeros32(uint32(size)))
|
|
||||||
}
|
|
||||||
|
|
||||||
r := &autoPipelineRing{
|
|
||||||
store: make([]autoPipelineSlot, size),
|
|
||||||
mask: uint32(size - 1),
|
|
||||||
cmds: make([]Cmder, 0, size), // Persistent buffer, reused
|
|
||||||
doneChans: make([]chan struct{}, 0, size), // Persistent buffer, reused
|
|
||||||
}
|
|
||||||
|
|
||||||
// Initialize each slot with condition variables and pre-allocated channel
|
|
||||||
for i := range r.store {
|
|
||||||
m := &sync.Mutex{}
|
|
||||||
r.store[i].c1 = sync.NewCond(m)
|
|
||||||
r.store[i].c2 = sync.NewCond(m) // Share the same mutex
|
|
||||||
r.store[i].done = make(chan struct{}, 1) // Buffered channel for signal (not close)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r
|
|
||||||
}
|
|
||||||
|
|
||||||
// putOne enqueues a command into the ring buffer.
|
|
||||||
// Returns the done channel that will be signaled when the command completes.
|
|
||||||
//
|
|
||||||
// Ring buffer enqueue implementation:
|
|
||||||
// - Atomic increment for write position
|
|
||||||
// - Wait on condition variable if slot is full
|
|
||||||
// - Signal reader if it's sleeping
|
|
||||||
func (r *autoPipelineRing) putOne(cmd Cmder) <-chan struct{} {
|
|
||||||
// Atomic increment to get next slot
|
|
||||||
slot := &r.store[atomic.AddUint32(&r.write, 1)&r.mask]
|
|
||||||
|
|
||||||
// Lock the slot
|
|
||||||
slot.c1.L.Lock()
|
|
||||||
|
|
||||||
// Wait if slot is not empty (mark != 0)
|
|
||||||
for slot.mark != 0 {
|
|
||||||
slot.c1.Wait()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Store command and mark as queued
|
|
||||||
slot.cmd = cmd
|
|
||||||
slot.mark = 1
|
|
||||||
s := slot.slept
|
|
||||||
|
|
||||||
slot.c1.L.Unlock()
|
|
||||||
|
|
||||||
// If reader is sleeping, wake it up
|
|
||||||
if s {
|
|
||||||
slot.c2.Broadcast()
|
|
||||||
}
|
|
||||||
|
|
||||||
return slot.done
|
|
||||||
}
|
|
||||||
|
|
||||||
// nextWriteCmd tries to get the next command (non-blocking).
|
|
||||||
// Returns nil if no command is available.
|
|
||||||
// Should only be called by the flush goroutine.
|
|
||||||
func (r *autoPipelineRing) nextWriteCmd() (Cmder, chan struct{}) {
|
|
||||||
r.read1++
|
|
||||||
p := r.read1 & r.mask
|
|
||||||
slot := &r.store[p]
|
|
||||||
|
|
||||||
slot.c1.L.Lock()
|
|
||||||
if slot.mark == 1 {
|
|
||||||
cmd := slot.cmd
|
|
||||||
done := slot.done
|
|
||||||
slot.mark = 2
|
|
||||||
slot.c1.L.Unlock()
|
|
||||||
return cmd, done
|
|
||||||
}
|
|
||||||
// No command available, rollback read position
|
|
||||||
r.read1--
|
|
||||||
slot.c1.L.Unlock()
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// waitForWrite waits for the next command (blocking).
|
|
||||||
// Should only be called by the flush goroutine.
|
|
||||||
// Returns nil if the ring is closed.
|
|
||||||
func (r *autoPipelineRing) waitForWrite() (Cmder, chan struct{}) {
|
|
||||||
r.read1++
|
|
||||||
p := r.read1 & r.mask
|
|
||||||
slot := &r.store[p]
|
|
||||||
|
|
||||||
slot.c1.L.Lock()
|
|
||||||
// Wait until command is available (mark == 1) or closed (mark == 3)
|
|
||||||
for slot.mark != 1 && slot.mark != apSlotClosed {
|
|
||||||
slot.slept = true
|
|
||||||
slot.c2.Wait() // c1 and c2 share the same mutex
|
|
||||||
slot.slept = false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if closed
|
|
||||||
if slot.mark == apSlotClosed {
|
|
||||||
r.read1-- // Rollback read position
|
|
||||||
slot.c1.L.Unlock()
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
cmd := slot.cmd
|
|
||||||
done := slot.done
|
|
||||||
slot.mark = 2
|
|
||||||
slot.c1.L.Unlock()
|
|
||||||
return cmd, done
|
|
||||||
}
|
|
||||||
|
|
||||||
// finishCmd marks a command as completed and clears the slot.
|
|
||||||
// Should only be called by the flush goroutine.
|
|
||||||
func (r *autoPipelineRing) finishCmd() {
|
|
||||||
r.read2++
|
|
||||||
p := r.read2 & r.mask
|
|
||||||
slot := &r.store[p]
|
|
||||||
|
|
||||||
slot.c1.L.Lock()
|
|
||||||
if slot.mark == 2 {
|
|
||||||
// Drain the done channel before reusing
|
|
||||||
select {
|
|
||||||
case <-slot.done:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
// Clear slot for reuse
|
|
||||||
slot.cmd = nil
|
|
||||||
slot.mark = 0
|
|
||||||
}
|
|
||||||
slot.c1.L.Unlock()
|
|
||||||
slot.c1.Signal() // Wake up any writer waiting on this slot
|
|
||||||
}
|
|
||||||
|
|
||||||
// len returns the approximate number of queued commands.
|
|
||||||
// This is an estimate and may not be exact due to concurrent access.
|
|
||||||
func (r *autoPipelineRing) len() int {
|
|
||||||
write := atomic.LoadUint32(&r.write)
|
|
||||||
read := atomic.LoadUint32(&r.read1)
|
|
||||||
|
|
||||||
// Handle wrap-around
|
|
||||||
if write >= read {
|
|
||||||
return int(write - read)
|
|
||||||
}
|
|
||||||
// Wrapped around
|
|
||||||
return int(write + (^uint32(0) - read) + 1)
|
|
||||||
}
|
|
||||||
|
|
||||||
// cap returns the capacity of the ring buffer.
|
|
||||||
func (r *autoPipelineRing) cap() int {
|
|
||||||
return len(r.store)
|
|
||||||
}
|
|
||||||
|
|
||||||
// reset resets the ring buffer to empty state.
|
|
||||||
// This should only be called when no goroutines are accessing the ring.
|
|
||||||
func (r *autoPipelineRing) reset() {
|
|
||||||
atomic.StoreUint32(&r.write, 0)
|
|
||||||
atomic.StoreUint32(&r.read1, 0)
|
|
||||||
atomic.StoreUint32(&r.read2, 0)
|
|
||||||
|
|
||||||
for i := range r.store {
|
|
||||||
r.store[i].c1.L.Lock()
|
|
||||||
r.store[i].cmd = nil
|
|
||||||
r.store[i].mark = 0
|
|
||||||
r.store[i].slept = false
|
|
||||||
r.store[i].c1.L.Unlock()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// wakeAll wakes up all waiting goroutines.
|
|
||||||
// This is used during shutdown to unblock the flusher.
|
|
||||||
func (r *autoPipelineRing) wakeAll() {
|
|
||||||
for i := range r.store {
|
|
||||||
r.store[i].c1.L.Lock()
|
|
||||||
if r.store[i].mark == 0 {
|
|
||||||
r.store[i].mark = apSlotClosed
|
|
||||||
}
|
|
||||||
r.store[i].c1.L.Unlock()
|
|
||||||
r.store[i].c2.Broadcast()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@@ -16,7 +16,7 @@ func TestAutoPipelineSequential(t *testing.T) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 10,
|
MaxBatchSize: 10,
|
||||||
FlushInterval: 50 * time.Millisecond,
|
MaxFlushDelay: 50 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -64,7 +64,7 @@ func TestAutoPipelineSequentialSmallBatches(t *testing.T) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 1000, // Large batch size
|
MaxBatchSize: 1000, // Large batch size
|
||||||
FlushInterval: 20 * time.Millisecond, // Rely on timer
|
MaxFlushDelay: 20 * time.Millisecond, // Rely on timer
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -111,7 +111,7 @@ func TestAutoPipelineSequentialMixed(t *testing.T) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 5,
|
MaxBatchSize: 5,
|
||||||
FlushInterval: 50 * time.Millisecond,
|
MaxFlushDelay: 50 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ var _ = Describe("AutoPipeline", func() {
|
|||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 10,
|
MaxBatchSize: 10,
|
||||||
FlushInterval: 50 * time.Millisecond,
|
MaxFlushDelay: 50 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -235,7 +235,7 @@ var _ = Describe("AutoPipeline", func() {
|
|||||||
Addr: redisAddr,
|
Addr: redisAddr,
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 5,
|
MaxBatchSize: 5,
|
||||||
FlushInterval: 10 * time.Millisecond,
|
MaxFlushDelay: 10 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 2,
|
MaxConcurrentBatches: 2,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -296,7 +296,7 @@ func TestAutoPipelineBasic(t *testing.T) {
|
|||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 10,
|
MaxBatchSize: 10,
|
||||||
FlushInterval: 50 * time.Millisecond,
|
MaxFlushDelay: 50 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -338,14 +338,14 @@ func TestAutoPipelineBasic(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestAutoPipelineFlushInterval(t *testing.T) {
|
func TestAutoPipelineMaxFlushDelay(t *testing.T) {
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
client := redis.NewClient(&redis.Options{
|
client := redis.NewClient(&redis.Options{
|
||||||
Addr: ":6379",
|
Addr: ":6379",
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 1000, // Large batch size so only timer triggers flush
|
MaxBatchSize: 1000, // Large batch size so only timer triggers flush
|
||||||
FlushInterval: 50 * time.Millisecond,
|
MaxFlushDelay: 50 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
126
autopipeline_typed_test.go
Normal file
126
autopipeline_typed_test.go
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
package redis_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestAutoPipelineTypedCommands tests that typed commands like Get() and Set()
|
||||||
|
// work correctly with autopipelining and block until execution.
|
||||||
|
func TestAutoPipelineTypedCommands(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: ":6379",
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Skip if Redis is not available
|
||||||
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
|
t.Skipf("Redis not available: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ap := client.AutoPipeline()
|
||||||
|
defer ap.Close()
|
||||||
|
|
||||||
|
// Test Set and Get
|
||||||
|
setResult, setErr := ap.Set(ctx, "test_key", "test_value", 0).Result()
|
||||||
|
if setErr != nil {
|
||||||
|
t.Fatalf("Set failed: %v", setErr)
|
||||||
|
}
|
||||||
|
if setResult != "OK" {
|
||||||
|
t.Errorf("Expected 'OK', got '%s'", setResult)
|
||||||
|
}
|
||||||
|
|
||||||
|
getResult, getErr := ap.Get(ctx, "test_key").Result()
|
||||||
|
if getErr != nil {
|
||||||
|
t.Fatalf("Get failed: %v", getErr)
|
||||||
|
}
|
||||||
|
if getResult != "test_value" {
|
||||||
|
t.Errorf("Expected 'test_value', got '%s'", getResult)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
client.Del(ctx, "test_key")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAutoPipelineTypedCommandsMultiple tests multiple typed commands
|
||||||
|
func TestAutoPipelineTypedCommandsMultiple(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: ":6379",
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Skip if Redis is not available
|
||||||
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
|
t.Skipf("Redis not available: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ap := client.AutoPipeline()
|
||||||
|
defer ap.Close()
|
||||||
|
|
||||||
|
// Queue multiple Set commands
|
||||||
|
ap.Set(ctx, "key1", "value1", 0)
|
||||||
|
ap.Set(ctx, "key2", "value2", 0)
|
||||||
|
ap.Set(ctx, "key3", "value3", 0)
|
||||||
|
|
||||||
|
// Get the values - should block until execution
|
||||||
|
val1, err1 := ap.Get(ctx, "key1").Result()
|
||||||
|
if err1 != nil {
|
||||||
|
t.Fatalf("Get key1 failed: %v", err1)
|
||||||
|
}
|
||||||
|
if val1 != "value1" {
|
||||||
|
t.Errorf("Expected 'value1', got '%s'", val1)
|
||||||
|
}
|
||||||
|
|
||||||
|
val2, err2 := ap.Get(ctx, "key2").Result()
|
||||||
|
if err2 != nil {
|
||||||
|
t.Fatalf("Get key2 failed: %v", err2)
|
||||||
|
}
|
||||||
|
if val2 != "value2" {
|
||||||
|
t.Errorf("Expected 'value2', got '%s'", val2)
|
||||||
|
}
|
||||||
|
|
||||||
|
val3, err3 := ap.Get(ctx, "key3").Result()
|
||||||
|
if err3 != nil {
|
||||||
|
t.Fatalf("Get key3 failed: %v", err3)
|
||||||
|
}
|
||||||
|
if val3 != "value3" {
|
||||||
|
t.Errorf("Expected 'value3', got '%s'", val3)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
client.Del(ctx, "key1", "key2", "key3")
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestAutoPipelineTypedCommandsVal tests the Val() method
|
||||||
|
func TestAutoPipelineTypedCommandsVal(t *testing.T) {
|
||||||
|
ctx := context.Background()
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: ":6379",
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Skip if Redis is not available
|
||||||
|
if err := client.Ping(ctx).Err(); err != nil {
|
||||||
|
t.Skipf("Redis not available: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ap := client.AutoPipeline()
|
||||||
|
defer ap.Close()
|
||||||
|
|
||||||
|
// Set a value
|
||||||
|
ap.Set(ctx, "test_val_key", "test_val_value", 0)
|
||||||
|
|
||||||
|
// Get using Val() - should block until execution
|
||||||
|
val := ap.Get(ctx, "test_val_key").Val()
|
||||||
|
if val != "test_val_value" {
|
||||||
|
t.Errorf("Expected 'test_val_value', got '%s'", val)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
client.Del(ctx, "test_val_key")
|
||||||
|
}
|
||||||
|
|
||||||
@@ -18,43 +18,6 @@ import (
|
|||||||
|
|
||||||
var noDeadline = time.Time{}
|
var noDeadline = time.Time{}
|
||||||
|
|
||||||
// Global time cache updated every 100ms by background goroutine.
|
|
||||||
// This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout.
|
|
||||||
// Max staleness: 100ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds).
|
|
||||||
var globalTimeCache struct {
|
|
||||||
nowNs atomic.Int64
|
|
||||||
}
|
|
||||||
|
|
||||||
func init() {
|
|
||||||
// Initialize immediately
|
|
||||||
globalTimeCache.nowNs.Store(time.Now().UnixNano())
|
|
||||||
|
|
||||||
// Start background updater
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTicker(100 * time.Millisecond)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for range ticker.C {
|
|
||||||
globalTimeCache.nowNs.Store(time.Now().UnixNano())
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// getCachedTimeNs returns the current time in nanoseconds from the global cache.
|
|
||||||
// This is updated every 100ms by a background goroutine, avoiding expensive syscalls.
|
|
||||||
// Max staleness: 100ms.
|
|
||||||
func getCachedTimeNs() int64 {
|
|
||||||
return globalTimeCache.nowNs.Load()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetCachedTimeNs returns the current time in nanoseconds from the global cache.
|
|
||||||
// This is updated every 100ms by a background goroutine, avoiding expensive syscalls.
|
|
||||||
// Max staleness: 100ms.
|
|
||||||
// Exported for use by other packages that need fast time access.
|
|
||||||
func GetCachedTimeNs() int64 {
|
|
||||||
return getCachedTimeNs()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Global time cache updated every 50ms by background goroutine.
|
// Global time cache updated every 50ms by background goroutine.
|
||||||
// This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout.
|
// This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout.
|
||||||
// Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds).
|
// Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds).
|
||||||
|
|||||||
106
internal/proto/writer_bench_test.go
Normal file
106
internal/proto/writer_bench_test.go
Normal file
@@ -0,0 +1,106 @@
|
|||||||
|
package proto
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BenchmarkWriteArgs benchmarks writing command arguments
|
||||||
|
func BenchmarkWriteArgs(b *testing.B) {
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
args []interface{}
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "simple_get",
|
||||||
|
args: []interface{}{"GET", "key"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "simple_set",
|
||||||
|
args: []interface{}{"SET", "key", "value"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "set_with_expiry",
|
||||||
|
args: []interface{}{"SET", "key", "value", "EX", 3600},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "zadd",
|
||||||
|
args: []interface{}{"ZADD", "myset", 1.0, "member1", 2.0, "member2", 3.0, "member3"},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "large_command",
|
||||||
|
args: make([]interface{}, 100),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Initialize large command
|
||||||
|
for i := range testCases[len(testCases)-1].args {
|
||||||
|
testCases[len(testCases)-1].args[i] = "arg"
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
b.Run(tc.name, func(b *testing.B) {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
wr := NewWriter(buf)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
buf.Reset()
|
||||||
|
if err := wr.WriteArgs(tc.args); err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkWriteArgsParallel benchmarks concurrent writes
|
||||||
|
func BenchmarkWriteArgsParallel(b *testing.B) {
|
||||||
|
args := []interface{}{"SET", "key", "value", "EX", 3600}
|
||||||
|
|
||||||
|
b.ReportAllocs()
|
||||||
|
b.RunParallel(func(pb *testing.PB) {
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
wr := NewWriter(buf)
|
||||||
|
|
||||||
|
for pb.Next() {
|
||||||
|
buf.Reset()
|
||||||
|
if err := wr.WriteArgs(args); err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// BenchmarkWriteCmds benchmarks writing multiple commands (pipeline)
|
||||||
|
func BenchmarkWriteCmds(b *testing.B) {
|
||||||
|
sizes := []int{1, 10, 50, 100, 500}
|
||||||
|
|
||||||
|
for _, size := range sizes {
|
||||||
|
b.Run(string(rune('0'+size/10)), func(b *testing.B) {
|
||||||
|
// Create commands
|
||||||
|
cmds := make([][]interface{}, size)
|
||||||
|
for i := range cmds {
|
||||||
|
cmds[i] = []interface{}{"SET", "key", i}
|
||||||
|
}
|
||||||
|
|
||||||
|
buf := &bytes.Buffer{}
|
||||||
|
wr := NewWriter(buf)
|
||||||
|
|
||||||
|
b.ResetTimer()
|
||||||
|
b.ReportAllocs()
|
||||||
|
|
||||||
|
for i := 0; i < b.N; i++ {
|
||||||
|
buf.Reset()
|
||||||
|
for _, args := range cmds {
|
||||||
|
if err := wr.WriteArgs(args); err != nil {
|
||||||
|
b.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@@ -281,6 +281,9 @@ type Options struct {
|
|||||||
// to reduce network round-trips and improve throughput.
|
// to reduce network round-trips and improve throughput.
|
||||||
// If nil, autopipelining is disabled.
|
// If nil, autopipelining is disabled.
|
||||||
AutoPipelineConfig *AutoPipelineConfig
|
AutoPipelineConfig *AutoPipelineConfig
|
||||||
|
|
||||||
|
// AutoPipelineEnabled enables automatic pipelining of commands.
|
||||||
|
AutoPipelineEnabled bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *Options) init() {
|
func (opt *Options) init() {
|
||||||
|
|||||||
@@ -1020,6 +1020,7 @@ type ClusterClient struct {
|
|||||||
nodes *clusterNodes
|
nodes *clusterNodes
|
||||||
state *clusterStateHolder
|
state *clusterStateHolder
|
||||||
cmdsInfoCache *cmdsInfoCache
|
cmdsInfoCache *cmdsInfoCache
|
||||||
|
autopipeliner *AutoPipeliner
|
||||||
cmdable
|
cmdable
|
||||||
hooksMixin
|
hooksMixin
|
||||||
}
|
}
|
||||||
@@ -1051,6 +1052,14 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
|||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *ClusterClient) WithAutoPipeline() AutoPipelinedClient {
|
||||||
|
if c.autopipeliner != nil && !c.autopipeliner.closed.Load() {
|
||||||
|
return c.autopipeliner
|
||||||
|
}
|
||||||
|
c.autopipeliner = c.AutoPipeline()
|
||||||
|
return c.autopipeliner
|
||||||
|
}
|
||||||
|
|
||||||
// Options returns read-only Options that were used to create the client.
|
// Options returns read-only Options that were used to create the client.
|
||||||
func (c *ClusterClient) Options() *ClusterOptions {
|
func (c *ClusterClient) Options() *ClusterOptions {
|
||||||
return c.opt
|
return c.opt
|
||||||
|
|||||||
@@ -17,7 +17,7 @@ func TestClusterAutoPipelineBasic(t *testing.T) {
|
|||||||
Addrs: []string{":7000", ":7001", ":7002"},
|
Addrs: []string{":7000", ":7001", ":7002"},
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 10,
|
MaxBatchSize: 10,
|
||||||
FlushInterval: 50 * time.Millisecond,
|
MaxFlushDelay: 50 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -66,7 +66,7 @@ func TestClusterAutoPipelineConcurrency(t *testing.T) {
|
|||||||
Addrs: []string{":7000", ":7001", ":7002"},
|
Addrs: []string{":7000", ":7001", ":7002"},
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 50,
|
MaxBatchSize: 50,
|
||||||
FlushInterval: 10 * time.Millisecond,
|
MaxFlushDelay: 10 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 10,
|
MaxConcurrentBatches: 10,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
@@ -121,7 +121,7 @@ func TestClusterAutoPipelineCrossSlot(t *testing.T) {
|
|||||||
Addrs: []string{":7000", ":7001", ":7002"},
|
Addrs: []string{":7000", ":7001", ":7002"},
|
||||||
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
AutoPipelineConfig: &redis.AutoPipelineConfig{
|
||||||
MaxBatchSize: 20,
|
MaxBatchSize: 20,
|
||||||
FlushInterval: 10 * time.Millisecond,
|
MaxFlushDelay: 10 * time.Millisecond,
|
||||||
MaxConcurrentBatches: 5,
|
MaxConcurrentBatches: 5,
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|||||||
48
redis.go
48
redis.go
@@ -208,6 +208,12 @@ func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) e
|
|||||||
return hs.current.txPipeline(ctx, cmds)
|
return hs.current.txPipeline(ctx, cmds)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------------------------
|
||||||
|
type AutoPipelinedClient interface {
|
||||||
|
Cmdable
|
||||||
|
Close() error
|
||||||
|
}
|
||||||
|
|
||||||
//------------------------------------------------------------------------------
|
//------------------------------------------------------------------------------
|
||||||
|
|
||||||
type baseClient struct {
|
type baseClient struct {
|
||||||
@@ -1007,6 +1013,7 @@ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd
|
|||||||
type Client struct {
|
type Client struct {
|
||||||
*baseClient
|
*baseClient
|
||||||
cmdable
|
cmdable
|
||||||
|
autopipeliner *AutoPipeliner
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewClient returns a client to the Redis Server specified by Options.
|
// NewClient returns a client to the Redis Server specified by Options.
|
||||||
@@ -1083,6 +1090,16 @@ func (c *Client) init() {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *Client) WithAutoPipeline() AutoPipelinedClient {
|
||||||
|
if c.autopipeliner != nil {
|
||||||
|
if !c.autopipeliner.closed.Load() {
|
||||||
|
return c.autopipeliner
|
||||||
|
}
|
||||||
|
}
|
||||||
|
c.autopipeliner = c.AutoPipeline()
|
||||||
|
return c.autopipeliner
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) WithTimeout(timeout time.Duration) *Client {
|
func (c *Client) WithTimeout(timeout time.Duration) *Client {
|
||||||
clone := *c
|
clone := *c
|
||||||
clone.baseClient = c.baseClient.withTimeout(timeout)
|
clone.baseClient = c.baseClient.withTimeout(timeout)
|
||||||
@@ -1165,6 +1182,37 @@ func (c *Client) Pipeline() Pipeliner {
|
|||||||
return &pipe
|
return &pipe
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// AutoPipeline creates a new autopipeliner that automatically batches commands.
|
||||||
|
// Commands are automatically flushed based on batch size and time interval.
|
||||||
|
// The autopipeliner must be closed when done to flush pending commands.
|
||||||
|
//
|
||||||
|
// Example:
|
||||||
|
//
|
||||||
|
// ap := client.AutoPipeline()
|
||||||
|
// defer ap.Close()
|
||||||
|
//
|
||||||
|
// var wg sync.WaitGroup
|
||||||
|
// for i := 0; i < 1000; i++ {
|
||||||
|
// wg.Add(1)
|
||||||
|
// go func(idx int) {
|
||||||
|
// defer wg.Done()
|
||||||
|
// ap.Do(ctx, "SET", fmt.Sprintf("key%d", idx), idx)
|
||||||
|
// }(i)
|
||||||
|
// }
|
||||||
|
// wg.Wait()
|
||||||
|
//
|
||||||
|
// Note: AutoPipeline requires AutoPipelineConfig to be set in Options.
|
||||||
|
// If not set, default configuration will be used.
|
||||||
|
func (c *Client) AutoPipeline() *AutoPipeliner {
|
||||||
|
if c.autopipeliner != nil {
|
||||||
|
return c.autopipeliner
|
||||||
|
}
|
||||||
|
if c.opt.AutoPipelineConfig == nil {
|
||||||
|
c.opt.AutoPipelineConfig = DefaultAutoPipelineConfig()
|
||||||
|
}
|
||||||
|
return NewAutoPipeliner(c, c.opt.AutoPipelineConfig)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
||||||
return c.TxPipeline().Pipelined(ctx, fn)
|
return c.TxPipeline().Pipelined(ctx, fn)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user