mirror of
https://github.com/redis/go-redis.git
synced 2025-12-02 06:22:31 +03:00
use correct timer for last health check
This commit is contained in:
@@ -81,7 +81,8 @@ type Conn struct {
|
|||||||
// Connection identifier for unique tracking
|
// Connection identifier for unique tracking
|
||||||
id uint64
|
id uint64
|
||||||
|
|
||||||
usedAt int64 // atomic
|
usedAt atomic.Int64
|
||||||
|
lastPutAt atomic.Int64
|
||||||
|
|
||||||
// Lock-free netConn access using atomic.Value
|
// Lock-free netConn access using atomic.Value
|
||||||
// Contains *atomicNetConn wrapper, accessed atomically for better performance
|
// Contains *atomicNetConn wrapper, accessed atomically for better performance
|
||||||
@@ -175,15 +176,24 @@ func NewConnWithBufferSize(netConn net.Conn, readBufSize, writeBufSize int) *Con
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (cn *Conn) UsedAt() time.Time {
|
func (cn *Conn) UsedAt() time.Time {
|
||||||
unixNano := atomic.LoadInt64(&cn.usedAt)
|
return time.Unix(0, cn.usedAt.Load())
|
||||||
return time.Unix(0, unixNano)
|
|
||||||
}
|
}
|
||||||
func (cn *Conn) UsedAtNs() int64 {
|
func (cn *Conn) SetUsedAt(tm time.Time) {
|
||||||
return atomic.LoadInt64(&cn.usedAt)
|
cn.usedAt.Store(tm.UnixNano())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cn *Conn) SetUsedAt(tm time.Time) {
|
func (cn *Conn) UsedAtNs() int64 {
|
||||||
atomic.StoreInt64(&cn.usedAt, tm.UnixNano())
|
return cn.usedAt.Load()
|
||||||
|
}
|
||||||
|
func (cn *Conn) SetUsedAtNs(ns int64) {
|
||||||
|
cn.usedAt.Store(ns)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cn *Conn) LastPutAtNs() int64 {
|
||||||
|
return cn.lastPutAt.Load()
|
||||||
|
}
|
||||||
|
func (cn *Conn) SetLastPutAtNs(ns int64) {
|
||||||
|
cn.lastPutAt.Store(ns)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Backward-compatible wrapper methods for state machine
|
// Backward-compatible wrapper methods for state machine
|
||||||
|
|||||||
@@ -461,7 +461,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Use cached time for health checks (max 50ms staleness is acceptable)
|
// Use cached time for health checks (max 50ms staleness is acceptable)
|
||||||
now := time.Unix(0, getCachedTimeNs())
|
nowNs := getCachedTimeNs()
|
||||||
attempts := 0
|
attempts := 0
|
||||||
|
|
||||||
// Lock-free atomic read - no mutex overhead!
|
// Lock-free atomic read - no mutex overhead!
|
||||||
@@ -487,7 +487,7 @@ func (p *ConnPool) getConn(ctx context.Context) (*Conn, error) {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
if !p.isHealthyConn(cn, now) {
|
if !p.isHealthyConn(cn, nowNs) {
|
||||||
_ = p.CloseConn(cn)
|
_ = p.CloseConn(cn)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
@@ -742,6 +742,8 @@ func (p *ConnPool) putConn(ctx context.Context, cn *Conn, freeTurn bool) {
|
|||||||
if shouldCloseConn {
|
if shouldCloseConn {
|
||||||
_ = p.closeConn(cn)
|
_ = p.closeConn(cn)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cn.SetLastPutAtNs(getCachedTimeNs())
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
|
||||||
@@ -891,14 +893,14 @@ func (p *ConnPool) Close() error {
|
|||||||
return firstErr
|
return firstErr
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
|
func (p *ConnPool) isHealthyConn(cn *Conn, nowNs int64) bool {
|
||||||
// Performance optimization: check conditions from cheapest to most expensive,
|
// Performance optimization: check conditions from cheapest to most expensive,
|
||||||
// and from most likely to fail to least likely to fail.
|
// and from most likely to fail to least likely to fail.
|
||||||
|
|
||||||
// Only fails if ConnMaxLifetime is set AND connection is old.
|
// Only fails if ConnMaxLifetime is set AND connection is old.
|
||||||
// Most pools don't set ConnMaxLifetime, so this rarely fails.
|
// Most pools don't set ConnMaxLifetime, so this rarely fails.
|
||||||
if p.cfg.ConnMaxLifetime > 0 {
|
if p.cfg.ConnMaxLifetime > 0 {
|
||||||
if cn.expiresAt.Before(now) {
|
if cn.expiresAt.UnixNano() < nowNs {
|
||||||
return false // Connection has exceeded max lifetime
|
return false // Connection has exceeded max lifetime
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -906,7 +908,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
|
|||||||
// Most pools set ConnMaxIdleTime, and idle connections are common.
|
// Most pools set ConnMaxIdleTime, and idle connections are common.
|
||||||
// Checking this first allows us to fail fast without expensive syscalls.
|
// Checking this first allows us to fail fast without expensive syscalls.
|
||||||
if p.cfg.ConnMaxIdleTime > 0 {
|
if p.cfg.ConnMaxIdleTime > 0 {
|
||||||
if now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {
|
if nowNs-cn.UsedAtNs() >= int64(p.cfg.ConnMaxIdleTime) {
|
||||||
return false // Connection has been idle too long
|
return false // Connection has been idle too long
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -926,7 +928,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
|
|||||||
)
|
)
|
||||||
|
|
||||||
// Update timestamp for healthy connection
|
// Update timestamp for healthy connection
|
||||||
cn.SetUsedAt(now)
|
cn.SetUsedAtNs(nowNs)
|
||||||
|
|
||||||
// Connection is healthy, client will handle notifications
|
// Connection is healthy, client will handle notifications
|
||||||
return true
|
return true
|
||||||
@@ -939,6 +941,6 @@ func (p *ConnPool) isHealthyConn(cn *Conn, now time.Time) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Only update UsedAt if connection is healthy (avoids unnecessary atomic store)
|
// Only update UsedAt if connection is healthy (avoids unnecessary atomic store)
|
||||||
cn.SetUsedAt(now)
|
cn.SetUsedAtNs(nowNs)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|||||||
4
redis.go
4
redis.go
@@ -1366,11 +1366,13 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn
|
|||||||
// If the connection was health-checked within the last 5 seconds, we can skip the
|
// If the connection was health-checked within the last 5 seconds, we can skip the
|
||||||
// expensive syscall since the health check already verified no unexpected data.
|
// expensive syscall since the health check already verified no unexpected data.
|
||||||
// This is safe because:
|
// This is safe because:
|
||||||
|
// 0. lastHealthCheckNs is set in pool/conn.go:putConn() after a successful health check
|
||||||
// 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK)
|
// 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK)
|
||||||
// 2. If push notifications arrived, they would have been detected by health check
|
// 2. If push notifications arrived, they would have been detected by health check
|
||||||
// 3. 5 seconds is short enough that connection state is still fresh
|
// 3. 5 seconds is short enough that connection state is still fresh
|
||||||
// 4. Push notifications will be processed by the next WithReader call
|
// 4. Push notifications will be processed by the next WithReader call
|
||||||
lastHealthCheckNs := cn.UsedAtNs()
|
// used it is set on getConn, so we should use another timer (lastPutAt?)
|
||||||
|
lastHealthCheckNs := cn.LastPutAtNs()
|
||||||
if lastHealthCheckNs > 0 {
|
if lastHealthCheckNs > 0 {
|
||||||
// Use pool's cached time to avoid expensive time.Now() syscall
|
// Use pool's cached time to avoid expensive time.Now() syscall
|
||||||
nowNs := pool.GetCachedTimeNs()
|
nowNs := pool.GetCachedTimeNs()
|
||||||
|
|||||||
Reference in New Issue
Block a user