diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 664dc3a0..570aefcd 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -8,7 +8,6 @@ import ( "time" "github.com/redis/go-redis/v9/internal/proto" - "github.com/redis/go-redis/v9/internal/pushnotif" ) var noDeadline = time.Time{} @@ -26,10 +25,6 @@ type Conn struct { createdAt time.Time onClose func() error - - // Push notification processor for handling push notifications on this connection - // This is set when the connection is created and is a reference to the processor - PushNotificationProcessor pushnotif.ProcessorInterface } func NewConn(netConn net.Conn) *Conn { @@ -77,9 +72,6 @@ func (cn *Conn) RemoteAddr() net.Addr { func (cn *Conn) WithReader( ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, ) error { - // Push notification processing is now handled by the client before calling WithReader - // This ensures proper context (client, connection pool, connection) is available to handlers - if timeout >= 0 { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { return err diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 8f0a7b1c..9ab4e105 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -10,7 +10,6 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" - "github.com/redis/go-redis/v9/internal/pushnotif" ) var ( @@ -74,10 +73,6 @@ type Options struct { ConnMaxIdleTime time.Duration ConnMaxLifetime time.Duration - // Push notification processor for connections - // This is an interface to avoid circular imports - PushNotificationProcessor pushnotif.ProcessorInterface - // Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without) Protocol int } diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go index 8acff455..d3982427 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushnotif/processor.go @@ -40,7 +40,7 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error { // ProcessPendingNotifications checks for and processes any pending push notifications. // The handlerCtx provides context about the client, connection pool, and connection. -func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error { +func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error { // Check for nil reader if rd == nil { return nil @@ -179,7 +179,7 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { // ProcessPendingNotifications for VoidProcessor does nothing since push notifications // are only available in RESP3 and this processor is used for RESP2 connections. // This avoids unnecessary buffer scanning overhead. -func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error { +func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error { // VoidProcessor is used for RESP2 connections where push notifications are not available. // Since push notifications only exist in RESP3, we can safely skip all processing // to avoid unnecessary buffer scanning overhead. diff --git a/internal/pushnotif/pushnotif.go b/internal/pushnotif/pushnotif.go new file mode 100644 index 00000000..42910775 --- /dev/null +++ b/internal/pushnotif/pushnotif.go @@ -0,0 +1,8 @@ +package pushnotif + +// This is an EXPERIMENTAL API for push notifications. +// It is subject to change without notice. +// The handler interface may change in the future to include more or less context information. +// The handler context has fields that are currently empty interfaces. +// This is to allow for future expansion without breaking compatibility. +// The context information will be filled in with concrete types or more specific interfaces in the future. diff --git a/internal/pushnotif/pushnotif_test.go b/internal/pushnotif/pushnotif_test.go index f4442176..54d08679 100644 --- a/internal/pushnotif/pushnotif_test.go +++ b/internal/pushnotif/pushnotif_test.go @@ -25,7 +25,7 @@ func NewTestHandler(name string, returnValue bool) *TestHandler { } } -func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx *HandlerContext, notification []interface{}) bool { +func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx HandlerContext, notification []interface{}) bool { h.handled = append(h.handled, notification) // Store the handler context for testing if needed _ = handlerCtx @@ -134,11 +134,7 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context, } // Create a test handler context - handlerCtx := &HandlerContext{ - Client: nil, - ConnPool: nil, - Conn: nil, - } + handlerCtx := NewHandlerContext(nil, nil, nil, nil, false) for { // Check if there are push notifications available @@ -429,11 +425,7 @@ func TestProcessor(t *testing.T) { ctx := context.Background() // Test with nil reader - handlerCtx := &HandlerContext{ - Client: nil, - ConnPool: nil, - Conn: nil, - } + handlerCtx := NewHandlerContext(nil, nil, nil, nil, false) err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) if err != nil { t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) @@ -651,11 +643,7 @@ func TestVoidProcessor(t *testing.T) { t.Run("ProcessPendingNotifications", func(t *testing.T) { processor := NewVoidProcessor() ctx := context.Background() - handlerCtx := &HandlerContext{ - Client: nil, - ConnPool: nil, - Conn: nil, - } + handlerCtx := NewHandlerContext(nil, nil, nil, nil, false) // VoidProcessor should always succeed and do nothing err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) diff --git a/internal/pushnotif/types.go b/internal/pushnotif/types.go index d5b3cd2e..7f4c657a 100644 --- a/internal/pushnotif/types.go +++ b/internal/pushnotif/types.go @@ -3,20 +3,154 @@ package pushnotif import ( "context" + "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" ) // HandlerContext provides context information about where a push notification was received. -// This allows handlers to make informed decisions based on the source of the notification. -type HandlerContext struct { - // Client is the Redis client instance that received the notification - Client interface{} +// This interface allows handlers to make informed decisions based on the source of the notification +// with strongly typed access to different client types. +type HandlerContext interface { + // GetClient returns the Redis client instance that received the notification. + // Returns nil if no client context is available. + GetClient() interface{} - // ConnPool is the connection pool from which the connection was obtained - ConnPool interface{} + // GetClusterClient returns the client as a ClusterClient if it is one. + // Returns nil if the client is not a ClusterClient or no client context is available. + GetClusterClient() ClusterClientInterface - // Conn is the specific connection on which the notification was received - Conn interface{} + // GetSentinelClient returns the client as a SentinelClient if it is one. + // Returns nil if the client is not a SentinelClient or no client context is available. + GetSentinelClient() SentinelClientInterface + + // GetFailoverClient returns the client as a FailoverClient if it is one. + // Returns nil if the client is not a FailoverClient or no client context is available. + GetFailoverClient() FailoverClientInterface + + // GetRegularClient returns the client as a regular Client if it is one. + // Returns nil if the client is not a regular Client or no client context is available. + GetRegularClient() RegularClientInterface + + // GetConnPool returns the connection pool from which the connection was obtained. + // Returns nil if no connection pool context is available. + GetConnPool() interface{} + + // GetPubSub returns the PubSub instance that received the notification. + // Returns nil if this is not a PubSub connection. + GetPubSub() PubSubInterface + + // GetConn returns the specific connection on which the notification was received. + // Returns nil if no connection context is available. + GetConn() *pool.Conn + + // IsBlocking returns true if the notification was received on a blocking connection. + IsBlocking() bool +} + +// Client interfaces for strongly typed access +type ClusterClientInterface interface { + // Add methods that handlers might need from ClusterClient + String() string +} + +type SentinelClientInterface interface { + // Add methods that handlers might need from SentinelClient + String() string +} + +type FailoverClientInterface interface { + // Add methods that handlers might need from FailoverClient + String() string +} + +type RegularClientInterface interface { + // Add methods that handlers might need from regular Client + String() string +} + +type PubSubInterface interface { + // Add methods that handlers might need from PubSub + String() string +} + +// handlerContext is the concrete implementation of HandlerContext interface +type handlerContext struct { + client interface{} + connPool interface{} + pubSub interface{} + conn *pool.Conn + isBlocking bool +} + +// NewHandlerContext creates a new HandlerContext implementation +func NewHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) HandlerContext { + return &handlerContext{ + client: client, + connPool: connPool, + pubSub: pubSub, + conn: conn, + isBlocking: isBlocking, + } +} + +// GetClient returns the Redis client instance that received the notification +func (h *handlerContext) GetClient() interface{} { + return h.client +} + +// GetClusterClient returns the client as a ClusterClient if it is one +func (h *handlerContext) GetClusterClient() ClusterClientInterface { + if client, ok := h.client.(ClusterClientInterface); ok { + return client + } + return nil +} + +// GetSentinelClient returns the client as a SentinelClient if it is one +func (h *handlerContext) GetSentinelClient() SentinelClientInterface { + if client, ok := h.client.(SentinelClientInterface); ok { + return client + } + return nil +} + +// GetFailoverClient returns the client as a FailoverClient if it is one +func (h *handlerContext) GetFailoverClient() FailoverClientInterface { + if client, ok := h.client.(FailoverClientInterface); ok { + return client + } + return nil +} + +// GetRegularClient returns the client as a regular Client if it is one +func (h *handlerContext) GetRegularClient() RegularClientInterface { + if client, ok := h.client.(RegularClientInterface); ok { + return client + } + return nil +} + +// GetConnPool returns the connection pool from which the connection was obtained +func (h *handlerContext) GetConnPool() interface{} { + return h.connPool +} + +// GetPubSub returns the PubSub instance that received the notification +func (h *handlerContext) GetPubSub() PubSubInterface { + if pubSub, ok := h.pubSub.(PubSubInterface); ok { + return pubSub + } + return nil +} + +// GetConn returns the specific connection on which the notification was received +func (h *handlerContext) GetConn() *pool.Conn { + return h.conn +} + +// IsBlocking returns true if the notification was received on a blocking connection +func (h *handlerContext) IsBlocking() bool { + return h.isBlocking } // Handler defines the interface for push notification handlers. @@ -25,13 +159,13 @@ type Handler interface { // The handlerCtx provides information about the client, connection pool, and connection // on which the notification was received, allowing handlers to make informed decisions. // Returns true if the notification was handled, false otherwise. - HandlePushNotification(ctx context.Context, handlerCtx *HandlerContext, notification []interface{}) bool + HandlePushNotification(ctx context.Context, handlerCtx HandlerContext, notification []interface{}) bool } // ProcessorInterface defines the interface for push notification processors. type ProcessorInterface interface { GetHandler(pushNotificationName string) Handler - ProcessPendingNotifications(ctx context.Context, handlerCtx *HandlerContext, rd *proto.Reader) error + ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error RegisterHandler(pushNotificationName string, handler Handler, protected bool) error } diff --git a/options.go b/options.go index a0616b00..b93df01e 100644 --- a/options.go +++ b/options.go @@ -599,8 +599,6 @@ func newConnPool( MaxActiveConns: opt.MaxActiveConns, ConnMaxIdleTime: opt.ConnMaxIdleTime, ConnMaxLifetime: opt.ConnMaxLifetime, - // Pass push notification processor for connection initialization - PushNotificationProcessor: opt.PushNotificationProcessor, // Pass protocol version for push notification optimization Protocol: opt.Protocol, }) diff --git a/pubsub.go b/pubsub.go index bbc778f4..fd671dbe 100644 --- a/pubsub.go +++ b/pubsub.go @@ -48,12 +48,6 @@ func (c *PubSub) init() { c.exit = make(chan struct{}) } -// SetPushNotificationProcessor sets the push notification processor for handling -// generic push notifications received on this PubSub connection. -func (c *PubSub) SetPushNotificationProcessor(processor PushNotificationProcessorInterface) { - c.pushProcessor = processor -} - func (c *PubSub) String() string { c.mu.Lock() defer c.mu.Unlock() @@ -377,18 +371,6 @@ func (p *Pong) String() string { return "Pong" } -// PushNotificationMessage represents a generic push notification received on a PubSub connection. -type PushNotificationMessage struct { - // Command is the push notification command (e.g., "MOVING", "CUSTOM_EVENT"). - Command string - // Args are the arguments following the command. - Args []interface{} -} - -func (m *PushNotificationMessage) String() string { - return fmt.Sprintf("push: %s", m.Command) -} - func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { switch reply := reply.(type) { case string: @@ -435,25 +417,6 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { Payload: reply[1].(string), }, nil default: - // Try to handle as generic push notification - ctx := c.getContext() - handler := c.pushProcessor.GetHandler(kind) - if handler != nil { - // Create handler context for pubsub - handlerCtx := &pushnotif.HandlerContext{ - Client: c, - ConnPool: nil, // Not available in pubsub context - Conn: nil, // Not available in pubsub context - } - handled := handler.HandlePushNotification(ctx, handlerCtx, reply) - if handled { - // Return a special message type to indicate it was handled - return &PushNotificationMessage{ - Command: kind, - Args: reply[1:], - }, nil - } - } return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind) } default: @@ -477,6 +440,12 @@ func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (int } err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error { + // To be sure there are no buffered push notifications, we process them before reading the reply + if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { + // Log the error but don't fail the command execution + // Push notification processing errors shouldn't break normal Redis operations + internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) + } return c.cmd.readReply(rd) }) @@ -573,6 +542,22 @@ func (c *PubSub) ChannelWithSubscriptions(opts ...ChannelOption) <-chan interfac return c.allCh.allCh } +func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error { + if c.pushProcessor == nil { + return nil + } + + // Create handler context with client, connection pool, and connection information + handlerCtx := c.pushNotificationHandlerContext(cn) + return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) +} + +func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext { + // PubSub doesn't have a client or connection pool, so we pass nil for those + // PubSub connections are blocking + return pushnotif.NewHandlerContext(nil, nil, c, cn, true) +} + type ChannelOption func(c *channel) // WithChannelSize specifies the Go chan size that is used to buffer incoming messages. @@ -699,9 +684,6 @@ func (c *channel) initMsgChan() { // Ignore. case *Pong: // Ignore. - case *PushNotificationMessage: - // Ignore push notifications in message-only channel - // They are already handled by the push notification processor case *Message: timer.Reset(c.chanSendTimeout) select { @@ -756,7 +738,7 @@ func (c *channel) initAllChan() { switch msg := msg.(type) { case *Pong: // Ignore. - case *Subscription, *Message, *PushNotificationMessage: + case *Subscription, *Message: timer.Reset(c.chanSendTimeout) select { case c.allCh <- msg: diff --git a/push_notifications.go b/push_notifications.go index 8533aba9..8514d52f 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -7,6 +7,8 @@ import ( "github.com/redis/go-redis/v9/internal/pushnotif" ) +type PushNotificationHandlerContext = pushnotif.HandlerContext + // PushNotificationHandler defines the interface for push notification handlers. // This is an alias to the internal push notification handler interface. type PushNotificationHandler = pushnotif.Handler @@ -76,7 +78,7 @@ func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName strin // ProcessPendingNotifications checks for and processes any pending push notifications. // The handlerCtx provides context about the client, connection pool, and connection. -func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *pushnotif.HandlerContext, rd *proto.Reader) error { +func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) } @@ -103,7 +105,7 @@ func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName str } // ProcessPendingNotifications reads and discards any pending push notifications. -func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx *pushnotif.HandlerContext, rd *proto.Reader) error { +func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) } diff --git a/redis.go b/redis.go index e634de1d..229c1cfa 100644 --- a/redis.go +++ b/redis.go @@ -835,8 +835,9 @@ func NewClient(opt *Options) *Client { } c.init() - // Initialize push notification processor - c.initializePushProcessor() + // Initialize push notification processor using shared helper + // Use void processor for RESP2 connections (push notifications not available) + c.pushProcessor = initializePushProcessor(opt) // Update options with the initialized push processor for connection pool opt.PushNotificationProcessor = c.pushProcessor @@ -896,11 +897,6 @@ func initializePushProcessor(opt *Options) PushNotificationProcessorInterface { return NewVoidPushNotificationProcessor() } -// initializePushProcessor initializes the push notification processor for this client. -func (c *Client) initializePushProcessor() { - c.pushProcessor = initializePushProcessor(c.opt) -} - // RegisterPushNotificationHandler registers a handler for a specific push notification name. // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. @@ -962,13 +958,11 @@ func (c *Client) pubSub() *PubSub { newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) { return c.newConn(ctx) }, - closeConn: c.connPool.CloseConn, + closeConn: c.connPool.CloseConn, + pushProcessor: c.pushProcessor, } pubsub.init() - // Set the push notification processor - pubsub.SetPushNotificationProcessor(c.pushProcessor) - return pubsub } @@ -1053,7 +1047,7 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn } // Initialize push notification processor using shared helper - // Use void processor by default for connections (typically don't need push notifications) + // Use void processor for RESP2 connections (push notifications not available) c.pushProcessor = initializePushProcessor(opt) c.cmdable = c.Process @@ -1145,10 +1139,6 @@ func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Contex } // pushNotificationHandlerContext creates a handler context for push notification processing -func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) *pushnotif.HandlerContext { - return &pushnotif.HandlerContext{ - Client: c, - ConnPool: c.connPool, - Conn: cn, - } +func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext { + return pushnotif.NewHandlerContext(c, c.connPool, nil, cn, false) } diff --git a/sentinel.go b/sentinel.go index 126dc3ea..ad648f03 100644 --- a/sentinel.go +++ b/sentinel.go @@ -15,6 +15,7 @@ import ( "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/pushnotif" "github.com/redis/go-redis/v9/internal/rand" ) @@ -429,7 +430,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client { rdb.init() // Initialize push notification processor using shared helper - // Use void processor by default for failover clients (typically don't need push notifications) + // Use void processor by default for RESP2 connections rdb.pushProcessor = initializePushProcessor(opt) connPool = newConnPool(opt, rdb.dialHook) @@ -499,8 +500,8 @@ func NewSentinelClient(opt *Options) *SentinelClient { } // Initialize push notification processor using shared helper - // Use void processor by default for sentinel clients (typically don't need push notifications) - c.pushProcessor = initializePushProcessor(opt) + // Use void processor for Sentinel clients + c.pushProcessor = pushnotif.NewVoidProcessor() c.initHooks(hooks{ dial: c.baseClient.dial,