diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 898a274d..ad846651 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -81,7 +81,8 @@ type Conn struct { // Connection identifier for unique tracking id uint64 - usedAt int64 // atomic + usedAt atomic.Int64 + lastPutAt atomic.Int64 // Lock-free netConn access using atomic.Value // Contains *atomicNetConn wrapper, accessed atomically for better performance @@ -175,15 +176,24 @@ func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Con } func (cn *Conn) UsedAt() time.Time { - unixNano := atomic.LoadInt64(&cn.usedAt) - return time.Unix(0, unixNano) + return time.Unix(0, cn.usedAt.Load()) } -func (cn *Conn) UsedAtNs() int64 { - return atomic.LoadInt64(&cn.usedAt) +func (cn *Conn) SetUsedAt(tm time.Time) { + cn.usedAt.Store(tm.UnixNano()) } -func (cn *Conn) SetUsedAt(tm time.Time) { - atomic.StoreInt64(&cn.usedAt, tm.UnixNano()) +func (cn *Conn) UsedAtNs() int64 { + return cn.usedAt.Load() +} +func (cn *Conn) SetUsedAtNs(ns int64) { + cn.usedAt.Store(ns) +} + +func (cn *Conn) LastPutAtNs() int64 { + return cn.lastPutAt.Load() +} +func (cn *Conn) SetLastPutAtNs(ns int64) { + cn.lastPutAt.Store(ns) } // Backward-compatible wrapper methods for state machine diff --git a/internal/pool/pool.go b/internal/pool/pool.go index dcb6213d..be847b1d 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -461,7 +461,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { } // Use cached time for health checks (max 50ms staleness is acceptable) - now := time.Unix(0, getCachedTimeNs()) + nowNs := getCachedTimeNs() attempts := 0 // Lock-free atomic read - no mutex overhead! 
@@ -487,7 +487,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) { break } - if !p.isHealthyConn(cn, now) { + if !p.isHealthyConn(cn, nowNs) { _ = p.CloseConn(cn) continue } @@ -742,6 +742,8 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) { if shouldCloseConn { _ = p.closeConn(cn) } + + cn.SetLastPutAtNs(getCachedTimeNs()) } func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) { @@ -891,14 +893,14 @@ func (p *ConnPool) Close() error { return firstErr } -func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool { +func (p *ConnPool) isHealthyConn(cn *Conn, nowNs int64) bool { // Performance optimization: check conditions from cheapest to most expensive, // and from most likely to fail to least likely to fail. // Only fails if ConnMaxLifetime is set AND connection is old. // Most pools don't set ConnMaxLifetime, so this rarely fails. if p.cfg.ConnMaxLifetime > 0 { - if cn.expiresAt.Before(now) { + if cn.expiresAt.UnixNano() < nowNs { return false // Connection has exceeded max lifetime } } @@ -906,7 +908,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool { // Most pools set ConnMaxIdleTime, and idle connections are common. // Checking this first allows us to fail fast without expensive syscalls. 
if p.cfg.ConnMaxIdleTime > 0 { - if now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime { + if nowNs-cn.UsedAtNs() >= int64(p.cfg.ConnMaxIdleTime) { return false // Connection has been idle too long } } @@ -926,7 +928,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool { ) // Update timestamp for healthy connection - cn.SetUsedAt(now) + cn.SetUsedAtNs(nowNs) // Connection is healthy, client will handle notifications return true @@ -939,6 +941,6 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool { } // Only update UsedAt if connection is healthy (avoids unnecessary atomic store) - cn.SetUsedAt(now) + cn.SetUsedAtNs(nowNs) return true } diff --git a/redis.go b/redis.go index 8cd961e5..ac97c2ca 100644 --- a/redis.go +++ b/redis.go @@ -1366,11 +1366,13 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn // If the connection was health-checked within the last 5 seconds, we can skip the // expensive syscall since the health check already verified no unexpected data. // This is safe because: + // 0. lastPutAt is stamped in internal/pool/pool.go putConn() whenever a connection is returned to the pool // 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK) // 2. If push notifications arrived, they would have been detected by health check // 3. 5 seconds is short enough that connection state is still fresh // 4. Push notifications will be processed by the next WithReader call - lastHealthCheckNs := cn.UsedAtNs() + // UsedAt is refreshed on getConn, so it cannot track when the connection was last returned; use lastPutAt instead. + lastHealthCheckNs := cn.LastPutAtNs() if lastHealthCheckNs > 0 { // Use pool's cached time to avoid expensive time.Now() syscall nowNs := pool.GetCachedTimeNs()