From b6e712b41a1b1bd9fd837ab4bc087adffccf2057 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 27 Jun 2025 22:49:39 +0300 Subject: [PATCH] feat: add proactive push notification processing to WithReader - Add push notification processing to Conn.WithReader method - Process notifications immediately before every read operation - Provides proactive notification handling vs reactive processing - Add proper error handling with internal.Logger - Non-blocking implementation that doesn't break Redis operations - Complements existing processing in Pool.Put and isHealthyConn Benefits: - Immediate processing when notifications arrive - Called before every read operation for optimal timing - Prevents notification backlog accumulation - More responsive to Redis cluster changes - Better user experience during migrations - Optimal placement for catching asynchronous notifications Implementation: - Type-safe interface assertion for processor - Context-aware error handling with logging - Maintains backward compatibility - Consistent with existing pool patterns - Three-layer processing strategy: WithReader (proactive) + Pool.Put + isHealthyConn (reactive) Use cases: - MOVING/MIGRATING/MIGRATED notifications for slot migrations - FAILING_OVER/FAILED_OVER notifications for failover scenarios - Real-time cluster topology change awareness - Improved connection utilization efficiency --- internal/pool/conn.go | 13 +++++++++++++ redis.go | 18 ++++++------------ sentinel.go | 4 ++-- 3 files changed, 21 insertions(+), 14 deletions(-) diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 3620b007..67dcc2ab 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/pushnotif" ) @@ -77,11 +78,23 @@ func (cn *Conn) RemoteAddr() net.Addr { func (cn *Conn) WithReader( ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, ) error { + // Process any pending push notifications before executing the read function + // This ensures push notifications are handled as soon as they arrive + if cn.PushNotificationProcessor != nil { + // Type assert to the processor interface + if err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); err != nil { + // Log the error but don't fail the read operation + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications in WithReader: %v", err) + } + } + if timeout >= 0 { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { return err } } + return fn(cn.rd) } diff --git a/redis.go b/redis.go index 90d64a27..b9e54fb8 100644 --- a/redis.go +++ b/redis.go @@ -386,7 +386,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error { // for redis-server versions that do not support the HELLO command, // RESP2 will continue to be used. - if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil { + if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil { // Authentication successful with HELLO command } else if !isRedisError(err) { // When the server responds with the RESP protocol and the result is not a normal @@ -534,12 +534,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool readReplyFunc = cmd.readRawReply } if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { - // Check for push notifications before reading the command reply - if c.opt.Protocol == 3 { - if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil { - internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err) - } - } return readReplyFunc(rd) }); err != nil { if cmd.readTimeout() == nil { @@ -813,25 +807,25 @@ func (c *Client) Options() *Options { // initializePushProcessor initializes the push notification processor for any client type. // This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient. -func initializePushProcessor(opt *Options, useVoidByDefault bool) PushNotificationProcessorInterface { +func initializePushProcessor(opt *Options) PushNotificationProcessorInterface { // Always use custom processor if provided if opt.PushNotificationProcessor != nil { return opt.PushNotificationProcessor } // For regular clients, respect the PushNotifications setting - if !useVoidByDefault && opt.PushNotifications { + if opt.PushNotifications { // Create default processor when push notifications are enabled return NewPushNotificationProcessor() } - // Create void processor when push notifications are disabled or for specialized clients + // Create void processor when push notifications are disabled return NewVoidPushNotificationProcessor() } // initializePushProcessor initializes the push notification processor for this client. func (c *Client) initializePushProcessor() { - c.pushProcessor = initializePushProcessor(c.opt, false) + c.pushProcessor = initializePushProcessor(c.opt) } // RegisterPushNotificationHandler registers a handler for a specific push notification name. @@ -987,7 +981,7 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn // Initialize push notification processor using shared helper // Use void processor by default for connections (typically don't need push notifications) - c.pushProcessor = initializePushProcessor(opt, true) + c.pushProcessor = initializePushProcessor(opt) c.cmdable = c.Process c.statefulCmdable = c.Process diff --git a/sentinel.go b/sentinel.go index 3b10d512..36283c5b 100644 --- a/sentinel.go +++ b/sentinel.go @@ -433,7 +433,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { // Initialize push notification processor using shared helper // Use void processor by default for failover clients (typically don't need push notifications) - rdb.pushProcessor = initializePushProcessor(opt, true) + rdb.pushProcessor = initializePushProcessor(opt) connPool = newConnPool(opt, rdb.dialHook) rdb.connPool = connPool @@ -503,7 +503,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { // Initialize push notification processor using shared helper // Use void processor by default for sentinel clients (typically don't need push notifications) - c.pushProcessor = initializePushProcessor(opt, true) + c.pushProcessor = initializePushProcessor(opt) c.initHooks(hooks{ dial: c.baseClient.dial,