diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 7c195cd7..e504dfbc 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -18,9 +18,9 @@ import ( var noDeadline = time.Time{} -// Global time cache updated every 50ms by background goroutine. +// Global time cache updated every 100ms by background goroutine. // This avoids expensive time.Now() syscalls in hot paths like getEffectiveReadTimeout. -// Max staleness: 50ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds). +// Max staleness: 100ms, which is acceptable for timeout deadline checks (timeouts are typically 3-30 seconds). var globalTimeCache struct { nowNs atomic.Int64 } @@ -31,7 +31,7 @@ func init() { // Start background updater go func() { - ticker := time.NewTicker(50 * time.Millisecond) + ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for range ticker.C { @@ -41,12 +41,20 @@ func init() { } // getCachedTimeNs returns the current time in nanoseconds from the global cache. -// This is updated every 50ms by a background goroutine, avoiding expensive syscalls. -// Max staleness: 50ms. +// This is updated every 100ms by a background goroutine, avoiding expensive syscalls. +// Max staleness: 100ms. func getCachedTimeNs() int64 { return globalTimeCache.nowNs.Load() } +// GetCachedTimeNs returns the current time in nanoseconds from the global cache. +// This is updated every 100ms by a background goroutine, avoiding expensive syscalls. +// Max staleness: 100ms. +// Exported for use by other packages that need fast time access. +func GetCachedTimeNs() int64 { + return getCachedTimeNs() +} + // Global atomic counter for connection IDs var connIDCounter uint64 @@ -170,6 +178,9 @@ func (cn *Conn) UsedAt() time.Time { unixNano := atomic.LoadInt64(&cn.usedAt) return time.Unix(0, unixNano) } +func (cn *Conn) UsedAtNs() int64 { + return atomic.LoadInt64(&cn.usedAt) +} func (cn *Conn) SetUsedAt(tm time.Time) { atomic.StoreInt64(&cn.usedAt, tm.UnixNano()) @@ -488,7 +499,7 @@ func (cn *Conn) getEffectiveReadTimeout(normalTimeout time.Duration) time.Durati return time.Duration(readTimeoutNs) } - // Use cached time to avoid expensive syscall (max 50ms staleness is acceptable for timeout checks) + // Use cached time to avoid expensive syscall (max 100ms staleness is acceptable for timeout checks) nowNs := getCachedTimeNs() // Check if deadline has passed if nowNs < deadlineNs { @@ -522,7 +533,7 @@ func (cn *Conn) getEffectiveWriteTimeout(normalTimeout time.Duration) time.Durat return time.Duration(writeTimeoutNs) } - // Use cached time to avoid expensive syscall (max 50ms staleness is acceptable for timeout checks) + // Use cached time to avoid expensive syscall (max 100ms staleness is acceptable for timeout checks) nowNs := getCachedTimeNs() // Check if deadline has passed if nowNs < deadlineNs { @@ -879,7 +890,7 @@ func (cn *Conn) MaybeHasData() bool { // deadline computes the effective deadline time based on context and timeout. // It updates the usedAt timestamp to now. -// Uses cached time to avoid expensive syscall (max 50ms staleness is acceptable for deadline calculation). +// Uses cached time to avoid expensive syscall (max 100ms staleness is acceptable for deadline calculation). func (cn *Conn) deadline(ctx context.Context, timeout time.Duration) time.Time { // Use cached time for deadline calculation (called 2x per command: read + write) tm := time.Unix(0, getCachedTimeNs()) diff --git a/internal/pool/conn_check.go b/internal/pool/conn_check.go index 9e83dd83..cfdf5e5d 100644 --- a/internal/pool/conn_check.go +++ b/internal/pool/conn_check.go @@ -30,7 +30,7 @@ func connCheck(conn net.Conn) error { var sysErr error - if err := rawConn.Read(func(fd uintptr) bool { + if err := rawConn.Control(func(fd uintptr) { var buf [1]byte // Use MSG_PEEK to peek at data without consuming it n, _, err := syscall.Recvfrom(int(fd), buf[:], syscall.MSG_PEEK|syscall.MSG_DONTWAIT) @@ -45,7 +45,6 @@ func connCheck(conn net.Conn) error { default: sysErr = err } - return true }); err != nil { return err } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 5df4962b..dcb6213d 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -155,10 +155,18 @@ type ConnPool struct { var _ Pooler = (*ConnPool)(nil) func NewConnPool(opt *Options) *ConnPool { - p := &ConnPool{ - cfg: opt, + semSize := opt.PoolSize + if opt.MaxActiveConns > 0 && opt.MaxActiveConns < opt.PoolSize { + if opt.MaxActiveConns < opt.PoolSize { + opt.MaxActiveConns = opt.PoolSize + } + semSize = opt.MaxActiveConns + } + //semSize = opt.PoolSize - semaphore: internal.NewFastSemaphore(opt.PoolSize), + p := &ConnPool{ + cfg: opt, + semaphore: internal.NewFastSemaphore(semSize), conns: make(map[uint64]*Conn), idleConns: make([]*Conn, 0, opt.PoolSize), } diff --git a/redis.go b/redis.go index 1f4b0224..8cd961e5 100644 --- a/redis.go +++ b/redis.go @@ -1351,13 +1351,39 @@ func (c *Conn) TxPipeline() Pipeliner { // processPushNotifications processes all pending push notifications on a connection // This ensures that cluster topology changes are handled immediately before the connection is used -// This method should be called by the client before using WithReader for command execution +// This method should be called by the client before using WithWriter for command execution +// +// Performance optimization: Skip the expensive MaybeHasData() syscall if a health check +// was performed recently (within 5 seconds). The health check already verified the connection +// is healthy and checked for unexpected data (push notifications). func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { // Only process push notifications for RESP3 connections with a processor - // Also check if there is any data to read before processing - // Which is an optimization on UNIX systems where MaybeHasData is a syscall + if c.opt.Protocol != 3 || c.pushProcessor == nil { + return nil + } + + // Performance optimization: Skip MaybeHasData() syscall if health check was recent + // If the connection was health-checked within the last 5 seconds, we can skip the + // expensive syscall since the health check already verified no unexpected data. + // This is safe because: + // 1. Health check (connCheck) uses the same syscall (Recvfrom with MSG_PEEK) + // 2. If push notifications arrived, they would have been detected by health check + // 3. 5 seconds is short enough that connection state is still fresh + // 4. Push notifications will be processed by the next WithReader call + lastHealthCheckNs := cn.UsedAtNs() + if lastHealthCheckNs > 0 { + // Use pool's cached time to avoid expensive time.Now() syscall + nowNs := pool.GetCachedTimeNs() + if nowNs-lastHealthCheckNs < int64(5*time.Second) { + // Recent health check confirmed no unexpected data, skip the syscall + return nil + } + } + + // Check if there is any data to read before processing + // This is an optimization on UNIX systems where MaybeHasData is a syscall // On Windows, MaybeHasData always returns true, so this check is a no-op - if c.opt.Protocol != 3 || c.pushProcessor == nil || !cn.MaybeHasData() { + if !cn.MaybeHasData() { return nil } diff --git a/redis_test.go b/redis_test.go index 5cce3f25..9dd00f19 100644 --- a/redis_test.go +++ b/redis_test.go @@ -245,6 +245,52 @@ var _ = Describe("Client", func() { Expect(val).Should(HaveKeyWithValue("proto", int64(3))) }) + It("should initialize idle connections created by MinIdleConns", func() { + opt := redisOptions() + opt.MinIdleConns = 5 + opt.Password = "asdf" // Set password to require AUTH + opt.DB = 1 // Set DB to require SELECT + + db := redis.NewClient(opt) + defer func() { + Expect(db.Close()).NotTo(HaveOccurred()) + }() + + // Wait for minIdle connections to be created + time.Sleep(100 * time.Millisecond) + + // Verify that idle connections were created + stats := db.PoolStats() + Expect(stats.IdleConns).To(BeNumerically(">=", 5)) + + // Now use these connections - they should be properly initialized + // If they're not initialized, we'll get NOAUTH or WRONGDB errors + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func(id int) { + defer wg.Done() + // Each goroutine performs multiple operations + for j := 0; j < 5; j++ { + key := fmt.Sprintf("test_key_%d_%d", id, j) + err := db.Set(ctx, key, "value", 0).Err() + Expect(err).NotTo(HaveOccurred()) + + val, err := db.Get(ctx, key).Result() + Expect(err).NotTo(HaveOccurred()) + Expect(val).To(Equal("value")) + + err = db.Del(ctx, key).Err() + Expect(err).NotTo(HaveOccurred()) + } + }(i) + } + wg.Wait() + + // Verify no errors occurred + Expect(db.Ping(ctx).Err()).NotTo(HaveOccurred()) + }) + It("processes custom commands", func() { cmd := redis.NewCmd(ctx, "PING") _ = client.Process(ctx, cmd)