package redis

import (
	"context"
	"fmt"
	"sync"

	"github.com/redis/go-redis/v9/internal"
	"github.com/redis/go-redis/v9/internal/proto"
)

// PushNotificationHandler defines the interface for handling push notifications.
type PushNotificationHandler interface {
	// HandlePushNotification processes a push notification.
	// Returns true if the notification was handled, false otherwise.
	HandlePushNotification(ctx context.Context, notification []interface{}) bool
}

// PushNotificationRegistry manages handlers for different types of push notifications.
// It stores at most one handler per push notification name.
type PushNotificationRegistry struct {
	mu        sync.RWMutex
	handlers  map[string]PushNotificationHandler // pushNotificationName -> single handler
	protected map[string]bool                    // pushNotificationName -> protected flag
}

// NewPushNotificationRegistry creates a new push notification registry with
// empty handler and protection tables.
func NewPushNotificationRegistry() *PushNotificationRegistry {
	return &PushNotificationRegistry{
		handlers:  make(map[string]PushNotificationHandler),
		protected: make(map[string]bool),
	}
}

// RegisterHandler registers a handler for a specific push notification name.
// Returns an error if handler is nil or if a handler is already registered for
// this push notification name.
// If protected is true, the handler cannot be unregistered.
func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
	// A nil handler would occupy the slot (blocking later registrations) while
	// being unusable when a notification is dispatched to it, so reject it here.
	if handler == nil {
		return fmt.Errorf("handler cannot be nil for push notification: %s", pushNotificationName)
	}

	r.mu.Lock()
	defer r.mu.Unlock()

	if _, exists := r.handlers[pushNotificationName]; exists {
		return fmt.Errorf("handler already registered for push notification: %s", pushNotificationName)
	}

	r.handlers[pushNotificationName] = handler
	r.protected[pushNotificationName] = protected
	return nil
}

// UnregisterHandler removes the handler for a specific push notification name.
// Returns an error if the handler is protected.
func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string) error { r.mu.Lock() defer r.mu.Unlock() if r.protected[pushNotificationName] { return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) } delete(r.handlers, pushNotificationName) delete(r.protected, pushNotificationName) return nil } // HandleNotification processes a push notification by calling the registered handler. func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool { if len(notification) == 0 { return false } // Extract push notification name from notification pushNotificationName, ok := notification[0].(string) if !ok { return false } r.mu.RLock() defer r.mu.RUnlock() // Call specific handler if handler, exists := r.handlers[pushNotificationName]; exists { return handler.HandlePushNotification(ctx, notification) } return false } // GetRegisteredPushNotificationNames returns a list of push notification names that have registered handlers. func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string { r.mu.RLock() defer r.mu.RUnlock() names := make([]string, 0, len(r.handlers)) for name := range r.handlers { names = append(names, name) } return names } // GetHandler returns the handler for a specific push notification name. // Returns nil if no handler is registered for the given name. func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { r.mu.RLock() defer r.mu.RUnlock() handler, exists := r.handlers[pushNotificationName] if !exists { return nil } return handler } // PushNotificationProcessorInterface defines the interface for push notification processors. 
type PushNotificationProcessorInterface interface { GetHandler(pushNotificationName string) PushNotificationHandler ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error } // PushNotificationProcessor handles the processing of push notifications from Redis. type PushNotificationProcessor struct { registry *PushNotificationRegistry } // NewPushNotificationProcessor creates a new push notification processor. func NewPushNotificationProcessor() *PushNotificationProcessor { return &PushNotificationProcessor{ registry: NewPushNotificationRegistry(), } } // GetHandler returns the handler for a specific push notification name. // Returns nil if no handler is registered for the given name. func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { return p.registry.GetHandler(pushNotificationName) } // GetRegistryForTesting returns the push notification registry for testing. // This method should only be used by tests. func (p *PushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { return p.registry } // ProcessPendingNotifications checks for and processes any pending push notifications. 
func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { // Check if there are any buffered bytes that might contain push notifications if rd.Buffered() == 0 { return nil } // Process any pending push notifications for { // Peek at the next reply type to see if it's a push notification replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error peeking break } // Check if this is a RESP3 push notification if replyType == '>' { // RespPush // Read the push notification reply, err := rd.ReadReply() if err != nil { internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) break } // Process the push notification if pushSlice, ok := reply.([]interface{}); ok && len(pushSlice) > 0 { handled := p.registry.HandleNotification(ctx, pushSlice) if handled { internal.Logger.Printf(ctx, "push: processed push notification: %v", pushSlice[0]) } else { internal.Logger.Printf(ctx, "push: unhandled push notification: %v", pushSlice[0]) } } else { internal.Logger.Printf(ctx, "push: invalid push notification format: %v", reply) } } else { // Not a push notification, stop processing break } } return nil } // RegisterHandler is a convenience method to register a handler for a specific push notification name. // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { return p.registry.RegisterHandler(pushNotificationName, handler, protected) } // Redis Cluster push notification names const ( PushNotificationMoving = "MOVING" PushNotificationMigrating = "MIGRATING" PushNotificationMigrated = "MIGRATED" PushNotificationFailingOver = "FAILING_OVER" PushNotificationFailedOver = "FAILED_OVER" ) // PushNotificationInfo contains metadata about a push notification. 
type PushNotificationInfo struct { Name string Args []interface{} } // ParsePushNotificationInfo extracts information from a push notification. func ParsePushNotificationInfo(notification []interface{}) *PushNotificationInfo { if len(notification) == 0 { return nil } name, ok := notification[0].(string) if !ok { return nil } return &PushNotificationInfo{ Name: name, Args: notification[1:], } } // String returns a string representation of the push notification info. func (info *PushNotificationInfo) String() string { if info == nil { return "" } return info.Name } // VoidPushNotificationProcessor is a no-op processor that discards all push notifications. // Used when push notifications are disabled to avoid nil checks throughout the codebase. type VoidPushNotificationProcessor struct{} // NewVoidPushNotificationProcessor creates a new void push notification processor. func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { return &VoidPushNotificationProcessor{} } // GetHandler returns nil for void processor since it doesn't maintain handlers. func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { return nil } // GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. // This method should only be used by tests. func (v *VoidPushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { return nil } // ProcessPendingNotifications reads and discards any pending push notifications. 
func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { // Read and discard any pending push notifications to clean the buffer for { // Peek at the next reply type to see if it's a push notification replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error peeking break } // Check if this is a RESP3 push notification if replyType == '>' { // RespPush // Read and discard the push notification _, err := rd.ReadReply() if err != nil { internal.Logger.Printf(ctx, "push: error reading push notification to discard: %v", err) break } // Continue to check for more push notifications } else { // Not a push notification, stop processing break } } return nil } // RegisterHandler is a no-op for void processor, always returns nil. func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { // No-op: void processor doesn't register handlers return nil }