mirror of https://github.com/redis/go-redis.git, synced 2025-12-02 06:22:31 +03:00
wip, remove ring
autopipeline.go (179 changed lines)
@@ -20,21 +20,6 @@ type AutoPipelineConfig struct {
 	// Default: 10
 	MaxConcurrentBatches int
 
-	// UseRingBuffer enables the high-performance ring buffer queue.
-	// When enabled, uses a pre-allocated ring buffer with lock-free enqueue
-	// instead of the slice-based queue. This provides:
-	// - 6x faster enqueue operations
-	// - 100% reduction in allocations during enqueue
-	// - Better performance under high concurrency
-	// Default: true (enabled)
-	UseRingBuffer bool
-
-	// RingBufferSize is the size of the ring buffer queue.
-	// Only used when UseRingBuffer is true.
-	// Must be a power of 2 for optimal performance (will be rounded up if not).
-	// Default: 1024
-	RingBufferSize int
-
 	// MaxFlushDelay is the maximum delay after flushing before checking for more commands.
 	// A small delay (e.g., 100μs) can significantly reduce CPU usage by allowing
 	// more commands to batch together, at the cost of slightly higher latency.
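
The MaxFlushDelay trade-off in the comment kept by this hunk is tunable through the public config. A minimal sketch, assuming the v9 import path and that MaxFlushDelay is a time.Duration (values are illustrative, not recommendations):

package main

import (
	"time"

	"github.com/redis/go-redis/v9"
)

func tunedConfig() *redis.AutoPipelineConfig {
	cfg := redis.DefaultAutoPipelineConfig()
	cfg.MaxBatchSize = 50                      // flush once 50 commands are queued
	cfg.MaxConcurrentBatches = 10              // cap in-flight pipeline executions
	cfg.MaxFlushDelay = 100 * time.Microsecond // trade ~100µs latency for larger batches
	return cfg
}

func main() { _ = tunedConfig() }
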
@@ -55,8 +40,6 @@ func DefaultAutoPipelineConfig() *AutoPipelineConfig {
 	return &AutoPipelineConfig{
 		MaxBatchSize:         50,
 		MaxConcurrentBatches: 10,
-		UseRingBuffer:        true, // Enable ring buffer by default
-		RingBufferSize:       1024,
 		MaxFlushDelay:        0, // No delay by default (lowest latency)
 	}
 }
@@ -184,11 +167,10 @@ type AutoPipeliner struct {
 	pipeliner cmdableClient
 	config    *AutoPipelineConfig
 
-	// Command queue - either slice-based or ring buffer
+	// Command queue
 	mu       sync.Mutex
-	queue    []*queuedCmd      // Slice-based queue (legacy)
-	ring     *autoPipelineRing // Ring buffer queue (high-performance)
-	queueLen atomic.Int32      // Fast path check without lock
+	queue    []*queuedCmd // Slice-based queue
+	queueLen atomic.Int32 // Fast path check without lock
 
 	// Flush control
 	flushCh chan struct{} // Signal to flush immediately
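
The queueLen mirror that survives this hunk exists so hot paths can check for queued work without taking mu. A toy illustration of the idiom (queueStats and hasWork are hypothetical names, not library code):

package main

import (
	"sync"
	"sync/atomic"
)

// queueStats is a toy mirror of the AutoPipeliner fields above.
type queueStats struct {
	mu       sync.Mutex
	queue    []int
	queueLen atomic.Int32
}

func (q *queueStats) push(v int) {
	q.mu.Lock()
	q.queue = append(q.queue, v)
	q.queueLen.Store(int32(len(q.queue)))
	q.mu.Unlock()
}

// hasWork is the fast path: it reads the atomic mirror without locking.
func (q *queueStats) hasWork() bool { return q.queueLen.Load() > 0 }

func main() {
	q := &queueStats{}
	q.push(1)
	println(q.hasWork()) // true
}
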
@@ -233,12 +215,8 @@ func NewAutoPipeliner(pipeliner cmdableClient, config *AutoPipelineConfig) *AutoPipeliner {
 	// Initialize cmdable to route all commands through processAndBlock
 	ap.cmdable = ap.processAndBlock
 
-	// Initialize queue based on configuration
-	if config.UseRingBuffer {
-		ap.ring = newAutoPipelineRing(config.RingBufferSize)
-	} else {
-		ap.queue = getQueueSlice(config.MaxBatchSize)
-	}
+	// Initialize queue
+	ap.queue = getQueueSlice(config.MaxBatchSize)
 
 	// Start background flusher
 	ap.wg.Add(1)
@@ -314,12 +292,6 @@ var closedQueuedCmd = &queuedCmd{
 	done: closedChan,
 }
 
-// ringQueuedCmd is a queuedCmd wrapper for ring buffer (not pooled)
-type ringQueuedCmd struct {
-	cmd  Cmder
-	done <-chan struct{}
-}
-
 // processWithQueuedCmd is the internal method that queues a command and returns the queuedCmd.
 // The caller is responsible for returning the queuedCmd to the pool after use.
 func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *queuedCmd {
@@ -328,19 +300,6 @@ func (ap *AutoPipeliner) processWithQueuedCmd(ctx context.Context, cmd Cmder) *queuedCmd {
 		return closedQueuedCmd
 	}
 
-	// Use ring buffer if enabled
-	if ap.config.UseRingBuffer {
-		done := ap.ring.putOne(cmd)
-		// putOne will signal the flusher via condition variable if needed
-		// For ring buffer, we create a simple wrapper (not pooled)
-		// The done channel is managed by the ring buffer
-		return &queuedCmd{
-			cmd:  cmd,
-			done: done,
-		}
-	}
-
-	// Legacy slice-based queue
 	// Get queued command from pool
 	qc := getQueuedCmd(cmd)
 
@@ -388,11 +347,6 @@ func (ap *AutoPipeliner) Close() error {
 	// Cancel context to stop flusher
 	ap.cancel()
 
-	// Wake up flusher if it's waiting
-	if ap.config.UseRingBuffer {
-		ap.ring.wakeAll()
-	}
-
 	// Wait for flusher to finish
 	ap.wg.Wait()
 
@@ -402,19 +356,6 @@ func (ap *AutoPipeliner) Close() error {
 // flusher is the background goroutine that flushes batches.
 func (ap *AutoPipeliner) flusher() {
 	defer ap.wg.Done()
-
-	if !ap.config.UseRingBuffer {
-		// Legacy slice-based flusher
-		ap.flusherSlice()
-		return
-	}
-
-	// Ring buffer flusher
-	ap.flusherRing()
-}
-
-// flusherSlice is the legacy slice-based flusher.
-func (ap *AutoPipeliner) flusherSlice() {
 	for {
 		// Wait for a command to arrive
 		select {
@@ -457,122 +398,21 @@ func (ap *AutoPipeliner) flusherSlice() {
 		}
 	}
 
-// flusherRing is the ring buffer flusher.
-func (ap *AutoPipeliner) flusherRing() {
-	var (
-		cmds      = make([]Cmder, 0, ap.config.MaxBatchSize)
-		doneChans = make([]chan struct{}, 0, ap.config.MaxBatchSize)
-		positions = make([]uint32, 0, ap.config.MaxBatchSize)
-	)
-
-	for {
-		// Try to get next command (non-blocking)
-		cmd, done, pos := ap.ring.nextWriteCmd()
-
-		if cmd == nil {
-			// No command available
-			// If we have buffered commands, execute them first
-			if len(cmds) > 0 {
-				ap.executeBatch(cmds, doneChans, positions)
-				cmds = cmds[:0]
-				doneChans = doneChans[:0]
-				positions = positions[:0]
-			}
-
-			// Check for shutdown before blocking
-			select {
-			case <-ap.ctx.Done():
-				return
-			default:
-			}
-
-			// Wait for next command (blocking)
-			// This will be woken up by wakeAll() during shutdown
-			cmd, done, pos = ap.ring.waitForWrite()
-
-			// If nil, ring is closed
-			if cmd == nil {
-				return
-			}
-		}
-
-		// Add command to batch
-		cmds = append(cmds, cmd)
-		doneChans = append(doneChans, done)
-		positions = append(positions, pos)
-
-		// Execute batch if full
-		if len(cmds) >= ap.config.MaxBatchSize {
-			ap.executeBatch(cmds, doneChans, positions)
-			cmds = cmds[:0]
-			doneChans = doneChans[:0]
-			positions = positions[:0]
-		}
-	}
-}
-
-// executeBatch executes a batch of commands.
-func (ap *AutoPipeliner) executeBatch(cmds []Cmder, doneChans []chan struct{}, positions []uint32) {
-	if len(cmds) == 0 {
-		return
-	}
-
-	// Acquire semaphore (limit concurrent batches)
-	// Try fast path first
-	if !ap.sem.TryAcquire() {
-		// Fast path failed, need to wait
-		err := ap.sem.Acquire(ap.ctx, 5*time.Second, context.DeadlineExceeded)
-		if err != nil {
-			// Context cancelled, set error on all commands and notify
-			for i, cmd := range cmds {
-				cmd.SetErr(ErrClosed)
-				doneChans[i] <- struct{}{} // Send signal instead of close
-				ap.ring.finishCmd(positions[i])
-			}
-			return
-		}
-	}
-
-	// Fast path for single command
-	if len(cmds) == 1 {
-		_ = ap.pipeliner.Process(context.Background(), cmds[0])
-		doneChans[0] <- struct{}{} // Send signal instead of close
-		ap.ring.finishCmd(positions[0])
-		ap.sem.Release()
-		return
-	}
-	// Execute pipeline for multiple commands
-	pipe := ap.pipeliner.Pipeline()
-	for _, cmd := range cmds {
-		_ = pipe.Process(context.Background(), cmd)
-	}
-
-	// Execute and wait for completion
-	_, _ = pipe.Exec(context.Background())
-
-	// Notify completion and finish slots
-	for i, done := range doneChans {
-		done <- struct{}{} // Send signal instead of close
-		ap.ring.finishCmd(positions[i])
-	}
-	ap.sem.Release()
-}
-
-// flushBatchSlice flushes commands from the slice-based queue (legacy).
+// flushBatchSlice flushes commands from the slice-based queue.
 func (ap *AutoPipeliner) flushBatchSlice() {
 	// Get commands from queue
 	ap.mu.Lock()
 	if len(ap.queue) == 0 {
-		ap.queueLen.Store(0)
 		ap.mu.Unlock()
+		ap.queueLen.Store(0)
 		return
 	}
 
 	// Take ownership of current queue
 	queuedCmds := ap.queue
 	ap.queue = getQueueSlice(ap.config.MaxBatchSize)
-	ap.queueLen.Store(0)
 	ap.mu.Unlock()
+	ap.queueLen.Store(0)
 
 	// Acquire semaphore (limit concurrent batches)
 	// Try fast path first
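
Besides deleting the ring, this hunk moves ap.queueLen.Store(0) to after ap.mu.Unlock(). The take-ownership shape of the flush, reduced to a standalone toy (everything except the queueLen idea is hypothetical):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Toy version of flushBatchSlice's take-ownership step: swap the shared
// slice out under the mutex, reset the atomic mirror after unlocking.
func main() {
	var mu sync.Mutex
	var queueLen atomic.Int32
	queue := []string{"GET a", "GET b"}
	queueLen.Store(int32(len(queue)))

	mu.Lock()
	batch := queue                        // flusher now owns the filled slice
	queue = make([]string, 0, cap(queue)) // producers see a fresh, empty queue
	mu.Unlock()
	queueLen.Store(0) // reset outside the lock, so Len() may briefly over-report

	fmt.Println(len(batch), len(queue)) // 2 0
}
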
@@ -630,9 +470,6 @@ func (ap *AutoPipeliner) flushBatchSlice() {
 
 // Len returns the current number of queued commands.
 func (ap *AutoPipeliner) Len() int {
-	if ap.config.UseRingBuffer {
-		return ap.ring.len()
-	}
	return int(ap.queueLen.Load())
 }
 

(second changed file, deleted in full; its name is not captured in this view)
@@ -1,244 +0,0 @@
-package redis
-
-import (
-	"math/bits"
-	"sync"
-	"sync/atomic"
-)
-
-// autoPipelineRing is a pre-allocated ring buffer queue for autopipelining.
-// It provides lock-free enqueue and FIFO ordering guarantees.
-//
-// Ring buffer architecture:
-// - Pre-allocated slots (no allocations during enqueue)
-// - Per-slot channels for request-response matching
-// - Atomic write pointer for lock-free enqueue
-// - Separate read pointers for write and read goroutines
-//
-// The ring buffer uses three pointers:
-// - write: Where app goroutines add commands (atomic increment)
-// - read1: Where flush goroutine reads commands to send
-// - read2: Where result goroutine matches responses (currently unused, for future optimization)
-type autoPipelineRing struct {
-	store     []autoPipelineSlot // Pre-allocated slots
-	mask      uint32             // Size - 1 (for fast modulo via bitwise AND)
-	write     uint32             // Write position (atomic, incremented by app goroutines)
-	read1     uint32             // Read position for flush goroutine
-	read2     uint32             // Read position for result matching (reserved for future use)
-	cmds      []Cmder            // Persistent buffer for collecting commands (reused, no allocations)
-	doneChans []chan struct{}    // Persistent buffer for collecting done channels (reused, no allocations)
-}
-
-// autoPipelineSlot represents a single command slot in the ring buffer.
-type autoPipelineSlot struct {
-	c1    *sync.Cond    // Condition variable for write synchronization (shared mutex with c2)
-	c2    *sync.Cond    // Condition variable for wait/signal (shared mutex with c1)
-	cmd   Cmder         // The command to execute
-	done  chan struct{} // Completion notification channel (pre-allocated, reused)
-	mark  uint32        // State: 0=empty, 1=queued, 2=sent (atomic)
-	slept bool          // Whether writer goroutine is sleeping on this slot
-}
-
-// State constants for autoPipelineSlot.mark
-const (
-	apSlotEmpty  uint32 = 0 // Slot is empty and available
-	apSlotQueued uint32 = 1 // Command queued, ready to be sent
-	apSlotSent   uint32 = 2 // Command sent, waiting for response
-	apSlotClosed uint32 = 3 // Ring is closed, stop waiting
-)
-
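
The mask field above is why the ring size must be a power of two: slot indexing can use a bitwise AND in place of modulo. A standalone check of the identity:

package main

import "fmt"

func main() {
	const size = 1024     // power of 2
	const mask = size - 1 // 0x3FF
	for _, pos := range []uint32{0, 1023, 1024, 1025} {
		// pos & mask == pos % size holds exactly when size is a power of 2
		fmt.Println(pos&mask == pos%size) // true, true, true, true
	}
}
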
-// newAutoPipelineRing creates a new ring buffer with the specified size.
-// Size will be rounded up to the next power of 2 for efficient modulo operations.
-func newAutoPipelineRing(size int) *autoPipelineRing {
-	// Round up to power of 2 for fast modulo via bitwise AND
-	if size <= 0 {
-		size = 1024 // Default size
-	}
-	if size&(size-1) != 0 {
-		// Not a power of 2, round up
-		size = 1 << (32 - bits.LeadingZeros32(uint32(size)))
-	}
-
-	r := &autoPipelineRing{
-		store:     make([]autoPipelineSlot, size),
-		mask:      uint32(size - 1),
-		read1:     ^uint32(0), // Start at -1 (0xFFFFFFFF) so first increment gives 0
-		read2:     ^uint32(0), // Start at -1 (0xFFFFFFFF) so first increment gives 0
-		cmds:      make([]Cmder, 0, size),         // Persistent buffer, reused
-		doneChans: make([]chan struct{}, 0, size), // Persistent buffer, reused
-	}
-
-	// Initialize each slot with condition variables and pre-allocated channel
-	for i := range r.store {
-		m := &sync.Mutex{}
-		r.store[i].c1 = sync.NewCond(m)
-		r.store[i].c2 = sync.NewCond(m) // Share the same mutex
-		r.store[i].done = make(chan struct{}, 1) // Buffered channel for signal (not close)
-	}
-
-	return r
-}
-
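
The rounding step in newAutoPipelineRing can be sanity-checked in isolation; roundUpPow2 below is a hypothetical extraction of the same expression:

package main

import (
	"fmt"
	"math/bits"
)

// roundUpPow2 mirrors the constructor's rounding: the next power of two >= n.
func roundUpPow2(n int) int {
	if n <= 0 {
		return 1024 // the ring's default size
	}
	if n&(n-1) == 0 {
		return n // already a power of 2
	}
	return 1 << (32 - bits.LeadingZeros32(uint32(n)))
}

func main() {
	fmt.Println(roundUpPow2(1000)) // 1024
	fmt.Println(roundUpPow2(1024)) // 1024
	fmt.Println(roundUpPow2(1025)) // 2048
}
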
-// putOne enqueues a command into the ring buffer.
-// Returns the done channel that will be signaled when the command completes.
-//
-// Ring buffer enqueue implementation:
-// - Atomic increment for write position
-// - Wait on condition variable if slot is full
-// - Signal reader if it's sleeping
-func (r *autoPipelineRing) putOne(cmd Cmder) chan struct{} {
-	// Atomic increment to get next slot
-	// AddUint32 returns the NEW value (after increment), so subtract 1 to get the slot we own
-	pos := atomic.AddUint32(&r.write, 1) - 1
-	slot := &r.store[pos&r.mask]
-
-	// Lock the slot
-	slot.c1.L.Lock()
-
-	// Wait if slot is not empty (mark != 0)
-	for slot.mark != 0 {
-		slot.c1.Wait()
-	}
-
-	// Store command and mark as queued
-	slot.cmd = cmd
-	slot.mark = 1
-	s := slot.slept
-
-	slot.c1.L.Unlock()
-
-	// If reader is sleeping, wake it up
-	if s {
-		slot.c2.Broadcast()
-	}
-
-	return slot.done
-}
-
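
putOne's lock-free claim works because atomic.AddUint32 returns the post-increment value, so each caller owns a distinct position. A standalone demonstration that concurrent callers never collide:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var write uint32
	var wg sync.WaitGroup
	seen := make([]uint32, 8)
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// Subtracting 1 converts "value after increment" into the
			// position this goroutine exclusively owns.
			pos := atomic.AddUint32(&write, 1) - 1
			seen[pos] = 1 // distinct indices, so no two goroutines collide
		}()
	}
	wg.Wait()
	fmt.Println(seen) // [1 1 1 1 1 1 1 1]
}
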
-// nextWriteCmd tries to get the next command (non-blocking).
-// Returns nil if no command is available.
-// Should only be called by the flush goroutine.
-// Returns: cmd, done channel, position (for finishCmd)
-func (r *autoPipelineRing) nextWriteCmd() (Cmder, chan struct{}, uint32) {
-	r.read1++
-	pos := r.read1
-	p := pos & r.mask
-	slot := &r.store[p]
-
-	slot.c1.L.Lock()
-	if slot.mark == 1 {
-		cmd := slot.cmd
-		done := slot.done
-		slot.mark = 2
-		slot.c1.L.Unlock()
-		return cmd, done, pos
-	}
-	// No command available, rollback read position
-	r.read1--
-	slot.c1.L.Unlock()
-	return nil, nil, 0
-}
-
-// waitForWrite waits for the next command (blocking).
-// Should only be called by the flush goroutine.
-// Returns nil if the ring is closed.
-// Returns: cmd, done channel, position (for finishCmd)
-func (r *autoPipelineRing) waitForWrite() (Cmder, chan struct{}, uint32) {
-	r.read1++
-	pos := r.read1
-	p := pos & r.mask
-	slot := &r.store[p]
-
-	slot.c1.L.Lock()
-	// Wait until command is available (mark == 1) or closed (mark == 3)
-	for slot.mark != 1 && slot.mark != apSlotClosed {
-		slot.slept = true
-		slot.c2.Wait() // c1 and c2 share the same mutex
-		slot.slept = false
-	}
-
-	// Check if closed
-	if slot.mark == apSlotClosed {
-		r.read1-- // Rollback read position
-		slot.c1.L.Unlock()
-		return nil, nil, 0
-	}
-
-	cmd := slot.cmd
-	done := slot.done
-	slot.mark = 2
-	slot.c1.L.Unlock()
-	return cmd, done, pos
-}
-
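
waitForWrite and putOne coordinate through two condition variables sharing one mutex: c1 means "slot is free", c2 means "slot is filled". A minimal sketch of that protocol (toy code, not the library's):

package main

import (
	"fmt"
	"sync"
)

func main() {
	m := &sync.Mutex{}
	c1, c2 := sync.NewCond(m), sync.NewCond(m) // two conds, one mutex
	var mark uint32                            // 0=empty, 1=queued

	go func() { // writer: wait for an empty slot, then fill it
		m.Lock()
		for mark != 0 {
			c1.Wait()
		}
		mark = 1
		m.Unlock()
		c2.Broadcast() // wake the reader
	}()

	m.Lock() // reader: wait for a queued command, then free the slot
	for mark != 1 {
		c2.Wait()
	}
	mark = 0
	m.Unlock()
	c1.Signal() // wake any writer waiting on this slot
	fmt.Println("slot cycled")
}
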
-// finishCmd marks a command as completed and clears the slot.
-// Should only be called by the flush goroutine.
-// The position parameter should be the position returned by nextWriteCmd/waitForWrite.
-func (r *autoPipelineRing) finishCmd(position uint32) {
-	p := position & r.mask
-	slot := &r.store[p]
-
-	slot.c1.L.Lock()
-	if slot.mark == 2 {
-		// Drain the done channel before reusing
-		select {
-		case <-slot.done:
-		default:
-		}
-
-		// Clear slot for reuse
-		slot.cmd = nil
-		slot.mark = 0
-	}
-	slot.c1.L.Unlock()
-	slot.c1.Signal() // Wake up any writer waiting on this slot
-}
-
-// len returns the approximate number of queued commands.
-// This is an estimate and may not be exact due to concurrent access.
-func (r *autoPipelineRing) len() int {
-	write := atomic.LoadUint32(&r.write)
-	read := atomic.LoadUint32(&r.read1)
-
-	// Handle wrap-around
-	if write >= read {
-		return int(write - read)
-	}
-	// Wrapped around
-	return int(write + (^uint32(0) - read) + 1)
-}
-
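
The explicit wrap-around branch in len agrees with plain unsigned subtraction, since uint32 arithmetic is already modular; a quick check:

package main

import "fmt"

func main() {
	// Writer has wrapped past 0, reader has not.
	write, read := uint32(5), ^uint32(0)-2
	branchy := write + (^uint32(0) - read) + 1 // the wrapped-around branch
	fmt.Println(branchy, write-read)           // 8 8: both forms agree
}
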
-// cap returns the capacity of the ring buffer.
-func (r *autoPipelineRing) cap() int {
-	return len(r.store)
-}
-
-// reset resets the ring buffer to empty state.
-// This should only be called when no goroutines are accessing the ring.
-func (r *autoPipelineRing) reset() {
-	atomic.StoreUint32(&r.write, 0)
-	atomic.StoreUint32(&r.read1, 0)
-	atomic.StoreUint32(&r.read2, 0)
-
-	for i := range r.store {
-		r.store[i].c1.L.Lock()
-		r.store[i].cmd = nil
-		r.store[i].mark = 0
-		r.store[i].slept = false
-		r.store[i].c1.L.Unlock()
-	}
-}
-
-// wakeAll wakes up all waiting goroutines.
-// This is used during shutdown to unblock the flusher.
-func (r *autoPipelineRing) wakeAll() {
-	for i := range r.store {
-		r.store[i].c1.L.Lock()
-		if r.store[i].mark == 0 {
-			r.store[i].mark = apSlotClosed
-		}
-		r.store[i].c1.L.Unlock()
-		r.store[i].c2.Broadcast()
-	}
-}

@@ -78,8 +78,6 @@ func TestPipelineBufferSizesWithAutoPipeline(t *testing.T) {
 		AutoPipelineConfig: &redis.AutoPipelineConfig{
 			MaxBatchSize:         10,
 			MaxConcurrentBatches: 2,
-			UseRingBuffer:        true,
-			RingBufferSize:       64,
 		},
 	})
 	defer client.Close()