diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go new file mode 100644 index 00000000..ac582544 --- /dev/null +++ b/internal/pushnotif/processor.go @@ -0,0 +1,147 @@ +package pushnotif + +import ( + "context" + "fmt" + + "github.com/redis/go-redis/v9/internal/proto" +) + +// Processor handles push notifications with a registry of handlers. +type Processor struct { + registry *Registry +} + +// NewProcessor creates a new push notification processor. +func NewProcessor() *Processor { + return &Processor{ + registry: NewRegistry(), + } +} + +// GetHandler returns the handler for a specific push notification name. +// Returns nil if no handler is registered for the given name. +func (p *Processor) GetHandler(pushNotificationName string) Handler { + return p.registry.GetHandler(pushNotificationName) +} + +// RegisterHandler registers a handler for a specific push notification name. +// Returns an error if a handler is already registered for this push notification name. +// If protected is true, the handler cannot be unregistered. +func (p *Processor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { + return p.registry.RegisterHandler(pushNotificationName, handler, protected) +} + +// UnregisterHandler removes a handler for a specific push notification name. +// Returns an error if the handler is protected or doesn't exist. +func (p *Processor) UnregisterHandler(pushNotificationName string) error { + return p.registry.UnregisterHandler(pushNotificationName) +} + +// GetRegistryForTesting returns the push notification registry for testing. +// This method should only be used by tests. +func (p *Processor) GetRegistryForTesting() *Registry { + return p.registry +} + +// ProcessPendingNotifications checks for and processes any pending push notifications. +func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { + // Check for nil reader + if rd == nil { + return nil + } + + // Check if there are any buffered bytes that might contain push notifications + if rd.Buffered() == 0 { + return nil + } + + // Process all available push notifications + for { + // Peek at the next reply type to see if it's a push notification + replyType, err := rd.PeekReplyType() + if err != nil { + // No more data available or error reading + break + } + + // Push notifications use RespPush type in RESP3 + if replyType != proto.RespPush { + break + } + + // Try to read the push notification + reply, err := rd.ReadReply() + if err != nil { + return fmt.Errorf("failed to read push notification: %w", err) + } + + // Convert to slice of interfaces + notification, ok := reply.([]interface{}) + if !ok { + continue + } + + // Handle the notification + p.registry.HandleNotification(ctx, notification) + } + + return nil +} + +// VoidProcessor discards all push notifications without processing them. +type VoidProcessor struct{} + +// NewVoidProcessor creates a new void push notification processor. +func NewVoidProcessor() *VoidProcessor { + return &VoidProcessor{} +} + +// GetHandler returns nil for void processor since it doesn't maintain handlers. +func (v *VoidProcessor) GetHandler(pushNotificationName string) Handler { + return nil +} + +// RegisterHandler returns an error for void processor since it doesn't maintain handlers. +func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { + return fmt.Errorf("void push notification processor does not support handler registration") +} + +// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. +// This method should only be used by tests. +func (v *VoidProcessor) GetRegistryForTesting() *Registry { + return nil +} + +// ProcessPendingNotifications reads and discards any pending push notifications. +func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { + // Check for nil reader + if rd == nil { + return nil + } + + // Read and discard any pending push notifications to clean the buffer + for { + // Peek at the next reply type to see if it's a push notification + replyType, err := rd.PeekReplyType() + if err != nil { + // No more data available or error reading + break + } + + // Push notifications use RespPush type in RESP3 + if replyType != proto.RespPush { + break + } + + // Read and discard the push notification + _, err = rd.ReadReply() + if err != nil { + return fmt.Errorf("failed to read push notification for discarding: %w", err) + } + + // Notification discarded - continue to next one + } + + return nil +} diff --git a/internal/pushnotif/registry.go b/internal/pushnotif/registry.go new file mode 100644 index 00000000..28233c85 --- /dev/null +++ b/internal/pushnotif/registry.go @@ -0,0 +1,105 @@ +package pushnotif + +import ( + "context" + "fmt" + "sync" +) + +// Registry manages push notification handlers. +type Registry struct { + mu sync.RWMutex + handlers map[string]handlerEntry +} + +// NewRegistry creates a new push notification registry. +func NewRegistry() *Registry { + return &Registry{ + handlers: make(map[string]handlerEntry), + } +} + +// RegisterHandler registers a handler for a specific push notification name. +// Returns an error if a handler is already registered for this push notification name. +// If protected is true, the handler cannot be unregistered. +func (r *Registry) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { + r.mu.Lock() + defer r.mu.Unlock() + + if _, exists := r.handlers[pushNotificationName]; exists { + return fmt.Errorf("handler already registered for push notification: %s", pushNotificationName) + } + + r.handlers[pushNotificationName] = handlerEntry{ + handler: handler, + protected: protected, + } + return nil +} + +// UnregisterHandler removes a handler for a specific push notification name. +// Returns an error if the handler is protected or doesn't exist. +func (r *Registry) UnregisterHandler(pushNotificationName string) error { + r.mu.Lock() + defer r.mu.Unlock() + + entry, exists := r.handlers[pushNotificationName] + if !exists { + return fmt.Errorf("no handler registered for push notification: %s", pushNotificationName) + } + + if entry.protected { + return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) + } + + delete(r.handlers, pushNotificationName) + return nil +} + +// GetHandler returns the handler for a specific push notification name. +// Returns nil if no handler is registered for the given name. +func (r *Registry) GetHandler(pushNotificationName string) Handler { + r.mu.RLock() + defer r.mu.RUnlock() + + entry, exists := r.handlers[pushNotificationName] + if !exists { + return nil + } + return entry.handler +} + +// GetRegisteredPushNotificationNames returns a list of all registered push notification names. +func (r *Registry) GetRegisteredPushNotificationNames() []string { + r.mu.RLock() + defer r.mu.RUnlock() + + names := make([]string, 0, len(r.handlers)) + for name := range r.handlers { + names = append(names, name) + } + return names +} + +// HandleNotification attempts to handle a push notification using registered handlers. +// Returns true if a handler was found and successfully processed the notification. +func (r *Registry) HandleNotification(ctx context.Context, notification []interface{}) bool { + if len(notification) == 0 { + return false + } + + // Extract the notification type (first element) + notificationType, ok := notification[0].(string) + if !ok { + return false + } + + // Get the handler for this notification type + handler := r.GetHandler(notificationType) + if handler == nil { + return false + } + + // Handle the notification + return handler.HandlePushNotification(ctx, notification) +} diff --git a/internal/pushnotif/types.go b/internal/pushnotif/types.go new file mode 100644 index 00000000..062e16fd --- /dev/null +++ b/internal/pushnotif/types.go @@ -0,0 +1,36 @@ +package pushnotif + +import ( + "context" + + "github.com/redis/go-redis/v9/internal/proto" +) + +// Handler defines the interface for push notification handlers. +type Handler interface { + // HandlePushNotification processes a push notification. + // Returns true if the notification was handled, false otherwise. + HandlePushNotification(ctx context.Context, notification []interface{}) bool +} + +// ProcessorInterface defines the interface for push notification processors. +type ProcessorInterface interface { + GetHandler(pushNotificationName string) Handler + ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error + RegisterHandler(pushNotificationName string, handler Handler, protected bool) error +} + +// RegistryInterface defines the interface for push notification registries. +type RegistryInterface interface { + RegisterHandler(pushNotificationName string, handler Handler, protected bool) error + UnregisterHandler(pushNotificationName string) error + GetHandler(pushNotificationName string) Handler + GetRegisteredPushNotificationNames() []string + HandleNotification(ctx context.Context, notification []interface{}) bool +} + +// handlerEntry represents a registered handler with its protection status. +type handlerEntry struct { + handler Handler + protected bool +} diff --git a/push_notifications.go b/push_notifications.go index a0eba283..03ea8a7a 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -2,113 +2,18 @@ package redis import ( "context" - "fmt" - "sync" - "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/internal/pushnotif" ) -// PushNotificationHandler defines the interface for handling push notifications. +// PushNotificationHandler defines the interface for push notification handlers. type PushNotificationHandler interface { // HandlePushNotification processes a push notification. // Returns true if the notification was handled, false otherwise. HandlePushNotification(ctx context.Context, notification []interface{}) bool } -// PushNotificationRegistry manages handlers for different types of push notifications. -type PushNotificationRegistry struct { - mu sync.RWMutex - handlers map[string]PushNotificationHandler // pushNotificationName -> single handler - protected map[string]bool // pushNotificationName -> protected flag -} - -// NewPushNotificationRegistry creates a new push notification registry. -func NewPushNotificationRegistry() *PushNotificationRegistry { - return &PushNotificationRegistry{ - handlers: make(map[string]PushNotificationHandler), - protected: make(map[string]bool), - } -} - -// RegisterHandler registers a handler for a specific push notification name. -// Returns an error if a handler is already registered for this push notification name. -// If protected is true, the handler cannot be unregistered. -func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - r.mu.Lock() - defer r.mu.Unlock() - - if _, exists := r.handlers[pushNotificationName]; exists { - return fmt.Errorf("handler already registered for push notification: %s", pushNotificationName) - } - r.handlers[pushNotificationName] = handler - r.protected[pushNotificationName] = protected - return nil -} - -// UnregisterHandler removes the handler for a specific push notification name. -// Returns an error if the handler is protected. -func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string) error { - r.mu.Lock() - defer r.mu.Unlock() - - if r.protected[pushNotificationName] { - return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) - } - - delete(r.handlers, pushNotificationName) - delete(r.protected, pushNotificationName) - return nil -} - -// HandleNotification processes a push notification by calling the registered handler. -func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool { - if len(notification) == 0 { - return false - } - - // Extract push notification name from notification - pushNotificationName, ok := notification[0].(string) - if !ok { - return false - } - - r.mu.RLock() - defer r.mu.RUnlock() - - // Call specific handler - if handler, exists := r.handlers[pushNotificationName]; exists { - return handler.HandlePushNotification(ctx, notification) - } - - return false -} - -// GetRegisteredPushNotificationNames returns a list of push notification names that have registered handlers. -func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string { - r.mu.RLock() - defer r.mu.RUnlock() - - names := make([]string, 0, len(r.handlers)) - for name := range r.handlers { - names = append(names, name) - } - return names -} - -// GetHandler returns the handler for a specific push notification name. -// Returns nil if no handler is registered for the given name. -func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { - r.mu.RLock() - defer r.mu.RUnlock() - - handler, exists := r.handlers[pushNotificationName] - if !exists { - return nil - } - return handler -} - // PushNotificationProcessorInterface defines the interface for push notification processors. type PushNotificationProcessorInterface interface { GetHandler(pushNotificationName string) PushNotificationHandler @@ -116,92 +21,142 @@ type PushNotificationProcessorInterface interface { RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error } -// PushNotificationProcessor handles the processing of push notifications from Redis. +// PushNotificationRegistry manages push notification handlers. +type PushNotificationRegistry struct { + registry *pushnotif.Registry +} + +// NewPushNotificationRegistry creates a new push notification registry. +func NewPushNotificationRegistry() *PushNotificationRegistry { + return &PushNotificationRegistry{ + registry: pushnotif.NewRegistry(), + } +} + +// RegisterHandler registers a handler for a specific push notification name. +func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return r.registry.RegisterHandler(pushNotificationName, &handlerWrapper{handler}, protected) +} + +// UnregisterHandler removes a handler for a specific push notification name. +func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string) error { + return r.registry.UnregisterHandler(pushNotificationName) +} + +// GetHandler returns the handler for a specific push notification name. +func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { + handler := r.registry.GetHandler(pushNotificationName) + if handler == nil { + return nil + } + if wrapper, ok := handler.(*handlerWrapper); ok { + return wrapper.handler + } + return nil +} + +// GetRegisteredPushNotificationNames returns a list of all registered push notification names. +func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string { + return r.registry.GetRegisteredPushNotificationNames() +} + +// HandleNotification attempts to handle a push notification using registered handlers. +func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool { + return r.registry.HandleNotification(ctx, notification) +} + +// PushNotificationProcessor handles push notifications with a registry of handlers. type PushNotificationProcessor struct { - registry *PushNotificationRegistry + processor *pushnotif.Processor } // NewPushNotificationProcessor creates a new push notification processor. func NewPushNotificationProcessor() *PushNotificationProcessor { return &PushNotificationProcessor{ - registry: NewPushNotificationRegistry(), + processor: pushnotif.NewProcessor(), } } // GetHandler returns the handler for a specific push notification name. -// Returns nil if no handler is registered for the given name. func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return p.registry.GetHandler(pushNotificationName) + handler := p.processor.GetHandler(pushNotificationName) + if handler == nil { + return nil + } + if wrapper, ok := handler.(*handlerWrapper); ok { + return wrapper.handler + } + return nil } -// GetRegistryForTesting returns the push notification registry for testing. -// This method should only be used by tests. -func (p *PushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { - return p.registry +// RegisterHandler registers a handler for a specific push notification name. +func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return p.processor.RegisterHandler(pushNotificationName, &handlerWrapper{handler}, protected) +} + +// UnregisterHandler removes a handler for a specific push notification name. +func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName string) error { + return p.processor.UnregisterHandler(pushNotificationName) } // ProcessPendingNotifications checks for and processes any pending push notifications. func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { - // Check for nil reader - if rd == nil { - return nil + return p.processor.ProcessPendingNotifications(ctx, rd) +} + +// GetRegistryForTesting returns the push notification registry for testing. +func (p *PushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { + return &PushNotificationRegistry{ + registry: p.processor.GetRegistryForTesting(), } +} - // Check if there are any buffered bytes that might contain push notifications - if rd.Buffered() == 0 { - return nil - } - - // Process any pending push notifications - for { - // Peek at the next reply type to see if it's a push notification - replyType, err := rd.PeekReplyType() - if err != nil { - // No more data available or error peeking - break - } - - // Check if this is a RESP3 push notification - if replyType == '>' { // RespPush - // Read the push notification - reply, err := rd.ReadReply() - if err != nil { - internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) - break - } - - // Process the push notification - if pushSlice, ok := reply.([]interface{}); ok && len(pushSlice) > 0 { - handled := p.registry.HandleNotification(ctx, pushSlice) - if handled { - internal.Logger.Printf(ctx, "push: processed push notification: %v", pushSlice[0]) - } else { - internal.Logger.Printf(ctx, "push: unhandled push notification: %v", pushSlice[0]) - } - } else { - internal.Logger.Printf(ctx, "push: invalid push notification format: %v", reply) - } - } else { - // Not a push notification, stop processing - break - } +// VoidPushNotificationProcessor discards all push notifications without processing them. +type VoidPushNotificationProcessor struct { + processor *pushnotif.VoidProcessor +} + +// NewVoidPushNotificationProcessor creates a new void push notification processor. +func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { + return &VoidPushNotificationProcessor{ + processor: pushnotif.NewVoidProcessor(), } +} +// GetHandler returns nil for void processor since it doesn't maintain handlers. +func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { return nil } -// RegisterHandler is a convenience method to register a handler for a specific push notification name. -// Returns an error if a handler is already registered for this push notification name. -// If protected is true, the handler cannot be unregistered. -func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return p.registry.RegisterHandler(pushNotificationName, handler, protected) +// RegisterHandler returns an error for void processor since it doesn't maintain handlers. +func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return v.processor.RegisterHandler(pushNotificationName, nil, protected) +} + +// ProcessPendingNotifications reads and discards any pending push notifications. +func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { + return v.processor.ProcessPendingNotifications(ctx, rd) +} + +// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. +func (v *VoidPushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { + return nil +} + +// handlerWrapper wraps the public PushNotificationHandler interface to implement the internal Handler interface. +type handlerWrapper struct { + handler PushNotificationHandler +} + +func (w *handlerWrapper) HandlePushNotification(ctx context.Context, notification []interface{}) bool { + return w.handler.HandlePushNotification(ctx, notification) } // Redis Cluster push notification names const ( - PushNotificationMoving = "MOVING" - PushNotificationMigrating = "MIGRATING" - PushNotificationMigrated = "MIGRATED" + PushNotificationMoving = "MOVING" + PushNotificationMigrating = "MIGRATING" + PushNotificationMigrated = "MIGRATED" PushNotificationFailingOver = "FAILING_OVER" PushNotificationFailedOver = "FAILED_OVER" ) @@ -236,63 +191,3 @@ func (info *PushNotificationInfo) String() string { } return info.Name } - -// VoidPushNotificationProcessor is a no-op processor that discards all push notifications. -// Used when push notifications are disabled to avoid nil checks throughout the codebase. -type VoidPushNotificationProcessor struct{} - -// NewVoidPushNotificationProcessor creates a new void push notification processor. -func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { - return &VoidPushNotificationProcessor{} -} - -// GetHandler returns nil for void processor since it doesn't maintain handlers. -func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return nil -} - -// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. -// This method should only be used by tests. -func (v *VoidPushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { - return nil -} - -// ProcessPendingNotifications reads and discards any pending push notifications. -func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { - // Check for nil reader - if rd == nil { - return nil - } - - // Read and discard any pending push notifications to clean the buffer - for { - // Peek at the next reply type to see if it's a push notification - replyType, err := rd.PeekReplyType() - if err != nil { - // No more data available or error peeking - break - } - - // Check if this is a RESP3 push notification - if replyType == '>' { // RespPush - // Read and discard the push notification - _, err := rd.ReadReply() - if err != nil { - internal.Logger.Printf(ctx, "push: error reading push notification to discard: %v", err) - break - } - // Continue to check for more push notifications - } else { - // Not a push notification, stop processing - break - } - } - - return nil -} - -// RegisterHandler is a no-op for void processor, always returns nil. -func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - // No-op: void processor doesn't register handlers - return nil -}