package push import ( "context" "time" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" ) // NotificationProcessor defines the interface for push notification processors. type NotificationProcessor interface { // GetHandler returns the handler for a specific push notification name. GetHandler(pushNotificationName string) NotificationHandler // ProcessPendingNotifications checks for and processes any pending push notifications. ProcessPendingNotifications(ctx context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error // RegisterHandler registers a handler for a specific push notification name. RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error // UnregisterHandler removes a handler for a specific push notification name. UnregisterHandler(pushNotificationName string) error } // Processor handles push notifications with a registry of handlers type Processor struct { registry *Registry } // NewProcessor creates a new push notification processor func NewProcessor() *Processor { return &Processor{ registry: NewRegistry(), } } // GetHandler returns the handler for a specific push notification name func (p *Processor) GetHandler(pushNotificationName string) NotificationHandler { return p.registry.GetHandler(pushNotificationName) } // RegisterHandler registers a handler for a specific push notification name func (p *Processor) RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error { return p.registry.RegisterHandler(pushNotificationName, handler, protected) } // UnregisterHandler removes a handler for a specific push notification name func (p *Processor) UnregisterHandler(pushNotificationName string) error { return p.registry.UnregisterHandler(pushNotificationName) } // ProcessPendingNotifications checks for and processes any pending push notifications func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error { if rd == nil { return nil } conn := handlerCtx.Conn if conn == nil { return nil } netConn := handlerCtx.Conn.GetNetConn() if netConn == nil { return nil } for { // Set a short read deadline to check for available data // otherwise we may block on Peek if there is no data available err := netConn.SetReadDeadline(time.Now().Add(1)) if err != nil { return err } // Check if there's data available to read replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error reading break } // Only process push notifications (arrays starting with >) if replyType != proto.RespPush { break } // see if we should skip this notification notificationName, err := rd.PeekPushNotificationName() if err != nil { break } if willHandleNotificationInClient(notificationName) { break } // Read the push notification reply, err := rd.ReadReply() if err != nil { internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) break } // Convert to slice of interfaces notification, ok := reply.([]interface{}) if !ok { break } // Handle the notification directly if len(notification) > 0 { // Extract the notification type (first element) if notificationType, ok := notification[0].(string); ok { // Get the handler for this notification type if handler := p.registry.GetHandler(notificationType); handler != nil { // Handle the notification err := handler.HandlePushNotification(ctx, handlerCtx, notification) if err != nil { internal.Logger.Printf(ctx, "push: error handling push notification: %v", err) } } } } } return netConn.SetReadDeadline(time.Time{}) } // VoidProcessor discards all push notifications without processing them type VoidProcessor struct{} // NewVoidProcessor creates a new void push notification processor func NewVoidProcessor() *VoidProcessor { return &VoidProcessor{} } // GetHandler returns nil for void processor since it doesn't maintain handlers func (v *VoidProcessor) GetHandler(_ string) NotificationHandler { return nil } // RegisterHandler returns an error for void processor since it doesn't maintain handlers func (v *VoidProcessor) RegisterHandler(pushNotificationName string, _ NotificationHandler, _ bool) error { return ErrVoidProcessorRegister(pushNotificationName) } // UnregisterHandler returns an error for void processor since it doesn't maintain handlers func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { return ErrVoidProcessorUnregister(pushNotificationName) } // ProcessPendingNotifications for VoidProcessor does nothing since push notifications // are only available in RESP3 and this processor is used for RESP2 connections. // This avoids unnecessary buffer scanning overhead. func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error { // read and discard all push notifications if rd == nil { return nil } conn := handlerCtx.Conn if conn == nil { return nil } netConn := handlerCtx.Conn.GetNetConn() if netConn == nil { return nil } for { // Set a short read deadline to check for available data err := netConn.SetReadDeadline(time.Now().Add(1)) if err != nil { return err } // Check if there's data available to read replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error reading break } // Only process push notifications (arrays starting with >) if replyType != proto.RespPush { break } // see if we should skip this notification notificationName, err := rd.PeekPushNotificationName() if err != nil { break } if willHandleNotificationInClient(notificationName) { break } // Read the push notification _, err = rd.ReadReply() if err != nil { internal.Logger.Printf(context.Background(), "push: error reading push notification: %v", err) return nil } } return netConn.SetReadDeadline(time.Time{}) } // willHandleNotificationInClient checks if a notification type should be ignored by the push notification // processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). func willHandleNotificationInClient(notificationType string) bool { switch notificationType { // Pub/Sub notifications - handled by pub/sub system case "message", // Regular pub/sub message "pmessage", // Pattern pub/sub message "subscribe", // Subscription confirmation "unsubscribe", // Unsubscription confirmation "psubscribe", // Pattern subscription confirmation "punsubscribe", // Pattern unsubscription confirmation "smessage", // Sharded pub/sub message (Redis 7.0+) "ssubscribe", // Sharded subscription confirmation "sunsubscribe": // Sharded unsubscription confirmation return true default: return false } }