1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00

hybrid approach, test agains previous commit

This commit is contained in:
Nedyalko Dyakov
2025-10-28 10:35:32 +02:00
parent 7198f47baa
commit a9640cd811

View File

@@ -27,8 +27,8 @@ type AutoPipelineConfig struct {
func DefaultAutoPipelineConfig() *AutoPipelineConfig { func DefaultAutoPipelineConfig() *AutoPipelineConfig {
return &AutoPipelineConfig{ return &AutoPipelineConfig{
MaxBatchSize: 30, MaxBatchSize: 30,
FlushInterval: 10 * time.Millisecond, FlushInterval: 10 * time.Microsecond,
MaxConcurrentBatches: 20, MaxConcurrentBatches: 30,
} }
} }
@@ -78,13 +78,12 @@ type AutoPipeliner struct {
pipeliner pipelinerClient pipeliner pipelinerClient
config *AutoPipelineConfig config *AutoPipelineConfig
// Command queue // Command queue - hybrid approach for best performance
mu sync.Mutex mu sync.Mutex
queue []*queuedCmd queue []*queuedCmd
queueLen atomic.Int32 // Fast path check without lock queueLen atomic.Int32 // Fast path check without lock
// Flush control // Flush control
flushTimer *time.Timer
flushCh chan struct{} // Signal to flush immediately flushCh chan struct{} // Signal to flush immediately
// Concurrency control // Concurrency control
@@ -111,18 +110,12 @@ func NewAutoPipeliner(pipeliner pipelinerClient, config *AutoPipelineConfig) *Au
pipeliner: pipeliner, pipeliner: pipeliner,
config: config, config: config,
queue: make([]*queuedCmd, 0, config.MaxBatchSize), queue: make([]*queuedCmd, 0, config.MaxBatchSize),
flushTimer: time.NewTimer(config.FlushInterval),
flushCh: make(chan struct{}, 1), flushCh: make(chan struct{}, 1),
sem: make(chan struct{}, config.MaxConcurrentBatches), sem: make(chan struct{}, config.MaxConcurrentBatches),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
// Stop the timer initially
if !ap.flushTimer.Stop() {
<-ap.flushTimer.C
}
// Start background flusher // Start background flusher
ap.wg.Add(1) ap.wg.Add(1)
go ap.flusher() go ap.flusher()
@@ -178,29 +171,36 @@ func (ap *AutoPipeliner) process(ctx context.Context, cmd Cmder) <-chan struct{}
done: make(chan struct{}), done: make(chan struct{}),
} }
// Fast path: try to acquire lock without blocking
if ap.mu.TryLock() {
ap.queue = append(ap.queue, qc)
queueLen := len(ap.queue)
ap.queueLen.Store(int32(queueLen))
// Trigger immediate flush if batch is full
shouldFlush := queueLen >= ap.config.MaxBatchSize
ap.mu.Unlock()
if shouldFlush {
select {
case ap.flushCh <- struct{}{}:
default:
}
}
return qc.done
}
// Slow path: lock is contended, wait for it
ap.mu.Lock() ap.mu.Lock()
ap.queue = append(ap.queue, qc) ap.queue = append(ap.queue, qc)
queueLen := len(ap.queue) queueLen := len(ap.queue)
ap.queueLen.Store(int32(queueLen)) ap.queueLen.Store(int32(queueLen))
// Check if we should flush immediately // Trigger immediate flush if batch is full
shouldFlush := queueLen >= ap.config.MaxBatchSize shouldFlush := queueLen >= ap.config.MaxBatchSize
// Start flush timer if this is the first command
if queueLen == 1 {
ap.flushTimer.Reset(ap.config.FlushInterval)
}
if queueLen > 1 {
cachedFlushInterval := ap.cachedFlushInterval.Load()
if cachedFlushInterval == 0 && ap.cachedFlushInterval.CompareAndSwap(cachedFlushInterval, 0) {
ap.config.FlushInterval = time.Duration(cachedFlushInterval) * time.Nanosecond
}
}
ap.mu.Unlock() ap.mu.Unlock()
if shouldFlush { if shouldFlush {
// Signal immediate flush (non-blocking)
select { select {
case ap.flushCh <- struct{}{}: case ap.flushCh <- struct{}{}:
default: default:
@@ -251,6 +251,17 @@ func (ap *AutoPipeliner) Close() error {
func (ap *AutoPipeliner) flusher() { func (ap *AutoPipeliner) flusher() {
defer ap.wg.Done() defer ap.wg.Done()
// Adaptive delays:
// - Single command: flush almost immediately (1ns) to minimize latency
// - Multiple commands: wait a bit (10µs) to allow batching
const singleCmdDelay = 1 * time.Nanosecond
const batchDelay = 10 * time.Microsecond
// Start with batch delay
timer := time.NewTimer(batchDelay)
defer timer.Stop()
currentDelay := batchDelay
for { for {
select { select {
case <-ap.ctx.Done(): case <-ap.ctx.Done():
@@ -258,21 +269,52 @@ func (ap *AutoPipeliner) flusher() {
ap.flushBatch() ap.flushBatch()
return return
case <-ap.flushTimer.C:
// Timer expired, flush if we have commands
if ap.queueLen.Load() > 0 {
ap.flushBatch()
}
// Reset timer for next interval if queue is not empty
ap.mu.Lock()
if len(ap.queue) > 0 {
ap.flushTimer.Reset(ap.config.FlushInterval)
}
ap.mu.Unlock()
case <-ap.flushCh: case <-ap.flushCh:
// Immediate flush requested // Immediate flush requested (batch full)
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
ap.flushBatch() ap.flushBatch()
// Reset timer based on remaining queue
qLen := ap.queueLen.Load()
if qLen == 1 {
currentDelay = singleCmdDelay
} else {
currentDelay = batchDelay
}
timer.Reset(currentDelay)
case <-timer.C:
qLen := ap.queueLen.Load()
if qLen > 0 {
ap.flushBatch()
}
// Adaptive delay based on queue size after flush
qLen = ap.queueLen.Load()
var nextDelay time.Duration
if qLen == 1 {
// Single command waiting - flush very quickly
nextDelay = singleCmdDelay
} else if qLen > 1 {
// Multiple commands - use batch delay to accumulate more
nextDelay = batchDelay
} else {
// Empty queue - use batch delay
nextDelay = batchDelay
}
// Only reset timer if delay changed
if nextDelay != currentDelay {
currentDelay = nextDelay
timer.Reset(nextDelay)
} else {
timer.Reset(currentDelay)
}
} }
} }
} }
@@ -291,15 +333,6 @@ func (ap *AutoPipeliner) flushBatch() {
queuedCmds := ap.queue queuedCmds := ap.queue
ap.queue = make([]*queuedCmd, 0, ap.config.MaxBatchSize) ap.queue = make([]*queuedCmd, 0, ap.config.MaxBatchSize)
ap.queueLen.Store(0) ap.queueLen.Store(0)
// Stop timer
if !ap.flushTimer.Stop() {
select {
case <-ap.flushTimer.C:
default:
}
}
ap.mu.Unlock() ap.mu.Unlock()
// Acquire semaphore (limit concurrent batches) // Acquire semaphore (limit concurrent batches)
@@ -320,20 +353,15 @@ func (ap *AutoPipeliner) flushBatch() {
if len(queuedCmds) == 0 { if len(queuedCmds) == 0 {
return return
} }
// Fast path for single command
if len(queuedCmds) == 1 { if len(queuedCmds) == 1 {
if ap.cachedFlushInterval.CompareAndSwap(0, ap.config.FlushInterval.Nanoseconds()) {
ap.config.FlushInterval = time.Nanosecond
}
_ = ap.pipeliner.Process(context.Background(), queuedCmds[0].cmd) _ = ap.pipeliner.Process(context.Background(), queuedCmds[0].cmd)
close(queuedCmds[0].done) close(queuedCmds[0].done)
return return
} }
cachedFlushInterval := ap.cachedFlushInterval.Load() // Execute pipeline for multiple commands
if cachedFlushInterval != 0 && ap.cachedFlushInterval.CompareAndSwap(cachedFlushInterval, 0) {
ap.config.FlushInterval = time.Duration(ap.cachedFlushInterval.Load()) * time.Nanosecond
}
// Execute pipeline
pipe := ap.pipeliner.Pipeline() pipe := ap.pipeliner.Pipeline()
for _, qc := range queuedCmds { for _, qc := range queuedCmds {
_ = pipe.Process(context.Background(), qc.cmd) _ = pipe.Process(context.Background(), qc.cmd)