1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

feat: enhance push notification handlers with context information

This commit is contained in:
Nedyalko Dyakov
2025-07-04 17:08:08 +03:00
parent c44c8b5b03
commit 47dd490a8a
11 changed files with 242 additions and 128 deletions

View File

@ -7,7 +7,6 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif" "github.com/redis/go-redis/v9/internal/pushnotif"
) )
@ -78,16 +77,8 @@ func (cn *Conn) RemoteAddr() net.Addr {
func (cn *Conn) WithReader( func (cn *Conn) WithReader(
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error { ) error {
// Process any pending push notifications before executing the read function // Push notification processing is now handled by the client before calling WithReader
// This ensures push notifications are handled as soon as they arrive // This ensures proper context (client, connection pool, connection) is available to handlers
if cn.PushNotificationProcessor != nil {
// Type assert to the processor interface
if err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); err != nil {
// Log the error but don't fail the read operation
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications in WithReader: %v", err)
}
}
if timeout >= 0 { if timeout >= 0 {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif" "github.com/redis/go-redis/v9/internal/pushnotif"
) )
@ -237,11 +238,6 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
cn := NewConn(netConn) cn := NewConn(netConn)
cn.pooled = pooled cn.pooled = pooled
// Set push notification processor if available
if p.cfg.PushNotificationProcessor != nil {
cn.PushNotificationProcessor = p.cfg.PushNotificationProcessor
}
return cn, nil return cn, nil
} }
@ -392,23 +388,18 @@ func (p *ConnPool) popIdle() (*Conn, error) {
func (p *ConnPool) Put(ctx context.Context, cn *Conn) { func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
if cn.rd.Buffered() > 0 { if cn.rd.Buffered() > 0 {
// Check if this might be push notification data // Check if this might be push notification data
if cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 { if p.cfg.Protocol == 3 {
// Only process for RESP3 clients (push notifications only available in RESP3) if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd) // For push notifications, we allow some buffered data
if err != nil { // The client will process these notifications before using the connection
internal.Logger.Printf(ctx, "push: error processing pending notifications: %v", err) internal.Logger.Printf(ctx, "push: connection has buffered data, likely push notifications - will be processed by client")
}
// Check again if there's still unread data after processing push notifications
if cn.rd.Buffered() > 0 {
internal.Logger.Printf(ctx, "Conn has unread data after processing push notifications")
p.Remove(ctx, cn, BadConnError{})
return return
} }
} else {
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
} }
// For non-RESP3 or data that is not a push notification, buffered data is unexpected
internal.Logger.Printf(ctx, "Conn has unread data")
p.Remove(ctx, cn, BadConnError{})
return
} }
if !cn.pooled { if !cn.pooled {
@ -554,19 +545,17 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
// Check connection health, but be aware of push notifications // Check connection health, but be aware of push notifications
if err := connCheck(cn.netConn); err != nil { if err := connCheck(cn.netConn); err != nil {
// If there's unexpected data and we have push notification support, // If there's unexpected data, it might be push notifications (RESP3)
// it might be push notifications (only for RESP3) // However, push notification processing is now handled by the client
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && p.cfg.Protocol == 3 { // before WithReader to ensure proper context is available to handlers
// Try to process any pending push notifications (only for RESP3) if err == errUnexpectedRead && p.cfg.Protocol == 3 {
ctx := context.Background() if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush {
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil { // For RESP3 connections with push notifications, we allow some buffered data
internal.Logger.Printf(ctx, "push: error processing pending notifications during health check: %v", procErr) // The client will process these notifications before using the connection
return false internal.Logger.Printf(context.Background(), "push: connection has buffered data, likely push notifications - will be processed by client")
} return true // Connection is healthy, client will handle notifications
// Check again after processing push notifications
if connCheck(cn.netConn) != nil {
return false
} }
return false // Unexpected data, not push notifications, connection is unhealthy
} else { } else {
return false return false
} }

View File

@ -39,7 +39,8 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error {
} }
// ProcessPendingNotifications checks for and processes any pending push notifications. // ProcessPendingNotifications checks for and processes any pending push notifications.
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { // The handlerCtx provides context about the client, connection pool, and connection.
func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error {
// Check for nil reader // Check for nil reader
if rd == nil { if rd == nil {
return nil return nil
@ -98,8 +99,8 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R
// Get the handler for this notification type // Get the handler for this notification type
if handler := p.registry.GetHandler(notificationType); handler != nil { if handler := p.registry.GetHandler(notificationType); handler != nil {
// Handle the notification // Handle the notification with context
handler.HandlePushNotification(ctx, notification) handler.HandlePushNotification(ctx, handlerCtx, notification)
} }
} }
} }
@ -176,10 +177,10 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
} }
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications // ProcessPendingNotifications for VoidProcessor does nothing since push notifications
// are only available in RESP3 and this processor is used when they're disabled. // are only available in RESP3 and this processor is used for RESP2 connections.
// This avoids unnecessary buffer scanning overhead. // This avoids unnecessary buffer scanning overhead.
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error {
// VoidProcessor is used when push notifications are disabled (typically RESP2 or disabled RESP3). // VoidProcessor is used for RESP2 connections where push notifications are not available.
// Since push notifications only exist in RESP3, we can safely skip all processing // Since push notifications only exist in RESP3, we can safely skip all processing
// to avoid unnecessary buffer scanning overhead. // to avoid unnecessary buffer scanning overhead.
return nil return nil

View File

@ -25,8 +25,10 @@ func NewTestHandler(name string, returnValue bool) *TestHandler {
} }
} }
func (h *TestHandler) HandlePushNotification(ctx context.Context, notification []interface{}) bool { func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx *HandlerContext, notification []interface{}) bool {
h.handled = append(h.handled, notification) h.handled = append(h.handled, notification)
// Store the handler context for testing if needed
_ = handlerCtx
return h.returnValue return h.returnValue
} }
@ -131,6 +133,13 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context,
return nil return nil
} }
// Create a test handler context
handlerCtx := &HandlerContext{
Client: nil,
ConnPool: nil,
Conn: nil,
}
for { for {
// Check if there are push notifications available // Check if there are push notifications available
replyType, err := reader.PeekReplyType() replyType, err := reader.PeekReplyType()
@ -175,8 +184,8 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context,
if notificationType, ok := notification[0].(string); ok { if notificationType, ok := notification[0].(string); ok {
// Get the handler for this notification type // Get the handler for this notification type
if handler := processor.registry.GetHandler(notificationType); handler != nil { if handler := processor.registry.GetHandler(notificationType); handler != nil {
// Handle the notification // Handle the notification with context
handler.HandlePushNotification(ctx, notification) handler.HandlePushNotification(ctx, handlerCtx, notification)
} }
} }
} }
@ -420,14 +429,19 @@ func TestProcessor(t *testing.T) {
ctx := context.Background() ctx := context.Background()
// Test with nil reader // Test with nil reader
err := processor.ProcessPendingNotifications(ctx, nil) handlerCtx := &HandlerContext{
Client: nil,
ConnPool: nil,
Conn: nil,
}
err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
if err != nil { if err != nil {
t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err)
} }
// Test with empty reader (no buffered data) // Test with empty reader (no buffered data)
reader := proto.NewReader(strings.NewReader("")) reader := proto.NewReader(strings.NewReader(""))
err = processor.ProcessPendingNotifications(ctx, reader) err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
if err != nil { if err != nil {
t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err) t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err)
} }
@ -533,21 +547,21 @@ func TestProcessor(t *testing.T) {
// Test the actual ProcessPendingNotifications method with real proto.Reader // Test the actual ProcessPendingNotifications method with real proto.Reader
// Test with nil reader // Test with nil reader
err = processor.ProcessPendingNotifications(ctx, nil) err = processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
if err != nil { if err != nil {
t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err)
} }
// Test with empty reader (no buffered data) // Test with empty reader (no buffered data)
protoReader := proto.NewReader(strings.NewReader("")) protoReader := proto.NewReader(strings.NewReader(""))
err = processor.ProcessPendingNotifications(ctx, protoReader) err = processor.ProcessPendingNotifications(ctx, handlerCtx, protoReader)
if err != nil { if err != nil {
t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err) t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err)
} }
// Test with reader that has some data but not push notifications // Test with reader that has some data but not push notifications
protoReader = proto.NewReader(strings.NewReader("+OK\r\n")) protoReader = proto.NewReader(strings.NewReader("+OK\r\n"))
err = processor.ProcessPendingNotifications(ctx, protoReader) err = processor.ProcessPendingNotifications(ctx, handlerCtx, protoReader)
if err != nil { if err != nil {
t.Errorf("ProcessPendingNotifications with non-push data should not error, got: %v", err) t.Errorf("ProcessPendingNotifications with non-push data should not error, got: %v", err)
} }
@ -637,22 +651,27 @@ func TestVoidProcessor(t *testing.T) {
t.Run("ProcessPendingNotifications", func(t *testing.T) { t.Run("ProcessPendingNotifications", func(t *testing.T) {
processor := NewVoidProcessor() processor := NewVoidProcessor()
ctx := context.Background() ctx := context.Background()
handlerCtx := &HandlerContext{
Client: nil,
ConnPool: nil,
Conn: nil,
}
// VoidProcessor should always succeed and do nothing // VoidProcessor should always succeed and do nothing
err := processor.ProcessPendingNotifications(ctx, nil) err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
if err != nil { if err != nil {
t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err)
} }
// Test with various readers // Test with various readers
reader := proto.NewReader(strings.NewReader("")) reader := proto.NewReader(strings.NewReader(""))
err = processor.ProcessPendingNotifications(ctx, reader) err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
if err != nil { if err != nil {
t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err)
} }
reader = proto.NewReader(strings.NewReader("some data")) reader = proto.NewReader(strings.NewReader("some data"))
err = processor.ProcessPendingNotifications(ctx, reader) err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
if err != nil { if err != nil {
t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err)
} }

View File

@ -6,17 +6,32 @@ import (
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
) )
// HandlerContext provides context information about where a push notification was received.
// This allows handlers to make informed decisions based on the source of the notification.
type HandlerContext struct {
// Client is the Redis client instance that received the notification
Client interface{}
// ConnPool is the connection pool from which the connection was obtained
ConnPool interface{}
// Conn is the specific connection on which the notification was received
Conn interface{}
}
// Handler defines the interface for push notification handlers. // Handler defines the interface for push notification handlers.
type Handler interface { type Handler interface {
// HandlePushNotification processes a push notification. // HandlePushNotification processes a push notification with context information.
// The handlerCtx provides information about the client, connection pool, and connection
// on which the notification was received, allowing handlers to make informed decisions.
// Returns true if the notification was handled, false otherwise. // Returns true if the notification was handled, false otherwise.
HandlePushNotification(ctx context.Context, notification []interface{}) bool HandlePushNotification(ctx context.Context, handlerCtx *HandlerContext, notification []interface{}) bool
} }
// ProcessorInterface defines the interface for push notification processors. // ProcessorInterface defines the interface for push notification processors.
type ProcessorInterface interface { type ProcessorInterface interface {
GetHandler(pushNotificationName string) Handler GetHandler(pushNotificationName string) Handler
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error RegisterHandler(pushNotificationName string, handler Handler, protected bool) error
} }

View File

@ -217,19 +217,11 @@ type Options struct {
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult // When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
UnstableResp3 bool UnstableResp3 bool
// PushNotifications enables general push notification processing. // Push notifications are always enabled for RESP3 connections (Protocol: 3)
// When enabled, the client will process RESP3 push notifications and // and are not available for RESP2 connections. No configuration option is needed.
// route them to registered handlers.
//
// For RESP3 connections (Protocol: 3), push notifications are always enabled
// and cannot be disabled. To avoid push notifications, use Protocol: 2 (RESP2).
// For RESP2 connections, push notifications are not available.
//
// default: always enabled for RESP3, disabled for RESP2
PushNotifications bool
// PushNotificationProcessor is the processor for handling push notifications. // PushNotificationProcessor is the processor for handling push notifications.
// If nil, a default processor will be created when PushNotifications is enabled. // If nil, a default processor will be created for RESP3 connections.
PushNotificationProcessor PushNotificationProcessorInterface PushNotificationProcessor PushNotificationProcessorInterface
} }

View File

@ -1623,7 +1623,7 @@ func (c *ClusterClient) processTxPipelineNode(
} }
func (c *ClusterClient) processTxPipelineNodeConn( func (c *ClusterClient) processTxPipelineNodeConn(
ctx context.Context, _ *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap, ctx context.Context, node *clusterNode, cn *pool.Conn, cmds []Cmder, failedCmds *cmdsMap,
) error { ) error {
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
@ -1641,7 +1641,7 @@ func (c *ClusterClient) processTxPipelineNodeConn(
trimmedCmds := cmds[1 : len(cmds)-1] trimmedCmds := cmds[1 : len(cmds)-1]
if err := c.txPipelineReadQueued( if err := c.txPipelineReadQueued(
ctx, rd, statusCmd, trimmedCmds, failedCmds, ctx, node, cn, rd, statusCmd, trimmedCmds, failedCmds,
); err != nil { ); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
@ -1653,23 +1653,37 @@ func (c *ClusterClient) processTxPipelineNodeConn(
return err return err
} }
return pipelineReadCmds(rd, trimmedCmds) return node.Client.pipelineReadCmds(ctx, cn, rd, trimmedCmds)
}) })
} }
func (c *ClusterClient) txPipelineReadQueued( func (c *ClusterClient) txPipelineReadQueued(
ctx context.Context, ctx context.Context,
node *clusterNode,
cn *pool.Conn,
rd *proto.Reader, rd *proto.Reader,
statusCmd *StatusCmd, statusCmd *StatusCmd,
cmds []Cmder, cmds []Cmder,
failedCmds *cmdsMap, failedCmds *cmdsMap,
) error { ) error {
// Parse queued replies. // Parse queued replies.
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
if err := statusCmd.readReply(rd); err != nil { if err := statusCmd.readReply(rd); err != nil {
return err return err
} }
for _, cmd := range cmds { for _, cmd := range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
err := statusCmd.readReply(rd) err := statusCmd.readReply(rd)
if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) { if err == nil || c.checkMovedErr(ctx, cmd, err, failedCmds) || isRedisError(err) {
continue continue
@ -1677,6 +1691,12 @@ func (c *ClusterClient) txPipelineReadQueued(
return err return err
} }
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := node.Client.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
// Parse number of replies. // Parse number of replies.
line, err := rd.ReadLine() line, err := rd.ReadLine()
if err != nil { if err != nil {

View File

@ -10,6 +10,7 @@ import (
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif"
) )
// PubSub implements Pub/Sub commands as described in // PubSub implements Pub/Sub commands as described in
@ -438,7 +439,13 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
ctx := c.getContext() ctx := c.getContext()
handler := c.pushProcessor.GetHandler(kind) handler := c.pushProcessor.GetHandler(kind)
if handler != nil { if handler != nil {
handled := handler.HandlePushNotification(ctx, reply) // Create handler context for pubsub
handlerCtx := &pushnotif.HandlerContext{
Client: c,
ConnPool: nil, // Not available in pubsub context
Conn: nil, // Not available in pubsub context
}
handled := handler.HandlePushNotification(ctx, handlerCtx, reply)
if handled { if handled {
// Return a special message type to indicate it was handled // Return a special message type to indicate it was handled
return &PushNotificationMessage{ return &PushNotificationMessage{

View File

@ -75,8 +75,9 @@ func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName strin
} }
// ProcessPendingNotifications checks for and processes any pending push notifications. // ProcessPendingNotifications checks for and processes any pending push notifications.
func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { // The handlerCtx provides context about the client, connection pool, and connection.
return p.processor.ProcessPendingNotifications(ctx, rd) func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *pushnotif.HandlerContext, rd *proto.Reader) error {
return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd)
} }
// VoidPushNotificationProcessor discards all push notifications without processing them. // VoidPushNotificationProcessor discards all push notifications without processing them.
@ -102,8 +103,8 @@ func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName str
} }
// ProcessPendingNotifications reads and discards any pending push notifications. // ProcessPendingNotifications reads and discards any pending push notifications.
func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *pushnotif.HandlerContext, rd *proto.Reader) error {
return v.processor.ProcessPendingNotifications(ctx, rd) return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd)
} }
// Redis Cluster push notification names // Redis Cluster push notification names

168
redis.go
View File

@ -14,6 +14,7 @@ import (
"github.com/redis/go-redis/v9/internal/hscan" "github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif"
) )
// Scanner internal/hscan.Scanner exposed interface. // Scanner internal/hscan.Scanner exposed interface.
@ -273,13 +274,6 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
} }
if cn.Inited { if cn.Inited {
// Process all pending push notifications before returning the connection
// This ensures that cluster topology changes are handled immediately
if err := c.processPushNotifications(ctx, cn); err != nil {
// If push notification processing fails, remove the connection
c.connPool.Remove(ctx, cn, err)
return nil, err
}
return cn, nil return cn, nil
} }
@ -291,32 +285,9 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
return nil, err return nil, err
} }
// Process any pending push notifications on the newly initialized connection
// This ensures that any notifications received during connection setup are handled
if err := c.processPushNotifications(ctx, cn); err != nil {
// If push notification processing fails, remove the connection
c.connPool.Remove(ctx, cn, err)
return nil, err
}
return cn, nil return cn, nil
} }
// processPushNotifications processes all pending push notifications on a connection
// This ensures that cluster topology changes are handled immediately before the connection is used
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
// Only process push notifications for RESP3 connections with a processor
if c.opt.Protocol != 3 || c.pushProcessor == nil {
return nil
}
// Use WithReader to access the reader and process push notifications
// This is critical for hitless upgrades to work properly
return cn.WithReader(ctx, 0, func(rd *proto.Reader) error {
return c.pushProcessor.ProcessPendingNotifications(ctx, rd)
})
}
func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener { func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener {
return auth.NewReAuthCredentialsListener( return auth.NewReAuthCredentialsListener(
c.reAuthConnection(poolCn), c.reAuthConnection(poolCn),
@ -489,6 +460,12 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
if isBadConn(err, false, c.opt.Addr) { if isBadConn(err, false, c.opt.Addr) {
c.connPool.Remove(ctx, cn, err) c.connPool.Remove(ctx, cn, err)
} else { } else {
// process any pending push notifications before returning the connection to the pool
if err := c.processPushNotifications(ctx, cn); err != nil {
// Log the error but don't fail the connection release
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", err)
}
c.connPool.Put(ctx, cn) c.connPool.Put(ctx, cn)
} }
} }
@ -552,6 +529,13 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
retryTimeout := uint32(0) retryTimeout := uint32(0)
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
// Process any pending push notifications before executing the command
if err := c.processPushNotifications(ctx, cn); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
}
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd) return writeCmd(wr, cmd)
}); err != nil { }); err != nil {
@ -564,6 +548,12 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
readReplyFunc = cmd.readRawReply readReplyFunc = cmd.readRawReply
} }
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
return readReplyFunc(rd) return readReplyFunc(rd)
}); err != nil { }); err != nil {
if cmd.readTimeout() == nil { if cmd.readTimeout() == nil {
@ -660,6 +650,12 @@ func (c *baseClient) generalProcessPipeline(
// Enable retries by default to retry dial errors returned by withConn. // Enable retries by default to retry dial errors returned by withConn.
canRetry := true canRetry := true
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error { lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
// Process any pending push notifications before executing the pipeline
if err := c.processPushNotifications(ctx, cn); err != nil {
// Log the error but don't fail the pipeline execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err)
}
var err error var err error
canRetry, err = p(ctx, cn, cmds) canRetry, err = p(ctx, cn, cmds)
return err return err
@ -674,6 +670,14 @@ func (c *baseClient) generalProcessPipeline(
func (c *baseClient) pipelineProcessCmds( func (c *baseClient) pipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder, ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) { ) (bool, error) {
// Process any pending push notifications before executing the pipeline
// This ensures that cluster topology changes are handled immediately
if err := c.processPushNotifications(ctx, cn); err != nil {
// Log the error but don't fail the pipeline execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", err)
}
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
@ -682,7 +686,8 @@ func (c *baseClient) pipelineProcessCmds(
} }
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
return pipelineReadCmds(rd, cmds) // read all replies
return c.pipelineReadCmds(ctx, cn, rd, cmds)
}); err != nil { }); err != nil {
return true, err return true, err
} }
@ -690,8 +695,14 @@ func (c *baseClient) pipelineProcessCmds(
return false, nil return false, nil
} }
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error { func (c *baseClient) pipelineReadCmds(ctx context.Context, cn *pool.Conn, rd *proto.Reader, cmds []Cmder) error {
for i, cmd := range cmds { for i, cmd := range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
err := cmd.readReply(rd) err := cmd.readReply(rd)
cmd.SetErr(err) cmd.SetErr(err)
if err != nil && !isRedisError(err) { if err != nil && !isRedisError(err) {
@ -706,6 +717,14 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
func (c *baseClient) txPipelineProcessCmds( func (c *baseClient) txPipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder, ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) { ) (bool, error) {
// Process any pending push notifications before executing the transaction pipeline
// This ensures that cluster topology changes are handled immediately
if err := c.processPushNotifications(ctx, cn); err != nil {
// Log the error but don't fail the transaction execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err)
}
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error { if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds) return writeCmds(wr, cmds)
}); err != nil { }); err != nil {
@ -714,16 +733,24 @@ func (c *baseClient) txPipelineProcessCmds(
} }
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
statusCmd := cmds[0].(*StatusCmd) statusCmd := cmds[0].(*StatusCmd)
// Trim multi and exec. // Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1] trimmedCmds := cmds[1 : len(cmds)-1]
if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil { if err := c.txPipelineReadQueued(ctx, cn, rd, statusCmd, trimmedCmds); err != nil {
setCmdsErr(cmds, err) setCmdsErr(cmds, err)
return err return err
} }
return pipelineReadCmds(rd, trimmedCmds) // Read replies.
return c.pipelineReadCmds(ctx, cn, rd, trimmedCmds)
}); err != nil { }); err != nil {
return false, err return false, err
} }
@ -731,7 +758,15 @@ func (c *baseClient) txPipelineProcessCmds(
return false, nil return false, nil
} }
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error { // txPipelineReadQueued reads queued replies from the Redis server.
// It returns an error if the server returns an error or if the number of replies does not match the number of commands.
func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
// Parse +OK. // Parse +OK.
if err := statusCmd.readReply(rd); err != nil { if err := statusCmd.readReply(rd); err != nil {
return err return err
@ -739,11 +774,23 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
// Parse +QUEUED. // Parse +QUEUED.
for range cmds { for range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) { if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
return err return err
} }
} }
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
// Log the error but don't fail the command execution
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
// Parse number of replies. // Parse number of replies.
line, err := rd.ReadLine() line, err := rd.ReadLine()
if err != nil { if err != nil {
@ -780,10 +827,6 @@ func NewClient(opt *Options) *Client {
opt.init() opt.init()
// Push notifications are always enabled for RESP3 (cannot be disabled) // Push notifications are always enabled for RESP3 (cannot be disabled)
// Only override if no custom processor is provided
if opt.Protocol == 3 && opt.PushNotificationProcessor == nil {
opt.PushNotifications = true
}
c := Client{ c := Client{
baseClient: &baseClient{ baseClient: &baseClient{
@ -843,13 +886,13 @@ func initializePushProcessor(opt *Options) PushNotificationProcessorInterface {
return opt.PushNotificationProcessor return opt.PushNotificationProcessor
} }
// For regular clients, respect the PushNotifications setting // Push notifications are always enabled for RESP3, disabled for RESP2
if opt.PushNotifications { if opt.Protocol == 3 {
// Create default processor when push notifications are enabled // Create default processor for RESP3 connections
return NewPushNotificationProcessor() return NewPushNotificationProcessor()
} }
// Create void processor when push notifications are disabled // Create void processor for RESP2 connections (push notifications not available)
return NewVoidPushNotificationProcessor() return NewVoidPushNotificationProcessor()
} }
@ -1070,3 +1113,42 @@ func (c *Conn) TxPipeline() Pipeliner {
pipe.init() pipe.init()
return &pipe return &pipe
} }
// processPushNotifications processes all pending push notifications on a connection
// This ensures that cluster topology changes are handled immediately before the connection is used
// This method should be called by the client before using WithReader for command execution
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
// Only process push notifications for RESP3 connections with a processor
if c.opt.Protocol != 3 || c.pushProcessor == nil {
return nil
}
// Use WithReader to access the reader and process push notifications
// This is critical for hitless upgrades to work properly
return cn.WithReader(ctx, 0, func(rd *proto.Reader) error {
// Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn)
return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
})
}
// processPendingPushNotificationWithReader processes all pending push notifications on a connection
// This method should be called by the client in WithReader before reading the reply
func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error {
if c.opt.Protocol != 3 || c.pushProcessor == nil {
return nil
}
// Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn)
return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
}
// pushNotificationHandlerContext creates a handler context for push notification processing
func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) *pushnotif.HandlerContext {
return &pushnotif.HandlerContext{
Client: c,
ConnPool: c.connPool,
Conn: cn,
}
}

View File

@ -62,9 +62,7 @@ type FailoverOptions struct {
Username string Username string
Password string Password string
// PushNotifications enables push notifications for RESP3. // Push notifications are always enabled for RESP3 connections
// Defaults to true for RESP3 connections.
PushNotifications bool
// CredentialsProvider allows the username and password to be updated // CredentialsProvider allows the username and password to be updated
// before reconnecting. It should return the current username and password. // before reconnecting. It should return the current username and password.
CredentialsProvider func() (username string, password string) CredentialsProvider func() (username string, password string)
@ -133,7 +131,6 @@ func (opt *FailoverOptions) clientOptions() *Options {
Protocol: opt.Protocol, Protocol: opt.Protocol,
Username: opt.Username, Username: opt.Username,
Password: opt.Password, Password: opt.Password,
PushNotifications: opt.PushNotifications,
CredentialsProvider: opt.CredentialsProvider, CredentialsProvider: opt.CredentialsProvider,
CredentialsProviderContext: opt.CredentialsProviderContext, CredentialsProviderContext: opt.CredentialsProviderContext,
StreamingCredentialsProvider: opt.StreamingCredentialsProvider, StreamingCredentialsProvider: opt.StreamingCredentialsProvider,