diff --git a/push_notification_handler_context.go b/push_notification_handler_context.go new file mode 100644 index 00000000..03f9affd --- /dev/null +++ b/push_notification_handler_context.go @@ -0,0 +1,125 @@ +package redis + +import ( + "github.com/redis/go-redis/v9/internal/pool" +) + +// PushNotificationHandlerContext provides context information about where a push notification was received. +// This interface allows handlers to make informed decisions based on the source of the notification +// with strongly typed access to different client types using concrete types. +type PushNotificationHandlerContext interface { + // GetClient returns the Redis client instance that received the notification. + // Returns nil if no client context is available. + GetClient() interface{} + + // GetClusterClient returns the client as a ClusterClient if it is one. + // Returns nil if the client is not a ClusterClient or no client context is available. + GetClusterClient() *ClusterClient + + // GetSentinelClient returns the client as a SentinelClient if it is one. + // Returns nil if the client is not a SentinelClient or no client context is available. + GetSentinelClient() *SentinelClient + + // GetFailoverClient returns the client as a FailoverClient if it is one. + // Returns nil if the client is not a FailoverClient or no client context is available. + GetFailoverClient() *Client + + // GetRegularClient returns the client as a regular Client if it is one. + // Returns nil if the client is not a regular Client or no client context is available. + GetRegularClient() *Client + + // GetConnPool returns the connection pool from which the connection was obtained. + // Returns nil if no connection pool context is available. + GetConnPool() interface{} + + // GetPubSub returns the PubSub instance that received the notification. + // Returns nil if this is not a PubSub connection. + GetPubSub() *PubSub + + // GetConn returns the specific connection on which the notification was received. + // Returns nil if no connection context is available. + GetConn() *pool.Conn + + // IsBlocking returns true if the notification was received on a blocking connection. + IsBlocking() bool +} + +// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface +type pushNotificationHandlerContext struct { + client interface{} + connPool interface{} + pubSub interface{} + conn *pool.Conn + isBlocking bool +} + +// NewPushNotificationHandlerContext creates a new PushNotificationHandlerContext implementation +func NewPushNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) PushNotificationHandlerContext { + return &pushNotificationHandlerContext{ + client: client, + connPool: connPool, + pubSub: pubSub, + conn: conn, + isBlocking: isBlocking, + } +} + +// GetClient returns the Redis client instance that received the notification +func (h *pushNotificationHandlerContext) GetClient() interface{} { + return h.client +} + +// GetClusterClient returns the client as a ClusterClient if it is one +func (h *pushNotificationHandlerContext) GetClusterClient() *ClusterClient { + if client, ok := h.client.(*ClusterClient); ok { + return client + } + return nil +} + +// GetSentinelClient returns the client as a SentinelClient if it is one +func (h *pushNotificationHandlerContext) GetSentinelClient() *SentinelClient { + if client, ok := h.client.(*SentinelClient); ok { + return client + } + return nil +} + +// GetFailoverClient returns the client as a FailoverClient if it is one +func (h *pushNotificationHandlerContext) GetFailoverClient() *Client { + if client, ok := h.client.(*Client); ok { + return client + } + return nil +} + +// GetRegularClient returns the client as a regular Client if it is one +func (h *pushNotificationHandlerContext) GetRegularClient() *Client { + if client, ok := h.client.(*Client); ok { + return client + } + return nil +} + +// GetConnPool returns the connection pool from which the connection was obtained +func (h *pushNotificationHandlerContext) GetConnPool() interface{} { + return h.connPool +} + +// GetPubSub returns the PubSub instance that received the notification +func (h *pushNotificationHandlerContext) GetPubSub() *PubSub { + if pubSub, ok := h.pubSub.(*PubSub); ok { + return pubSub + } + return nil +} + +// GetConn returns the specific connection on which the notification was received +func (h *pushNotificationHandlerContext) GetConn() *pool.Conn { + return h.conn +} + +// IsBlocking returns true if the notification was received on a blocking connection +func (h *pushNotificationHandlerContext) IsBlocking() bool { + return h.isBlocking +} diff --git a/push_notification_processor.go b/push_notification_processor.go new file mode 100644 index 00000000..38877206 --- /dev/null +++ b/push_notification_processor.go @@ -0,0 +1,198 @@ +package redis + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9/internal" + "github.com/redis/go-redis/v9/internal/proto" +) + +// Registry manages push notification handlers +type Registry struct { + handlers map[string]PushNotificationHandler + protected map[string]bool +} + +// NewRegistry creates a new push notification registry +func NewRegistry() *Registry { + return &Registry{ + handlers: make(map[string]PushNotificationHandler), + protected: make(map[string]bool), + } +} + +// RegisterHandler registers a handler for a specific push notification name +func (r *Registry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + if handler == nil { + return fmt.Errorf("handler cannot be nil") + } + + // Check if handler already exists and is protected + if existingProtected, exists := r.protected[pushNotificationName]; exists && existingProtected { + return fmt.Errorf("cannot overwrite protected handler for push notification: %s", pushNotificationName) + } + + r.handlers[pushNotificationName] = handler + r.protected[pushNotificationName] = protected + return nil +} + +// GetHandler returns the handler for a specific push notification name +func (r *Registry) GetHandler(pushNotificationName string) PushNotificationHandler { + return r.handlers[pushNotificationName] +} + +// UnregisterHandler removes a handler for a specific push notification name +func (r *Registry) UnregisterHandler(pushNotificationName string) error { + // Check if handler is protected + if protected, exists := r.protected[pushNotificationName]; exists && protected { + return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) + } + + delete(r.handlers, pushNotificationName) + delete(r.protected, pushNotificationName) + return nil +} + +// GetRegisteredPushNotificationNames returns all registered push notification names +func (r *Registry) GetRegisteredPushNotificationNames() []string { + names := make([]string, 0, len(r.handlers)) + for name := range r.handlers { + names = append(names, name) + } + return names +} + +// Processor handles push notifications with a registry of handlers +type Processor struct { + registry *Registry +} + +// NewProcessor creates a new push notification processor +func NewProcessor() *Processor { + return &Processor{ + registry: NewRegistry(), + } +} + +// GetHandler returns the handler for a specific push notification name +func (p *Processor) GetHandler(pushNotificationName string) PushNotificationHandler { + return p.registry.GetHandler(pushNotificationName) +} + +// RegisterHandler registers a handler for a specific push notification name +func (p *Processor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return p.registry.RegisterHandler(pushNotificationName, handler, protected) +} + +// UnregisterHandler removes a handler for a specific push notification name +func (p *Processor) UnregisterHandler(pushNotificationName string) error { + return p.registry.UnregisterHandler(pushNotificationName) +} + +// ProcessPendingNotifications checks for and processes any pending push notifications +func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { + if rd == nil { + return nil + } + + for { + // Check if there's data available to read + replyType, err := rd.PeekReplyType() + if err != nil { + // No more data available or error reading + break + } + + // Only process push notifications (arrays starting with >) + if replyType != proto.RespPush { + break + } + + // Read the push notification + reply, err := rd.ReadReply() + if err != nil { + internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) + break + } + + // Convert to slice of interfaces + notification, ok := reply.([]interface{}) + if !ok { + continue + } + + // Handle the notification directly + if len(notification) > 0 { + // Extract the notification type (first element) + if notificationType, ok := notification[0].(string); ok { + // Skip notifications that should be handled by other systems + if shouldSkipNotification(notificationType) { + continue + } + + // Get the handler for this notification type + if handler := p.registry.GetHandler(notificationType); handler != nil { + // Handle the notification + handler.HandlePushNotification(ctx, handlerCtx, notification) + } + } + } + } + + return nil +} + +// shouldSkipNotification checks if a notification type should be ignored by the push notification +// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). +func shouldSkipNotification(notificationType string) bool { + switch notificationType { + // Pub/Sub notifications - handled by pub/sub system + case "message", // Regular pub/sub message + "pmessage", // Pattern pub/sub message + "subscribe", // Subscription confirmation + "unsubscribe", // Unsubscription confirmation + "psubscribe", // Pattern subscription confirmation + "punsubscribe", // Pattern unsubscription confirmation + "smessage", // Sharded pub/sub message (Redis 7.0+) + "ssubscribe", // Sharded subscription confirmation + "sunsubscribe": // Sharded unsubscription confirmation + return true + default: + return false + } +} + +// VoidProcessor discards all push notifications without processing them +type VoidProcessor struct{} + +// NewVoidProcessor creates a new void push notification processor +func NewVoidProcessor() *VoidProcessor { + return &VoidProcessor{} +} + +// GetHandler returns nil for void processor since it doesn't maintain handlers +func (v *VoidProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { + return nil +} + +// RegisterHandler returns an error for void processor since it doesn't maintain handlers +func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) +} + +// UnregisterHandler returns an error for void processor since it doesn't maintain handlers +func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { + return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) +} + +// ProcessPendingNotifications for VoidProcessor does nothing since push notifications +// are only available in RESP3 and this processor is used for RESP2 connections. +// This avoids unnecessary buffer scanning overhead. +func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { + // VoidProcessor is used for RESP2 connections where push notifications are not available. + // Since push notifications only exist in RESP3, we can safely skip all processing + // to avoid unnecessary buffer scanning overhead. + return nil +} diff --git a/push_notifications.go b/push_notifications.go index 9d2ed2cc..d9666c04 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -2,10 +2,7 @@ package redis import ( "context" - "fmt" - "github.com/redis/go-redis/v9/internal" - "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" ) @@ -27,45 +24,7 @@ const ( PushNotificationFailedOver = "FAILED_OVER" ) -// PushNotificationHandlerContext provides context information about where a push notification was received. -// This interface allows handlers to make informed decisions based on the source of the notification -// with strongly typed access to different client types using concrete types. -type PushNotificationHandlerContext interface { - // GetClient returns the Redis client instance that received the notification. - // Returns nil if no client context is available. - GetClient() interface{} - - // GetClusterClient returns the client as a ClusterClient if it is one. - // Returns nil if the client is not a ClusterClient or no client context is available. - GetClusterClient() *ClusterClient - - // GetSentinelClient returns the client as a SentinelClient if it is one. - // Returns nil if the client is not a SentinelClient or no client context is available. - GetSentinelClient() *SentinelClient - - // GetFailoverClient returns the client as a FailoverClient if it is one. - // Returns nil if the client is not a FailoverClient or no client context is available. - GetFailoverClient() *Client - - // GetRegularClient returns the client as a regular Client if it is one. - // Returns nil if the client is not a regular Client or no client context is available. - GetRegularClient() *Client - - // GetConnPool returns the connection pool from which the connection was obtained. - // Returns nil if no connection pool context is available. - GetConnPool() interface{} - - // GetPubSub returns the PubSub instance that received the notification. - // Returns nil if this is not a PubSub connection. - GetPubSub() *PubSub - - // GetConn returns the specific connection on which the notification was received. - // Returns nil if no connection context is available. - GetConn() *pool.Conn - - // IsBlocking returns true if the notification was received on a blocking connection. - IsBlocking() bool -} +// PushNotificationHandlerContext is defined in push_notification_handler_context.go // PushNotificationHandler defines the interface for push notification handlers. type PushNotificationHandler interface { @@ -76,276 +35,9 @@ type PushNotificationHandler interface { HandlePushNotification(ctx context.Context, handlerCtx PushNotificationHandlerContext, notification []interface{}) bool } -// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface -type pushNotificationHandlerContext struct { - client interface{} - connPool interface{} - pubSub interface{} - conn *pool.Conn - isBlocking bool -} - -// NewPushNotificationHandlerContext creates a new PushNotificationHandlerContext implementation -func NewPushNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) PushNotificationHandlerContext { - return &pushNotificationHandlerContext{ - client: client, - connPool: connPool, - pubSub: pubSub, - conn: conn, - isBlocking: isBlocking, - } -} - -// GetClient returns the Redis client instance that received the notification -func (h *pushNotificationHandlerContext) GetClient() interface{} { - return h.client -} - -// GetClusterClient returns the client as a ClusterClient if it is one -func (h *pushNotificationHandlerContext) GetClusterClient() *ClusterClient { - if client, ok := h.client.(*ClusterClient); ok { - return client - } - return nil -} - -// GetSentinelClient returns the client as a SentinelClient if it is one -func (h *pushNotificationHandlerContext) GetSentinelClient() *SentinelClient { - if client, ok := h.client.(*SentinelClient); ok { - return client - } - return nil -} - -// GetFailoverClient returns the client as a FailoverClient if it is one -func (h *pushNotificationHandlerContext) GetFailoverClient() *Client { - if client, ok := h.client.(*Client); ok { - return client - } - return nil -} - -// GetRegularClient returns the client as a regular Client if it is one -func (h *pushNotificationHandlerContext) GetRegularClient() *Client { - if client, ok := h.client.(*Client); ok { - return client - } - return nil -} - -// GetConnPool returns the connection pool from which the connection was obtained -func (h *pushNotificationHandlerContext) GetConnPool() interface{} { - return h.connPool -} - -// GetPubSub returns the PubSub instance that received the notification -func (h *pushNotificationHandlerContext) GetPubSub() *PubSub { - if pubSub, ok := h.pubSub.(*PubSub); ok { - return pubSub - } - return nil -} - -// GetConn returns the specific connection on which the notification was received -func (h *pushNotificationHandlerContext) GetConn() *pool.Conn { - return h.conn -} - -// IsBlocking returns true if the notification was received on a blocking connection -func (h *pushNotificationHandlerContext) IsBlocking() bool { - return h.isBlocking -} - -// Registry manages push notification handlers -type Registry struct { - handlers map[string]PushNotificationHandler - protected map[string]bool -} - -// NewRegistry creates a new push notification registry -func NewRegistry() *Registry { - return &Registry{ - handlers: make(map[string]PushNotificationHandler), - protected: make(map[string]bool), - } -} - -// RegisterHandler registers a handler for a specific push notification name -func (r *Registry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - if handler == nil { - return fmt.Errorf("handler cannot be nil") - } - - // Check if handler already exists and is protected - if existingProtected, exists := r.protected[pushNotificationName]; exists && existingProtected { - return fmt.Errorf("cannot overwrite protected handler for push notification: %s", pushNotificationName) - } - - r.handlers[pushNotificationName] = handler - r.protected[pushNotificationName] = protected - return nil -} - -// GetHandler returns the handler for a specific push notification name -func (r *Registry) GetHandler(pushNotificationName string) PushNotificationHandler { - return r.handlers[pushNotificationName] -} - -// UnregisterHandler removes a handler for a specific push notification name -func (r *Registry) UnregisterHandler(pushNotificationName string) error { - // Check if handler is protected - if protected, exists := r.protected[pushNotificationName]; exists && protected { - return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) - } - - delete(r.handlers, pushNotificationName) - delete(r.protected, pushNotificationName) - return nil -} - -// GetRegisteredPushNotificationNames returns all registered push notification names -func (r *Registry) GetRegisteredPushNotificationNames() []string { - names := make([]string, 0, len(r.handlers)) - for name := range r.handlers { - names = append(names, name) - } - return names -} - -// Processor handles push notifications with a registry of handlers -type Processor struct { - registry *Registry -} - -// NewProcessor creates a new push notification processor -func NewProcessor() *Processor { - return &Processor{ - registry: NewRegistry(), - } -} - -// GetHandler returns the handler for a specific push notification name -func (p *Processor) GetHandler(pushNotificationName string) PushNotificationHandler { - return p.registry.GetHandler(pushNotificationName) -} - -// RegisterHandler registers a handler for a specific push notification name -func (p *Processor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return p.registry.RegisterHandler(pushNotificationName, handler, protected) -} - -// UnregisterHandler removes a handler for a specific push notification name -func (p *Processor) UnregisterHandler(pushNotificationName string) error { - return p.registry.UnregisterHandler(pushNotificationName) -} - -// ProcessPendingNotifications checks for and processes any pending push notifications -func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - if rd == nil { - return nil - } - - for { - // Check if there's data available to read - replyType, err := rd.PeekReplyType() - if err != nil { - // No more data available or error reading - break - } - - // Only process push notifications (arrays starting with >) - if replyType != proto.RespPush { - break - } - - // Read the push notification - reply, err := rd.ReadReply() - if err != nil { - internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) - break - } - - // Convert to slice of interfaces - notification, ok := reply.([]interface{}) - if !ok { - continue - } - - // Handle the notification directly - if len(notification) > 0 { - // Extract the notification type (first element) - if notificationType, ok := notification[0].(string); ok { - // Skip notifications that should be handled by other systems - if shouldSkipNotification(notificationType) { - continue - } - - // Get the handler for this notification type - if handler := p.registry.GetHandler(notificationType); handler != nil { - // Handle the notification - handler.HandlePushNotification(ctx, handlerCtx, notification) - } - } - } - } - - return nil -} - -// shouldSkipNotification checks if a notification type should be ignored by the push notification -// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). -func shouldSkipNotification(notificationType string) bool { - switch notificationType { - // Pub/Sub notifications - handled by pub/sub system - case "message", // Regular pub/sub message - "pmessage", // Pattern pub/sub message - "subscribe", // Subscription confirmation - "unsubscribe", // Unsubscription confirmation - "psubscribe", // Pattern subscription confirmation - "punsubscribe", // Pattern unsubscription confirmation - "smessage", // Sharded pub/sub message (Redis 7.0+) - "ssubscribe", // Sharded subscription confirmation - "sunsubscribe": // Sharded unsubscription confirmation - return true - default: - return false - } -} - -// VoidProcessor discards all push notifications without processing them -type VoidProcessor struct{} - -// NewVoidProcessor creates a new void push notification processor -func NewVoidProcessor() *VoidProcessor { - return &VoidProcessor{} -} - -// GetHandler returns nil for void processor since it doesn't maintain handlers -func (v *VoidProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return nil -} - -// RegisterHandler returns an error for void processor since it doesn't maintain handlers -func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) -} - -// UnregisterHandler returns an error for void processor since it doesn't maintain handlers -func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { - return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) -} - -// ProcessPendingNotifications for VoidProcessor does nothing since push notifications -// are only available in RESP3 and this processor is used for RESP2 connections. -// This avoids unnecessary buffer scanning overhead. -func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - // VoidProcessor is used for RESP2 connections where push notifications are not available. - // Since push notifications only exist in RESP3, we can safely skip all processing - // to avoid unnecessary buffer scanning overhead. - return nil -} - +// NewPushNotificationHandlerContext is defined in push_notification_handler_context.go +// Registry, Processor, and VoidProcessor are defined in push_notification_processor.go // PushNotificationProcessorInterface defines the interface for push notification processors. type PushNotificationProcessorInterface interface {