From f7948b5c5c2ecf7defea7fb4ba757f13215c75ee Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 27 Jun 2025 18:07:13 +0300 Subject: [PATCH] fix: address pr review --- internal/pool/conn.go | 5 ++-- internal/pool/pool.go | 27 +++++++++++++++------- internal/pushnotif/processor.go | 35 +++++----------------------- options.go | 8 ++++--- redis.go | 41 ++++++++++++++++++--------------- sentinel.go | 22 +++++------------- 6 files changed, 61 insertions(+), 77 deletions(-) diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 0ff4da90..9e475d0e 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -27,9 +27,8 @@ type Conn struct { onClose func() error // Push notification processor for handling push notifications on this connection - PushNotificationProcessor interface { - ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error - } + // Uses the same interface as defined in pool.go to avoid duplication + PushNotificationProcessor PushNotificationProcessorInterface } func NewConn(netConn net.Conn) *Conn { diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 0150f2f4..8a80f5e6 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -24,6 +24,8 @@ var ( ErrPoolTimeout = errors.New("redis: connection pool timeout") ) + + var timers = sync.Pool{ New: func() interface{} { t := time.NewTimer(time.Hour) @@ -60,6 +62,12 @@ type Pooler interface { Close() error } +// PushNotificationProcessorInterface defines the interface for push notification processors. +// This matches the main PushNotificationProcessorInterface to avoid duplication while preventing circular imports. +type PushNotificationProcessorInterface interface { + ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error +} + type Options struct { Dialer func(context.Context) (net.Conn, error) @@ -74,9 +82,12 @@ type Options struct { ConnMaxLifetime time.Duration // Push notification processor for connections - PushNotificationProcessor interface { - ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error - } + // This interface matches PushNotificationProcessorInterface to avoid duplication + // while preventing circular imports + PushNotificationProcessor PushNotificationProcessorInterface + + // Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without) + Protocol int } type lastDialErrorWrap struct { @@ -390,8 +401,8 @@ func (p *ConnPool) popIdle() (*Conn, error) { func (p *ConnPool) Put(ctx context.Context, cn *Conn) { if cn.rd.Buffered() > 0 { // Check if this might be push notification data - if cn.PushNotificationProcessor != nil { - // Try to process pending push notifications before discarding connection + if cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 { + // Only process for RESP3 clients (push notifications only available in RESP3) err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd) if err != nil { internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err) @@ -553,9 +564,9 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool { // Check connection health, but be aware of push notifications if err := connCheck(cn.netConn); err != nil { // If there's unexpected data and we have push notification support, - // it might be push notifications - if err == errUnexpectedRead && cn.PushNotificationProcessor != nil { - // Try to process any pending push notifications + // it might be push notifications (only for RESP3) + if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 { + // Try to process any pending push notifications (only for RESP3) ctx := context.Background() if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil { internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr) diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go index be1daaf5..5bbed033 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushnotif/processor.go @@ -114,35 +114,12 @@ func (v *VoidProcessor) GetRegistryForTesting() *Registry { return nil } -// ProcessPendingNotifications reads and discards any pending push notifications. +// ProcessPendingNotifications for VoidProcessor does nothing since push notifications +// are only available in RESP3 and this processor is used when they're disabled. +// This avoids unnecessary buffer scanning overhead. func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { - // Check for nil reader - if rd == nil { - return nil - } - - // Read and discard any pending push notifications to clean the buffer - for { - // Peek at the next reply type to see if it's a push notification - replyType, err := rd.PeekReplyType() - if err != nil { - // No more data available or error reading - break - } - - // Push notifications use RespPush type in RESP3 - if replyType != proto.RespPush { - break - } - - // Read and discard the push notification - _, err = rd.ReadReply() - if err != nil { - return fmt.Errorf("failed to read push notification for discarding: %w", err) - } - - // Notification discarded - continue to next one - } - + // VoidProcessor is used when push notifications are disabled (typically RESP2 or disabled RESP3). + // Since push notifications only exist in RESP3, we can safely skip all processing + // to avoid unnecessary buffer scanning overhead. return nil } diff --git a/options.go b/options.go index 091ee419..2ffb8603 100644 --- a/options.go +++ b/options.go @@ -221,11 +221,11 @@ type Options struct { // When enabled, the client will process RESP3 push notifications and // route them to registered handlers. // - // For RESP3 connections (Protocol: 3), push notifications are automatically enabled. - // To disable push notifications for RESP3, use Protocol: 2 instead. + // For RESP3 connections (Protocol: 3), push notifications are always enabled + // and cannot be disabled. To avoid push notifications, use Protocol: 2 (RESP2). // For RESP2 connections, push notifications are not available. // - // default: automatically enabled for RESP3, disabled for RESP2 + // default: always enabled for RESP3, disabled for RESP2 PushNotifications bool // PushNotificationProcessor is the processor for handling push notifications. @@ -609,5 +609,7 @@ func newConnPool( ConnMaxLifetime: opt.ConnMaxLifetime, // Pass push notification processor for connection initialization PushNotificationProcessor: opt.PushNotificationProcessor, + // Pass protocol version for push notification optimization + Protocol: opt.Protocol, }) } diff --git a/redis.go b/redis.go index cd015daf..90d64a27 100644 --- a/redis.go +++ b/redis.go @@ -755,7 +755,7 @@ func NewClient(opt *Options) *Client { } opt.init() - // Enable push notifications by default for RESP3 + // Push notifications are always enabled for RESP3 (cannot be disabled) // Only override if no custom processor is provided if opt.Protocol == 3 && opt.PushNotificationProcessor == nil { opt.PushNotifications = true @@ -811,18 +811,27 @@ func (c *Client) Options() *Options { return c.opt } -// initializePushProcessor initializes the push notification processor. -func (c *Client) initializePushProcessor() { +// initializePushProcessor initializes the push notification processor for any client type. +// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient. +func initializePushProcessor(opt *Options, useVoidByDefault bool) PushNotificationProcessorInterface { // Always use custom processor if provided - if c.opt.PushNotificationProcessor != nil { - c.pushProcessor = c.opt.PushNotificationProcessor - } else if c.opt.PushNotifications { - // Create default processor when push notifications are enabled - c.pushProcessor = NewPushNotificationProcessor() - } else { - // Create void processor when push notifications are disabled - c.pushProcessor = NewVoidPushNotificationProcessor() + if opt.PushNotificationProcessor != nil { + return opt.PushNotificationProcessor } + + // For regular clients, respect the PushNotifications setting + if !useVoidByDefault && opt.PushNotifications { + // Create default processor when push notifications are enabled + return NewPushNotificationProcessor() + } + + // Create void processor when push notifications are disabled or for specialized clients + return NewVoidPushNotificationProcessor() +} + +// initializePushProcessor initializes the push notification processor for this client. +func (c *Client) initializePushProcessor() { + c.pushProcessor = initializePushProcessor(c.opt, false) } // RegisterPushNotificationHandler registers a handler for a specific push notification name. @@ -976,13 +985,9 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn c.hooksMixin = parentHooks.clone() } - // Set push notification processor from options, ensure it's never nil - if opt.PushNotificationProcessor != nil { - c.pushProcessor = opt.PushNotificationProcessor - } else { - // Create a void processor if none provided to ensure we never have nil - c.pushProcessor = NewVoidPushNotificationProcessor() - } + // Initialize push notification processor using shared helper + // Use void processor by default for connections (typically don't need push notifications) + c.pushProcessor = initializePushProcessor(opt, true) c.cmdable = c.Process c.statefulCmdable = c.Process diff --git a/sentinel.go b/sentinel.go index b5e6d73b..3b10d512 100644 --- a/sentinel.go +++ b/sentinel.go @@ -431,14 +431,9 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { } rdb.init() - // Initialize push notification processor similar to regular client - if opt.PushNotificationProcessor != nil { - rdb.pushProcessor = opt.PushNotificationProcessor - } else if opt.PushNotifications { - rdb.pushProcessor = NewPushNotificationProcessor() - } else { - rdb.pushProcessor = NewVoidPushNotificationProcessor() - } + // Initialize push notification processor using shared helper + // Use void processor by default for failover clients (typically don't need push notifications) + rdb.pushProcessor = initializePushProcessor(opt, true) connPool = newConnPool(opt, rdb.dialHook) rdb.connPool = connPool @@ -506,14 +501,9 @@ func NewSentinelClient(opt *Options) *SentinelClient { }, } - // Initialize push notification processor similar to regular client - if opt.PushNotificationProcessor != nil { - c.pushProcessor = opt.PushNotificationProcessor - } else if opt.PushNotifications { - c.pushProcessor = NewPushNotificationProcessor() - } else { - c.pushProcessor = NewVoidPushNotificationProcessor() - } + // Initialize push notification processor using shared helper + // Use void processor by default for sentinel clients (typically don't need push notifications) + c.pushProcessor = initializePushProcessor(opt, true) c.initHooks(hooks{ dial: c.baseClient.dial,