From 47dd490a8a264e3a3facedafa82ee7765486be7e Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 4 Jul 2025 17:08:08 +0300 Subject: [PATCH] feat: enhance push notification handlers with context information --- internal/pool/conn.go | 13 +-- internal/pool/pool.go | 51 ++++---- internal/pushnotif/processor.go | 13 ++- internal/pushnotif/pushnotif_test.go | 41 +++++-- internal/pushnotif/types.go | 21 +++- options.go | 14 +-- osscluster.go | 26 ++++- pubsub.go | 9 +- push_notifications.go | 9 +- redis.go | 168 ++++++++++++++++++++------- sentinel.go | 5 +- 11 files changed, 242 insertions(+), 128 deletions(-) diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 67dcc2ab..664dc3a0 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -7,7 +7,6 @@ import ( "sync/atomic" "time" - "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/pushnotif" ) @@ -78,16 +77,8 @@ func (cn *Conn) RemoteAddr() net.Addr { func (cn *Conn) WithReader( ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, ) error { - // Process any pending push notifications before executing the read function - // This ensures push notifications are handled as soon as they arrive - if cn.PushNotificationProcessor != nil { - // Type assert to the processor interface - if err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); err != nil { - // Log the error but don't fail the read operation - // Push notification processing errors shouldn't break normal Redis operations - internal.Logger.Printf(ctx, "push: error processing pending notifications in WithReader: %v", err) - } - } + // Push notification processing is now handled by the client before calling WithReader + // This ensures proper context (client, connection pool, connection) is available to handlers if timeout >= 0 { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { diff --git a/internal/pool/pool.go b/internal/pool/pool.go index efadfaae..8f0a7b1c 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -9,6 +9,7 @@ import ( "time" "github.com/redis/go-redis/v9/internal" + "github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/pushnotif" ) @@ -237,11 +238,6 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) { cn := NewConn(netConn) cn.pooled = pooled - // Set push notification processor if available - if p.cfg.PushNotificationProcessor != nil { - cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor - } - return cn, nil } @@ -392,23 +388,18 @@ func (p *ConnPool) popIdle() (*Conn, error) { func (p *ConnPool) Put(ctx context.Context, cn *Conn) { if cn.rd.Buffered() > 0 { // Check if this might be push notification data - if cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 { - // Only process for RESP3 clients (push notifications only available in RESP3) - err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd) - if err != nil { - internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err) - } - // Check again if there's still unread data after processing push notifications - if cn.rd.Buffered() > 0 { - internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications") - p.Remove(ctx, cn, BadConnError{}) + if p.cfg.Protocol == 3 { + if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { + // For push notifications, we allow some buffered data + // The client will process these notifications before using the connection + internal.Logger.Printf(ctx, "push: connection has buffered data, likely push notifications - will be processed by client") return } - } else { - internal.Logger.Printf(ctx, "Conn has unread data") - p.Remove(ctx, cn, BadConnError{}) - return } + // For non-RESP3 or data that is not a push notification, buffered data is unexpected + internal.Logger.Printf(ctx, "Conn has unread data") + p.Remove(ctx, cn, BadConnError{}) + return } if !cn.pooled { @@ -554,19 +545,17 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool { // Check connection health, but be aware of push notifications if err := connCheck(cn.netConn); err != nil { - // If there's unexpected data and we have push notification support, - // it might be push notifications (only for RESP3) - if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 { - // Try to process any pending push notifications (only for RESP3) - ctx := context.Background() - if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil { - internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr) - return false - } - // Check again after processing push notifications - if connCheck(cn.netConn) != nil { - return false + // If there's unexpected data, it might be push notifications (RESP3) + // However, push notification processing is now handled by the client + // before WithReader to ensure proper context is available to handlers + if err == errUnexpectedRead && p.cfg.Protocol == 3 { + if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { + // For RESP3 connections with push notifications, we allow some buffered data + // The client will process these notifications before using the connection + internal.Logger.Printf(context.Background(), "push: connection has buffered data, likely push notifications - will be processed by client") + return true // Connection is healthy, client will handle notifications } + return false // Unexpected data, not push notifications, connection is unhealthy } else { return false } diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go index 4476ecb8..8acff455 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushnotif/processor.go @@ -39,7 +39,8 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error { } // ProcessPendingNotifications checks for and processes any pending push notifications. -func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { +// The handlerCtx provides context about the client, connection pool, and connection. +func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error { // Check for nil reader if rd == nil { return nil @@ -98,8 +99,8 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R // Get the handler for this notification type if handler := p.registry.GetHandler(notificationType); handler != nil { - // Handle the notification - handler.HandlePushNotification(ctx, notification) + // Handle the notification with context + handler.HandlePushNotification(ctx, handlerCtx, notification) } } } @@ -176,10 +177,10 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { } // ProcessPendingNotifications for VoidProcessor does nothing since push notifications -// are only available in RESP3 and this processor is used when they're disabled. +// are only available in RESP3 and this processor is used for RESP2 connections. // This avoids unnecessary buffer scanning overhead. -func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { - // VoidProcessor is used when push notifications are disabled (typically RESP2 or disabled RESP3). +func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error { + // VoidProcessor is used for RESP2 connections where push notifications are not available. // Since push notifications only exist in RESP3, we can safely skip all processing // to avoid unnecessary buffer scanning overhead. return nil diff --git a/internal/pushnotif/pushnotif_test.go b/internal/pushnotif/pushnotif_test.go index 3fa84e88..f4442176 100644 --- a/internal/pushnotif/pushnotif_test.go +++ b/internal/pushnotif/pushnotif_test.go @@ -25,8 +25,10 @@ func NewTestHandler(name string, returnValue bool) *TestHandler { } } -func (h *TestHandler) HandlePushNotification(ctx context.Context, notification []interface{}) bool { +func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx *HandlerContext, notification []interface{}) bool { h.handled = append(h.handled, notification) + // Store the handler context for testing if needed + _ = handlerCtx return h.returnValue } @@ -131,6 +133,13 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context, return nil } + // Create a test handler context + handlerCtx := &HandlerContext{ + Client: nil, + ConnPool: nil, + Conn: nil, + } + for { // Check if there are push notifications available replyType, err := reader.PeekReplyType() @@ -175,8 +184,8 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context, if notificationType, ok := notification[0].(string); ok { // Get the handler for this notification type if handler := processor.registry.GetHandler(notificationType); handler != nil { - // Handle the notification - handler.HandlePushNotification(ctx, notification) + // Handle the notification with context + handler.HandlePushNotification(ctx, handlerCtx, notification) } } } @@ -420,14 +429,19 @@ func TestProcessor(t *testing.T) { ctx := context.Background() // Test with nil reader - err := processor.ProcessPendingNotifications(ctx, nil) + handlerCtx := &HandlerContext{ + Client: nil, + ConnPool: nil, + Conn: nil, + } + err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) if err != nil { t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) } // Test with empty reader (no buffered data) reader := proto.NewReader(strings.NewReader("")) - err = processor.ProcessPendingNotifications(ctx, reader) + err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader) if err != nil { t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err) } @@ -533,21 +547,21 @@ func TestProcessor(t *testing.T) { // Test the actual ProcessPendingNotifications method with real proto.Reader // Test with nil reader - err = processor.ProcessPendingNotifications(ctx, nil) + err = processor.ProcessPendingNotifications(ctx, handlerCtx, nil) if err != nil { t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) } // Test with empty reader (no buffered data) protoReader := proto.NewReader(strings.NewReader("")) - err = processor.ProcessPendingNotifications(ctx, protoReader) + err = processor.ProcessPendingNotifications(ctx, handlerCtx, protoReader) if err != nil { t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err) } // Test with reader that has some data but not push notifications protoReader = proto.NewReader(strings.NewReader("+OK\r\n")) - err = processor.ProcessPendingNotifications(ctx, protoReader) + err = processor.ProcessPendingNotifications(ctx, handlerCtx, protoReader) if err != nil { t.Errorf("ProcessPendingNotifications with non-push data should not error, got: %v", err) } @@ -637,22 +651,27 @@ func TestVoidProcessor(t *testing.T) { t.Run("ProcessPendingNotifications", func(t *testing.T) { processor := NewVoidProcessor() ctx := context.Background() + handlerCtx := &HandlerContext{ + Client: nil, + ConnPool: nil, + Conn: nil, + } // VoidProcessor should always succeed and do nothing - err := processor.ProcessPendingNotifications(ctx, nil) + err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) if err != nil { t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) } // Test with various readers reader := proto.NewReader(strings.NewReader("")) - err = processor.ProcessPendingNotifications(ctx, reader) + err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader) if err != nil { t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) } reader = proto.NewReader(strings.NewReader("some data")) - err = processor.ProcessPendingNotifications(ctx, reader) + err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader) if err != nil { t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) } diff --git a/internal/pushnotif/types.go b/internal/pushnotif/types.go index e60250e7..d5b3cd2e 100644 --- a/internal/pushnotif/types.go +++ b/internal/pushnotif/types.go @@ -6,17 +6,32 @@ import ( "github.com/redis/go-redis/v9/internal/proto" ) +// HandlerContext provides context information about where a push notification was received. +// This allows handlers to make informed decisions based on the source of the notification. +type HandlerContext struct { + // Client is the Redis client instance that received the notification + Client interface{} + + // ConnPool is the connection pool from which the connection was obtained + ConnPool interface{} + + // Conn is the specific connection on which the notification was received + Conn interface{} +} + // Handler defines the interface for push notification handlers. type Handler interface { - // HandlePushNotification processes a push notification. + // HandlePushNotification processes a push notification with context information. + // The handlerCtx provides information about the client, connection pool, and connection + // on which the notification was received, allowing handlers to make informed decisions. // Returns true if the notification was handled, false otherwise. - HandlePushNotification(ctx context.Context, notification []interface{}) bool + HandlePushNotification(ctx context.Context, handlerCtx *HandlerContext, notification []interface{}) bool } // ProcessorInterface defines the interface for push notification processors. type ProcessorInterface interface { GetHandler(pushNotificationName string) Handler - ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error + ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error RegisterHandler(pushNotificationName string, handler Handler, protected bool) error } diff --git a/options.go b/options.go index 2ffb8603..a0616b00 100644 --- a/options.go +++ b/options.go @@ -217,19 +217,11 @@ type Options struct { // When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult UnstableResp3 bool - // PushNotifications enables general push notification processing. - // When enabled, the client will process RESP3 push notifications and - // route them to registered handlers. - // - // For RESP3 connections (Protocol: 3), push notifications are always enabled - // and cannot be disabled. To avoid push notifications, use Protocol: 2 (RESP2). - // For RESP2 connections, push notifications are not available. - // - // default: always enabled for RESP3, disabled for RESP2 - PushNotifications bool + // Push notifications are always enabled for RESP3 connections (Protocol: 3) + // and are not available for RESP2 connections. No configuration option is needed. // PushNotificationProcessor is the processor for handling push notifications. - // If nil, a default processor will be created when PushNotifications is enabled. + // If nil, a default processor will be created for RESP3 connections. PushNotificationProcessor PushNotificationProcessorInterface } diff --git a/osscluster.go b/osscluster.go index 0526022b..bfcc39fc 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1623,7 +1623,7 @@ func (c *ClusterClient) processTxPipelineNode( } func (c *ClusterClient) processTxPipelineNodeConn( - ctx context.Context, _ *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, + ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) @@ -1641,7 +1641,7 @@ func (c *ClusterClient) processTxPipelineNodeConn( trimmedCmds := cmds[1 : len(cmds)-1] if err := c.txPipelineReadQueued( - ctx, rd, statusCmd, trimmedCmds, failedCmds, + ctx, node, cn, rd, statusCmd, trimmedCmds, failedCmds, ); err != nil { setCmdsErr(cmds, err) @@ -1653,23 +1653,37 @@ func (c *ClusterClient) processTxPipelineNodeConn( return err } - return pipelineReadCmds(rd, trimmedCmds) + return node.Client.pipelineReadCmds(ctx, cn, rd, trimmedCmds) }) } func (c *ClusterClient) txPipelineReadQueued( ctx context.Context, + node *clusterNode, + cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder, failedCmds *cmdsMap, ) error { // Parse queued replies. + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } if err := statusCmd.readReply(rd); err != nil { return err } for _, cmd := range cmds { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } err := statusCmd.readReply(rd) if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) { continue @@ -1677,6 +1691,12 @@ func (c *ClusterClient) txPipelineReadQueued( return err } + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } // Parse number of replies. line, err := rd.ReadLine() if err != nil { diff --git a/pubsub.go b/pubsub.go index da16d319..bbc778f4 100644 --- a/pubsub.go +++ b/pubsub.go @@ -10,6 +10,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/internal/pushnotif" ) // PubSub implements Pub/Sub commands as described in @@ -438,7 +439,13 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { ctx := c.getContext() handler := c.pushProcessor.GetHandler(kind) if handler != nil { - handled := handler.HandlePushNotification(ctx, reply) + // Create handler context for pubsub + handlerCtx := &pushnotif.HandlerContext{ + Client: c, + ConnPool: nil, // Not available in pubsub context + Conn: nil, // Not available in pubsub context + } + handled := handler.HandlePushNotification(ctx, handlerCtx, reply) if handled { // Return a special message type to indicate it was handled return &PushNotificationMessage{ diff --git a/push_notifications.go b/push_notifications.go index 18544f85..8533aba9 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -75,8 +75,9 @@ func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName strin } // ProcessPendingNotifications checks for and processes any pending push notifications. -func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { - return p.processor.ProcessPendingNotifications(ctx, rd) +// The handlerCtx provides context about the client, connection pool, and connection. +func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *pushnotif.HandlerContext, rd *proto.Reader) error { + return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) } // VoidPushNotificationProcessor discards all push notifications without processing them. @@ -102,8 +103,8 @@ func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName str } // ProcessPendingNotifications reads and discards any pending push notifications. -func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { - return v.processor.ProcessPendingNotifications(ctx, rd) +func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *pushnotif.HandlerContext, rd *proto.Reader) error { + return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) } // Redis Cluster push notification names diff --git a/redis.go b/redis.go index e78cea42..e634de1d 100644 --- a/redis.go +++ b/redis.go @@ -14,6 +14,7 @@ import ( "github.com/redis/go-redis/v9/internal/hscan" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/internal/pushnotif" ) // Scanner internal/hscan.Scanner exposed interface. @@ -273,13 +274,6 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { } if cn.Inited { - // Process all pending push notifications before returning the connection - // This ensures that cluster topology changes are handled immediately - if err := c.processPushNotifications(ctx, cn); err != nil { - // If push notification processing fails, remove the connection - c.connPool.Remove(ctx, cn, err) - return nil, err - } return cn, nil } @@ -291,32 +285,9 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { return nil, err } - // Process any pending push notifications on the newly initialized connection - // This ensures that any notifications received during connection setup are handled - if err := c.processPushNotifications(ctx, cn); err != nil { - // If push notification processing fails, remove the connection - c.connPool.Remove(ctx, cn, err) - return nil, err - } - return cn, nil } -// processPushNotifications processes all pending push notifications on a connection -// This ensures that cluster topology changes are handled immediately before the connection is used -func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { - // Only process push notifications for RESP3 connections with a processor - if c.opt.Protocol != 3 || c.pushProcessor == nil { - return nil - } - - // Use WithReader to access the reader and process push notifications - // This is critical for hitless upgrades to work properly - return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { - return c.pushProcessor.ProcessPendingNotifications(ctx, rd) - }) -} - func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener { return auth.NewReAuthCredentialsListener( c.reAuthConnection(poolCn), @@ -489,6 +460,12 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) if isBadConn(err, false, c.opt.Addr) { c.connPool.Remove(ctx, cn, err) } else { + // process any pending push notifications before returning the connection to the pool + if err := c.processPushNotifications(ctx, cn); err != nil { + // Log the error but don't fail the connection release + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", err) + } c.connPool.Put(ctx, cn) } } @@ -552,6 +529,13 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool retryTimeout := uint32(0) if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { + // Process any pending push notifications before executing the command + if err := c.processPushNotifications(ctx, cn); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err) + } + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmd(wr, cmd) }); err != nil { @@ -564,6 +548,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool readReplyFunc = cmd.readRawReply } if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } return readReplyFunc(rd) }); err != nil { if cmd.readTimeout() == nil { @@ -660,6 +650,12 @@ func (c *baseClient) generalProcessPipeline( // Enable retries by default to retry dial errors returned by withConn. canRetry := true lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { + // Process any pending push notifications before executing the pipeline + if err := c.processPushNotifications(ctx, cn); err != nil { + // Log the error but don't fail the pipeline execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err) + } var err error canRetry, err = p(ctx, cn, cmds) return err @@ -674,6 +670,14 @@ func (c *baseClient) generalProcessPipeline( func (c *baseClient) pipelineProcessCmds( ctx context.Context, cn *pool.Conn, cmds []Cmder, ) (bool, error) { + // Process any pending push notifications before executing the pipeline + // This ensures that cluster topology changes are handled immediately + if err := c.processPushNotifications(ctx, cn); err != nil { + // Log the error but don't fail the pipeline execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err) + } + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) }); err != nil { @@ -682,7 +686,8 @@ func (c *baseClient) pipelineProcessCmds( } if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { - return pipelineReadCmds(rd, cmds) + // read all replies + return c.pipelineReadCmds(ctx, cn, rd, cmds) }); err != nil { return true, err } @@ -690,8 +695,14 @@ func (c *baseClient) pipelineProcessCmds( return false, nil } -func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { +func (c *baseClient) pipelineReadCmds(ctx context.Context, cn *pool.Conn, rd *proto.Reader, cmds []Cmder) error { for i, cmd := range cmds { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } err := cmd.readReply(rd) cmd.SetErr(err) if err != nil && !isRedisError(err) { @@ -706,6 +717,14 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { func (c *baseClient) txPipelineProcessCmds( ctx context.Context, cn *pool.Conn, cmds []Cmder, ) (bool, error) { + // Process any pending push notifications before executing the transaction pipeline + // This ensures that cluster topology changes are handled immediately + if err := c.processPushNotifications(ctx, cn); err != nil { + // Log the error but don't fail the transaction execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err) + } + if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { return writeCmds(wr, cmds) }); err != nil { @@ -714,16 +733,24 @@ func (c *baseClient) txPipelineProcessCmds( } if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } + statusCmd := cmds[0].(*StatusCmd) // Trim multi and exec. trimmedCmds := cmds[1 : len(cmds)-1] - if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil { + if err := c.txPipelineReadQueued(ctx, cn, rd, statusCmd, trimmedCmds); err != nil { setCmdsErr(cmds, err) return err } - return pipelineReadCmds(rd, trimmedCmds) + // Read replies. + return c.pipelineReadCmds(ctx, cn, rd, trimmedCmds) }); err != nil { return false, err } @@ -731,7 +758,15 @@ func (c *baseClient) txPipelineProcessCmds( return false, nil } -func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { +// txPipelineReadQueued reads queued replies from the Redis server. +// It returns an error if the server returns an error or if the number of replies does not match the number of commands. +func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } // Parse +OK. if err := statusCmd.readReply(rd); err != nil { return err @@ -739,11 +774,23 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) // Parse +QUEUED. for range cmds { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) { return err } } + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } // Parse number of replies. line, err := rd.ReadLine() if err != nil { @@ -780,10 +827,6 @@ func NewClient(opt *Options) *Client { opt.init() // Push notifications are always enabled for RESP3 (cannot be disabled) - // Only override if no custom processor is provided - if opt.Protocol == 3 && opt.PushNotificationProcessor == nil { - opt.PushNotifications = true - } c := Client{ baseClient: &baseClient{ @@ -843,13 +886,13 @@ func initializePushProcessor(opt *Options) PushNotificationProcessorInterface { return opt.PushNotificationProcessor } - // For regular clients, respect the PushNotifications setting - if opt.PushNotifications { - // Create default processor when push notifications are enabled + // Push notifications are always enabled for RESP3, disabled for RESP2 + if opt.Protocol == 3 { + // Create default processor for RESP3 connections return NewPushNotificationProcessor() } - // Create void processor when push notifications are disabled + // Create void processor for RESP2 connections (push notifications not available) return NewVoidPushNotificationProcessor() } @@ -1070,3 +1113,42 @@ func (c *Conn) TxPipeline() Pipeliner { pipe.init() return &pipe } + +// processPushNotifications processes all pending push notifications on a connection +// This ensures that cluster topology changes are handled immediately before the connection is used +// This method should be called by the client before using WithReader for command execution +func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { + // Only process push notifications for RESP3 connections with a processor + if c.opt.Protocol != 3 || c.pushProcessor == nil { + return nil + } + + // Use WithReader to access the reader and process push notifications + // This is critical for hitless upgrades to work properly + return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { + // Create handler context with client, connection pool, and connection information + handlerCtx := c.pushNotificationHandlerContext(cn) + return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) + }) +} + +// processPendingPushNotificationWithReader processes all pending push notifications on a connection +// This method should be called by the client in WithReader before reading the reply +func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error { + if c.opt.Protocol != 3 || c.pushProcessor == nil { + return nil + } + + // Create handler context with client, connection pool, and connection information + handlerCtx := c.pushNotificationHandlerContext(cn) + return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) +} + +// pushNotificationHandlerContext creates a handler context for push notification processing +func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) *pushnotif.HandlerContext { + return &pushnotif.HandlerContext{ + Client: c, + ConnPool: c.connPool, + Conn: cn, + } +} diff --git a/sentinel.go b/sentinel.go index 36283c5b..126dc3ea 100644 --- a/sentinel.go +++ b/sentinel.go @@ -62,9 +62,7 @@ type FailoverOptions struct { Username string Password string - // PushNotifications enables push notifications for RESP3. - // Defaults to true for RESP3 connections. - PushNotifications bool + // Push notifications are always enabled for RESP3 connections // CredentialsProvider allows the username and password to be updated // before reconnecting. It should return the current username and password. CredentialsProvider func() (username string, password string) @@ -133,7 +131,6 @@ func (opt *FailoverOptions) clientOptions() *Options { Protocol: opt.Protocol, Username: opt.Username, Password: opt.Password, - PushNotifications: opt.PushNotifications, CredentialsProvider: opt.CredentialsProvider, CredentialsProviderContext: opt.CredentialsProviderContext, StreamingCredentialsProvider: opt.StreamingCredentialsProvider,