From c2d525f688e42441e65630162696c2c1bbefe007 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Tue, 28 Oct 2025 12:34:09 +0200 Subject: [PATCH] fix precision of time cache and usedAt --- internal/pool/conn.go | 11 +- internal/pool/conn_used_at_test.go | 257 +++++++++++++++++++++++++++++ 2 files changed, 263 insertions(+), 5 deletions(-) create mode 100644 internal/pool/conn_used_at_test.go diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 5ba3db41..0d18e274 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -131,8 +131,9 @@ func NewConn(netConn net.Conn) *Conn { } func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Conn { + now := time.Now() cn := &Conn{ - createdAt: time.Now(), + createdAt: now, id: generateConnID(), // Generate unique ID for this connection stateMachine: NewConnStateMachine(), } @@ -154,7 +155,7 @@ func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Con cn.netConnAtomic.Store(&atomicNetConn{conn: netConn}) cn.wr = proto.NewWriter(cn.bw) - cn.SetUsedAt(time.Now()) + cn.SetUsedAt(now) // Initialize handoff state atomically initialHandoffState := &HandoffState{ ShouldHandoff: false, @@ -166,12 +167,12 @@ func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Con } func (cn *Conn) UsedAt() time.Time { - unix := atomic.LoadInt64(&cn.usedAt) - return time.Unix(unix, 0) + unixNano := atomic.LoadInt64(&cn.usedAt) + return time.Unix(0, unixNano) } func (cn *Conn) SetUsedAt(tm time.Time) { - atomic.StoreInt64(&cn.usedAt, tm.Unix()) + atomic.StoreInt64(&cn.usedAt, tm.UnixNano()) } // Backward-compatible wrapper methods for state machine diff --git a/internal/pool/conn_used_at_test.go b/internal/pool/conn_used_at_test.go new file mode 100644 index 00000000..74b447f2 --- /dev/null +++ b/internal/pool/conn_used_at_test.go @@ -0,0 +1,257 @@ +package pool + +import ( + "context" + "net" + "testing" + "time" + + "github.com/redis/go-redis/v9/internal/proto" +) + +// TestConn_UsedAtUpdatedOnRead verifies that usedAt is updated when reading from connection +func TestConn_UsedAtUpdatedOnRead(t *testing.T) { + // Create a mock connection + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + cn := NewConn(client) + defer cn.Close() + + // Get initial usedAt time + initialUsedAt := cn.UsedAt() + + // Wait at least 100ms to ensure time difference (usedAt has ~50ms precision from cached time) + time.Sleep(100 * time.Millisecond) + + // Simulate a read operation by calling WithReader + ctx := context.Background() + err := cn.WithReader(ctx, time.Second, func(rd *proto.Reader) error { + // Don't actually read anything, just trigger the deadline update + return nil + }) + + if err != nil { + t.Fatalf("WithReader failed: %v", err) + } + + // Get updated usedAt time + updatedUsedAt := cn.UsedAt() + + // Verify that usedAt was updated + if !updatedUsedAt.After(initialUsedAt) { + t.Errorf("Expected usedAt to be updated after read. Initial: %v, Updated: %v", + initialUsedAt, updatedUsedAt) + } + + // Verify the difference is reasonable (should be around 100ms, accounting for ~50ms cache precision) + diff := updatedUsedAt.Sub(initialUsedAt) + if diff < 50*time.Millisecond || diff > 200*time.Millisecond { + t.Errorf("Expected usedAt difference to be around 100ms (±50ms for cache), got %v", diff) + } +} + +// TestConn_UsedAtUpdatedOnWrite verifies that usedAt is updated when writing to connection +func TestConn_UsedAtUpdatedOnWrite(t *testing.T) { + // Create a mock connection + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + cn := NewConn(client) + defer cn.Close() + + // Get initial usedAt time + initialUsedAt := cn.UsedAt() + + // Wait at least 100ms to ensure time difference (usedAt has ~50ms precision from cached time) + time.Sleep(100 * time.Millisecond) + + // Simulate a write operation by calling WithWriter + ctx := context.Background() + err := cn.WithWriter(ctx, time.Second, func(wr *proto.Writer) error { + // Don't actually write anything, just trigger the deadline update + return nil + }) + + if err != nil { + t.Fatalf("WithWriter failed: %v", err) + } + + // Get updated usedAt time + updatedUsedAt := cn.UsedAt() + + // Verify that usedAt was updated + if !updatedUsedAt.After(initialUsedAt) { + t.Errorf("Expected usedAt to be updated after write. Initial: %v, Updated: %v", + initialUsedAt, updatedUsedAt) + } + + // Verify the difference is reasonable (should be around 100ms, accounting for ~50ms cache precision) + diff := updatedUsedAt.Sub(initialUsedAt) + if diff < 50*time.Millisecond || diff > 200*time.Millisecond { + t.Errorf("Expected usedAt difference to be around 100ms (±50ms for cache), got %v", diff) + } +} + +// TestConn_UsedAtUpdatedOnMultipleOperations verifies that usedAt is updated on each operation +func TestConn_UsedAtUpdatedOnMultipleOperations(t *testing.T) { + // Create a mock connection + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + cn := NewConn(client) + defer cn.Close() + + ctx := context.Background() + var previousUsedAt time.Time + + // Perform multiple operations and verify usedAt is updated each time + // Note: usedAt has ~50ms precision from cached time + for i := 0; i < 5; i++ { + currentUsedAt := cn.UsedAt() + + if i > 0 { + // Verify usedAt was updated from previous iteration + if !currentUsedAt.After(previousUsedAt) { + t.Errorf("Iteration %d: Expected usedAt to be updated. Previous: %v, Current: %v", + i, previousUsedAt, currentUsedAt) + } + } + + previousUsedAt = currentUsedAt + + // Wait at least 100ms (accounting for ~50ms cache precision) + time.Sleep(100 * time.Millisecond) + + // Perform a read operation + err := cn.WithReader(ctx, time.Second, func(rd *proto.Reader) error { + return nil + }) + if err != nil { + t.Fatalf("Iteration %d: WithReader failed: %v", i, err) + } + } + + // Verify final usedAt is significantly later than initial + finalUsedAt := cn.UsedAt() + if !finalUsedAt.After(previousUsedAt) { + t.Errorf("Expected final usedAt to be updated. Previous: %v, Final: %v", + previousUsedAt, finalUsedAt) + } +} + +// TestConn_UsedAtNotUpdatedWithoutOperation verifies that usedAt is NOT updated without operations +func TestConn_UsedAtNotUpdatedWithoutOperation(t *testing.T) { + // Create a mock connection + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + cn := NewConn(client) + defer cn.Close() + + // Get initial usedAt time + initialUsedAt := cn.UsedAt() + + // Wait without performing any operations + time.Sleep(100 * time.Millisecond) + + // Get usedAt time again + currentUsedAt := cn.UsedAt() + + // Verify that usedAt was NOT updated (should be the same) + if !currentUsedAt.Equal(initialUsedAt) { + t.Errorf("Expected usedAt to remain unchanged without operations. Initial: %v, Current: %v", + initialUsedAt, currentUsedAt) + } +} + +// TestConn_UsedAtConcurrentUpdates verifies that usedAt updates are thread-safe +func TestConn_UsedAtConcurrentUpdates(t *testing.T) { + // Create a mock connection + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + cn := NewConn(client) + defer cn.Close() + + ctx := context.Background() + const numGoroutines = 10 + const numIterations = 10 + + // Launch multiple goroutines that perform operations concurrently + done := make(chan bool, numGoroutines) + for i := 0; i < numGoroutines; i++ { + go func() { + for j := 0; j < numIterations; j++ { + // Alternate between read and write operations + if j%2 == 0 { + _ = cn.WithReader(ctx, time.Second, func(rd *proto.Reader) error { + return nil + }) + } else { + _ = cn.WithWriter(ctx, time.Second, func(wr *proto.Writer) error { + return nil + }) + } + time.Sleep(time.Millisecond) + } + done <- true + }() + } + + // Wait for all goroutines to complete + for i := 0; i < numGoroutines; i++ { + <-done + } + + // Verify that usedAt was updated (should be recent) + usedAt := cn.UsedAt() + timeSinceUsed := time.Since(usedAt) + + // Should be very recent (within last second) + if timeSinceUsed > time.Second { + t.Errorf("Expected usedAt to be recent, but it was %v ago", timeSinceUsed) + } +} + +// TestConn_UsedAtPrecision verifies that usedAt has 50ms precision (not nanosecond) +func TestConn_UsedAtPrecision(t *testing.T) { + // Create a mock connection + server, client := net.Pipe() + defer server.Close() + defer client.Close() + + cn := NewConn(client) + defer cn.Close() + + ctx := context.Background() + + // Perform an operation + err := cn.WithReader(ctx, time.Second, func(rd *proto.Reader) error { + return nil + }) + if err != nil { + t.Fatalf("WithReader failed: %v", err) + } + + // Get usedAt time + usedAt := cn.UsedAt() + + // Verify that usedAt has nanosecond precision (from the cached time which updates every 50ms) + // The value should be reasonable (not year 1970 or something) + if usedAt.Year() < 2020 { + t.Errorf("Expected usedAt to be a recent time, got %v", usedAt) + } + + // The nanoseconds might be non-zero depending on when the cache was updated + // We just verify the time is stored with full precision (not truncated to seconds) + initialNanos := usedAt.UnixNano() + if initialNanos == 0 { + t.Error("Expected usedAt to have nanosecond precision, got 0") + } +}