From ded98eccb13288b75a8cfa0f8ddf9bc810bf4a82 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Tue, 19 Aug 2025 15:56:41 +0300 Subject: [PATCH] address comments --- hitless/errors.go | 1 + hitless/pool_hook.go | 2 +- internal/pool/pool.go | 14 ++++++-- options.go | 76 ++++++++++++++++++++++++++++++++++++------- redis.go | 14 ++++++-- sentinel.go | 22 ++++++++++--- 6 files changed, 108 insertions(+), 21 deletions(-) diff --git a/hitless/errors.go b/hitless/errors.go index 5beb250a..784b41a2 100644 --- a/hitless/errors.go +++ b/hitless/errors.go @@ -37,6 +37,7 @@ var ( ErrHandoffInProgress = errors.New("hitless: handoff already in progress") ErrNoHandoffInProgress = errors.New("hitless: no handoff in progress") ErrConnectionFailed = errors.New("hitless: failed to establish new connection") + ErrHandoffQueueFull = errors.New("hitless: handoff queue is full, cannot queue new handoff requests - consider increasing HandoffQueueSize or MaxWorkers in configuration") ) // Dead error variables removed - unused in simplified architecture diff --git a/hitless/pool_hook.go b/hitless/pool_hook.go index eb3eaf90..073fdd73 100644 --- a/hitless/pool_hook.go +++ b/hitless/pool_hook.go @@ -309,7 +309,7 @@ func (ph *PoolHook) queueHandoff(conn *pool.Conn) error { // Ensure we have workers available to handle the load ph.ensureWorkerAvailable() - return errors.New("queue full") + return ErrHandoffQueueFull } // performConnectionHandoffWithPool performs the actual connection handoff with pool for connection removal on failure diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 43c6a819..0989d38c 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -24,8 +24,18 @@ var ( // ErrPoolTimeout timed out waiting to get a connection from the connection pool. ErrPoolTimeout = errors.New("redis: connection pool timeout") - popAttempts = 10 - getAttempts = 3 + // popAttempts is the maximum number of attempts to find a usable connection + // when popping from the idle connection pool. This handles cases where connections + // are temporarily marked as unusable (e.g., during hitless upgrades or network issues). + // Value of 10 provides sufficient resilience without excessive overhead. + popAttempts = 10 + + // getAttempts is the maximum number of attempts to get a connection that passes + // hook validation (e.g., hitless upgrade hooks). This protects against race conditions + // where hooks might temporarily reject connections during cluster transitions. + // Value of 3 balances resilience with performance - most hook rejections resolve quickly. + getAttempts = 3 + minTime = time.Unix(-2208988800, 0) // Jan 1, 1900 maxTime = minTime.Add(1<<63 - 1) noExpiration = maxTime diff --git a/options.go b/options.go index 3c5d364c..45f62b32 100644 --- a/options.go +++ b/options.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "errors" "fmt" + "math" "net" "net/url" "runtime" @@ -31,6 +32,17 @@ type Limiter interface { ReportResult(result error) } +// safeIntToInt32 safely converts an int to int32, returning an error if overflow would occur. +func safeIntToInt32(value int, fieldName string) (int32, error) { + if value > math.MaxInt32 { + return 0, fmt.Errorf("redis: %s value %d exceeds maximum allowed value %d", fieldName, value, math.MaxInt32) + } + if value < math.MinInt32 { + return 0, fmt.Errorf("redis: %s value %d is below minimum allowed value %d", fieldName, value, math.MinInt32) + } + return int32(value), nil +} + // Options keeps the settings to set up redis connection. type Options struct { @@ -648,40 +660,80 @@ func getUserPassword(u *url.URL) (string, string) { func newConnPool( opt *Options, dialer func(ctx context.Context, network, addr string) (net.Conn, error), -) *pool.ConnPool { +) (*pool.ConnPool, error) { + poolSize, err := safeIntToInt32(opt.PoolSize, "PoolSize") + if err != nil { + return nil, err + } + + minIdleConns, err := safeIntToInt32(opt.MinIdleConns, "MinIdleConns") + if err != nil { + return nil, err + } + + maxIdleConns, err := safeIntToInt32(opt.MaxIdleConns, "MaxIdleConns") + if err != nil { + return nil, err + } + + maxActiveConns, err := safeIntToInt32(opt.MaxActiveConns, "MaxActiveConns") + if err != nil { + return nil, err + } + return pool.NewConnPool(&pool.Options{ Dialer: func(ctx context.Context) (net.Conn, error) { return dialer(ctx, opt.Network, opt.Addr) }, PoolFIFO: opt.PoolFIFO, - PoolSize: int32(opt.PoolSize), + PoolSize: poolSize, PoolTimeout: opt.PoolTimeout, DialTimeout: opt.DialTimeout, - MinIdleConns: int32(opt.MinIdleConns), - MaxIdleConns: int32(opt.MaxIdleConns), - MaxActiveConns: int32(opt.MaxActiveConns), + MinIdleConns: minIdleConns, + MaxIdleConns: maxIdleConns, + MaxActiveConns: maxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, ReadBufferSize: opt.ReadBufferSize, WriteBufferSize: opt.WriteBufferSize, PushNotificationsEnabled: opt.Protocol == 3, - }) + }), nil } func newPubSubPool(opt *Options, dialer func(ctx context.Context, network, addr string) (net.Conn, error), -) *pool.PubSubPool { +) (*pool.PubSubPool, error) { + poolSize, err := safeIntToInt32(opt.PoolSize, "PoolSize") + if err != nil { + return nil, err + } + + minIdleConns, err := safeIntToInt32(opt.MinIdleConns, "MinIdleConns") + if err != nil { + return nil, err + } + + maxIdleConns, err := safeIntToInt32(opt.MaxIdleConns, "MaxIdleConns") + if err != nil { + return nil, err + } + + maxActiveConns, err := safeIntToInt32(opt.MaxActiveConns, "MaxActiveConns") + if err != nil { + return nil, err + } + return pool.NewPubSubPool(&pool.Options{ PoolFIFO: opt.PoolFIFO, - PoolSize: int32(opt.PoolSize), + PoolSize: poolSize, PoolTimeout: opt.PoolTimeout, DialTimeout: opt.DialTimeout, - MinIdleConns: int32(opt.MinIdleConns), - MaxIdleConns: int32(opt.MaxIdleConns), - MaxActiveConns: int32(opt.MaxActiveConns), + MinIdleConns: minIdleConns, + MaxIdleConns: maxIdleConns, + MaxActiveConns: maxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, ReadBufferSize: 32 * 1024, WriteBufferSize: 32 * 1024, PushNotificationsEnabled: opt.Protocol == 3, - }, dialer) + }, dialer), nil } diff --git a/redis.go b/redis.go index 27bd6b7e..c2791336 100644 --- a/redis.go +++ b/redis.go @@ -456,6 +456,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { c.optLock.Unlock() return fmt.Errorf("failed to enable maintenance notifications: %w", hitlessHandshakeErr) default: // will handle auto and any other + internal.Logger.Printf(ctx, "hitless: auto mode fallback: hitless upgrades disabled due to handshake failure: %v", hitlessHandshakeErr) c.opt.HitlessUpgradeConfig.Mode = hitless.MaintNotificationsDisabled c.optLock.Unlock() // auto mode, disable hitless upgrades and continue @@ -562,6 +563,8 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) bool { if c.opt.UnstableResp3 { return true } else { + // TODO: find the best way to remove the panic and return error here + // The client should not panic when executing a command, only when initializing. panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.") } default: @@ -921,8 +924,15 @@ func NewClient(opt *Options) *Client { opt.PushNotificationProcessor = c.pushProcessor // Create connection pools - c.connPool = newConnPool(opt, c.dialHook) - c.pubSubPool = newPubSubPool(opt, c.dialHook) + var err error + c.connPool, err = newConnPool(opt, c.dialHook) + if err != nil { + panic(fmt.Errorf("redis: failed to create connection pool: %w", err)) + } + c.pubSubPool, err = newPubSubPool(opt, c.dialHook) + if err != nil { + panic(fmt.Errorf("redis: failed to create pubsub pool: %w", err)) + } // Initialize hitless upgrades first if enabled and protocol is RESP3 if opt.HitlessUpgradeConfig != nil && opt.HitlessUpgradeConfig.Mode != hitless.MaintNotificationsDisabled && opt.Protocol == 3 { diff --git a/sentinel.go b/sentinel.go index 8ae284de..e52e8407 100644 --- a/sentinel.go +++ b/sentinel.go @@ -475,8 +475,15 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { // Use void processor by default for RESP2 connections rdb.pushProcessor = initializePushProcessor(opt) - rdb.connPool = newConnPool(opt, rdb.dialHook) - rdb.pubSubPool = newPubSubPool(opt, rdb.dialHook) + var err error + rdb.connPool, err = newConnPool(opt, rdb.dialHook) + if err != nil { + panic(fmt.Errorf("redis: failed to create connection pool: %w", err)) + } + rdb.pubSubPool, err = newPubSubPool(opt, rdb.dialHook) + if err != nil { + panic(fmt.Errorf("redis: failed to create pubsub pool: %w", err)) + } rdb.onClose = rdb.wrappedOnClose(failover.Close) @@ -552,8 +559,15 @@ func NewSentinelClient(opt *Options) *SentinelClient { dial: c.baseClient.dial, process: c.baseClient.process, }) - c.connPool = newConnPool(opt, c.dialHook) - c.pubSubPool = newPubSubPool(opt, c.dialHook) + var err error + c.connPool, err = newConnPool(opt, c.dialHook) + if err != nil { + panic(fmt.Errorf("redis: failed to create connection pool: %w", err)) + } + c.pubSubPool, err = newPubSubPool(opt, c.dialHook) + if err != nil { + panic(fmt.Errorf("redis: failed to create pubsub pool: %w", err)) + } return c }