diff --git a/options.go b/options.go index b93df01e..00568c6c 100644 --- a/options.go +++ b/options.go @@ -15,6 +15,7 @@ import ( "github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/push" ) // Limiter is the interface of a rate limiter or a circuit breaker. @@ -222,7 +223,7 @@ type Options struct { // PushNotificationProcessor is the processor for handling push notifications. // If nil, a default processor will be created for RESP3 connections. - PushNotificationProcessor PushNotificationProcessorInterface + PushNotificationProcessor push.NotificationProcessor } func (opt *Options) init() { diff --git a/pubsub.go b/pubsub.go index 243c3979..218a06d2 100644 --- a/pubsub.go +++ b/pubsub.go @@ -10,6 +10,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/push" ) // PubSub implements Pub/Sub commands as described in @@ -40,7 +41,7 @@ type PubSub struct { allCh *channel // Push notification processor for handling generic push notifications - pushProcessor PushNotificationProcessorInterface + pushProcessor push.NotificationProcessor } func (c *PubSub) init() { @@ -551,14 +552,13 @@ func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, c return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) } -func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext { +func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext { // PubSub doesn't have a client or connection pool, so we pass nil for those // PubSub connections are blocking - return NewPushNotificationHandlerContext(nil, nil, c, cn, true) + return push.HandlerContext{} + return push.NewNotificationHandlerContext(nil, nil, c, cn, true) } - - type ChannelOption func(c *channel) // WithChannelSize specifies the Go chan size that is used to buffer incoming messages. diff --git a/push/errors.go b/push/errors.go new file mode 100644 index 00000000..8f6c2a16 --- /dev/null +++ b/push/errors.go @@ -0,0 +1,150 @@ +package push + +import ( + "errors" + "fmt" + "strings" +) + +// Push notification error definitions +// This file contains all error types and messages used by the push notification system + +// Common error variables for reuse +var ( + // ErrHandlerNil is returned when attempting to register a nil handler + ErrHandlerNil = errors.New("handler cannot be nil") +) + +// Registry errors + +// ErrHandlerExists creates an error for when attempting to overwrite an existing handler +func ErrHandlerExists(pushNotificationName string) error { + return fmt.Errorf("cannot overwrite existing handler for push notification: %s", pushNotificationName) +} + +// ErrProtectedHandler creates an error for when attempting to unregister a protected handler +func ErrProtectedHandler(pushNotificationName string) error { + return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) +} + +// VoidProcessor errors + +// ErrVoidProcessorRegister creates an error for when attempting to register a handler on void processor +func ErrVoidProcessorRegister(pushNotificationName string) error { + return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) +} + +// ErrVoidProcessorUnregister creates an error for when attempting to unregister a handler on void processor +func ErrVoidProcessorUnregister(pushNotificationName string) error { + return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) +} + +// Error message constants for consistency +const ( + // Error message templates + MsgHandlerNil = "handler cannot be nil" + MsgHandlerExists = "cannot overwrite existing handler for push notification: %s" + MsgProtectedHandler = "cannot unregister protected handler for push notification: %s" + MsgVoidProcessorRegister = "cannot register push notification handler '%s': push notifications are disabled (using void processor)" + MsgVoidProcessorUnregister = "cannot unregister push notification handler '%s': push notifications are disabled (using void processor)" +) + +// Error type definitions for advanced error handling + +// HandlerError represents errors related to handler operations +type HandlerError struct { + Operation string // "register", "unregister", "get" + PushNotificationName string + Reason string + Err error +} + +func (e *HandlerError) Error() string { + if e.Err != nil { + return fmt.Sprintf("handler %s failed for '%s': %s (%v)", e.Operation, e.PushNotificationName, e.Reason, e.Err) + } + return fmt.Sprintf("handler %s failed for '%s': %s", e.Operation, e.PushNotificationName, e.Reason) +} + +func (e *HandlerError) Unwrap() error { + return e.Err +} + +// NewHandlerError creates a new HandlerError +func NewHandlerError(operation, pushNotificationName, reason string, err error) *HandlerError { + return &HandlerError{ + Operation: operation, + PushNotificationName: pushNotificationName, + Reason: reason, + Err: err, + } +} + +// ProcessorError represents errors related to processor operations +type ProcessorError struct { + ProcessorType string // "processor", "void_processor" + Operation string // "process", "register", "unregister" + Reason string + Err error +} + +func (e *ProcessorError) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s %s failed: %s (%v)", e.ProcessorType, e.Operation, e.Reason, e.Err) + } + return fmt.Sprintf("%s %s failed: %s", e.ProcessorType, e.Operation, e.Reason) +} + +func (e *ProcessorError) Unwrap() error { + return e.Err +} + +// NewProcessorError creates a new ProcessorError +func NewProcessorError(processorType, operation, reason string, err error) *ProcessorError { + return &ProcessorError{ + ProcessorType: processorType, + Operation: operation, + Reason: reason, + Err: err, + } +} + +// Helper functions for common error scenarios + +// IsHandlerNilError checks if an error is due to a nil handler +func IsHandlerNilError(err error) bool { + return errors.Is(err, ErrHandlerNil) +} + +// IsHandlerExistsError checks if an error is due to attempting to overwrite an existing handler +func IsHandlerExistsError(err error) bool { + if err == nil { + return false + } + return fmt.Sprintf("%v", err) == fmt.Sprintf(MsgHandlerExists, extractNotificationName(err)) +} + +// IsProtectedHandlerError checks if an error is due to attempting to unregister a protected handler +func IsProtectedHandlerError(err error) bool { + if err == nil { + return false + } + return fmt.Sprintf("%v", err) == fmt.Sprintf(MsgProtectedHandler, extractNotificationName(err)) +} + +// IsVoidProcessorError checks if an error is due to void processor operations +func IsVoidProcessorError(err error) bool { + if err == nil { + return false + } + errStr := err.Error() + return strings.Contains(errStr, "push notifications are disabled (using void processor)") +} + +// extractNotificationName attempts to extract the notification name from error messages +// This is a helper function for error type checking +func extractNotificationName(err error) string { + // This is a simplified implementation - in practice, you might want more sophisticated parsing + // For now, we return a placeholder since the exact extraction logic depends on the error format + return "unknown" +} diff --git a/push/handler.go b/push/handler.go new file mode 100644 index 00000000..815edce3 --- /dev/null +++ b/push/handler.go @@ -0,0 +1,14 @@ +package push + +import ( + "context" +) + +// NotificationHandler defines the interface for push notification handlers. +type NotificationHandler interface { + // HandlePushNotification processes a push notification with context information. + // The handlerCtx provides information about the client, connection pool, and connection + // on which the notification was received, allowing handlers to make informed decisions. + // Returns an error if the notification could not be handled. + HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error +} diff --git a/push/handler_context.go b/push/handler_context.go new file mode 100644 index 00000000..ab6b7dd1 --- /dev/null +++ b/push/handler_context.go @@ -0,0 +1,89 @@ +package push + +import ( + "github.com/redis/go-redis/v9/internal/pool" +) + +// NotificationHandlerContext provides context information about where a push notification was received. +// This interface allows handlers to make informed decisions based on the source of the notification +// with strongly typed access to different client types using concrete types. +type NotificationHandlerContext interface { + // GetClient returns the Redis client instance that received the notification. + // Returns nil if no client context is available. + // It is interface to both allow for future expansion and to avoid + // circular dependencies. The developer is responsible for type assertion. + // It can be one of the following types: + // - *redis.Client + // - *redis.ClusterClient + // - *redis.Conn + GetClient() interface{} + + // GetConnPool returns the connection pool from which the connection was obtained. + // Returns nil if no connection pool context is available. + // It is interface to both allow for future expansion and to avoid + // circular dependencies. The developer is responsible for type assertion. + // It can be one of the following types: + // - *pool.ConnPool + // - *pool.SingleConnPool + // - *pool.StickyConnPool + GetConnPool() interface{} + + // GetPubSub returns the PubSub instance that received the notification. + // Returns nil if this is not a PubSub connection. + // It is interface to both allow for future expansion and to avoid + // circular dependencies. The developer is responsible for type assertion. + // It can be one of the following types: + // - *redis.PubSub + GetPubSub() interface{} + + // GetConn returns the specific connection on which the notification was received. + // Returns nil if no connection context is available. + GetConn() *pool.Conn + + // IsBlocking returns true if the notification was received on a blocking connection. + IsBlocking() bool +} + +// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface +type pushNotificationHandlerContext struct { + client interface{} + connPool interface{} + pubSub interface{} + conn *pool.Conn + isBlocking bool +} + +// NewNotificationHandlerContext creates a new push.NotificationHandlerContext instance +func NewNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) NotificationHandlerContext { + return &pushNotificationHandlerContext{ + client: client, + connPool: connPool, + pubSub: pubSub, + conn: conn, + isBlocking: isBlocking, + } +} + +// GetClient returns the Redis client instance that received the notification +func (h *pushNotificationHandlerContext) GetClient() interface{} { + return h.client +} + +// GetConnPool returns the connection pool from which the connection was obtained +func (h *pushNotificationHandlerContext) GetConnPool() interface{} { + return h.connPool +} + +func (h *pushNotificationHandlerContext) GetPubSub() interface{} { + return h.pubSub +} + +// GetConn returns the specific connection on which the notification was received +func (h *pushNotificationHandlerContext) GetConn() *pool.Conn { + return h.conn +} + +// IsBlocking returns true if the notification was received on a blocking connection +func (h *pushNotificationHandlerContext) IsBlocking() bool { + return h.isBlocking +} diff --git a/push_notification_processor.go b/push/processor.go similarity index 56% rename from push_notification_processor.go rename to push/processor.go index 38877206..3b65b126 100644 --- a/push_notification_processor.go +++ b/push/processor.go @@ -1,67 +1,22 @@ -package redis +package push import ( "context" - "fmt" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" ) -// Registry manages push notification handlers -type Registry struct { - handlers map[string]PushNotificationHandler - protected map[string]bool -} - -// NewRegistry creates a new push notification registry -func NewRegistry() *Registry { - return &Registry{ - handlers: make(map[string]PushNotificationHandler), - protected: make(map[string]bool), - } -} - -// RegisterHandler registers a handler for a specific push notification name -func (r *Registry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - if handler == nil { - return fmt.Errorf("handler cannot be nil") - } - - // Check if handler already exists and is protected - if existingProtected, exists := r.protected[pushNotificationName]; exists && existingProtected { - return fmt.Errorf("cannot overwrite protected handler for push notification: %s", pushNotificationName) - } - - r.handlers[pushNotificationName] = handler - r.protected[pushNotificationName] = protected - return nil -} - -// GetHandler returns the handler for a specific push notification name -func (r *Registry) GetHandler(pushNotificationName string) PushNotificationHandler { - return r.handlers[pushNotificationName] -} - -// UnregisterHandler removes a handler for a specific push notification name -func (r *Registry) UnregisterHandler(pushNotificationName string) error { - // Check if handler is protected - if protected, exists := r.protected[pushNotificationName]; exists && protected { - return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) - } - - delete(r.handlers, pushNotificationName) - delete(r.protected, pushNotificationName) - return nil -} - -// GetRegisteredPushNotificationNames returns all registered push notification names -func (r *Registry) GetRegisteredPushNotificationNames() []string { - names := make([]string, 0, len(r.handlers)) - for name := range r.handlers { - names = append(names, name) - } - return names +// NotificationProcessor defines the interface for push notification processors. +type NotificationProcessor interface { + // GetHandler returns the handler for a specific push notification name. + GetHandler(pushNotificationName string) NotificationHandler + // ProcessPendingNotifications checks for and processes any pending push notifications. + ProcessPendingNotifications(ctx context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error + // RegisterHandler registers a handler for a specific push notification name. + RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error + // UnregisterHandler removes a handler for a specific push notification name. + UnregisterHandler(pushNotificationName string) error } // Processor handles push notifications with a registry of handlers @@ -77,12 +32,12 @@ func NewProcessor() *Processor { } // GetHandler returns the handler for a specific push notification name -func (p *Processor) GetHandler(pushNotificationName string) PushNotificationHandler { +func (p *Processor) GetHandler(pushNotificationName string) NotificationHandler { return p.registry.GetHandler(pushNotificationName) } // RegisterHandler registers a handler for a specific push notification name -func (p *Processor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { +func (p *Processor) RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error { return p.registry.RegisterHandler(pushNotificationName, handler, protected) } @@ -92,7 +47,7 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error { } // ProcessPendingNotifications checks for and processes any pending push notifications -func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { +func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error { if rd == nil { return nil } @@ -135,7 +90,10 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx // Get the handler for this notification type if handler := p.registry.GetHandler(notificationType); handler != nil { // Handle the notification - handler.HandlePushNotification(ctx, handlerCtx, notification) + err := handler.HandlePushNotification(ctx, handlerCtx, notification) + if err != nil { + internal.Logger.Printf(ctx, "push: error handling push notification: %v", err) + } } } } @@ -144,6 +102,69 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx return nil } +// VoidProcessor discards all push notifications without processing them +type VoidProcessor struct{} + +// NewVoidProcessor creates a new void push notification processor +func NewVoidProcessor() *VoidProcessor { + return &VoidProcessor{} +} + +// GetHandler returns nil for void processor since it doesn't maintain handlers +func (v *VoidProcessor) GetHandler(_ string) NotificationHandler { + return nil +} + +// RegisterHandler returns an error for void processor since it doesn't maintain handlers +func (v *VoidProcessor) RegisterHandler(pushNotificationName string, _ NotificationHandler, _ bool) error { + return ErrVoidProcessorRegister(pushNotificationName) +} + +// UnregisterHandler returns an error for void processor since it doesn't maintain handlers +func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { + return ErrVoidProcessorUnregister(pushNotificationName) +} + +// ProcessPendingNotifications for VoidProcessor does nothing since push notifications +// are only available in RESP3 and this processor is used for RESP2 connections. +// This avoids unnecessary buffer scanning overhead. +func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ NotificationHandlerContext, rd *proto.Reader) error { + // read and discard all push notifications + if rd != nil { + for { + replyType, err := rd.PeekReplyType() + if err != nil { + // No more data available or error reading + break + } + + // Only process push notifications (arrays starting with >) + if replyType != proto.RespPush { + break + } + // see if we should skip this notification + notificationName, err := rd.PeekPushNotificationName() + if err != nil { + break + } + if shouldSkipNotification(notificationName) { + // discard the notification + if err := rd.DiscardNext(); err != nil { + break + } + continue + } + + // Read the push notification + _, err = rd.ReadReply() + if err != nil { + return nil + } + } + } + return nil +} + // shouldSkipNotification checks if a notification type should be ignored by the push notification // processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). func shouldSkipNotification(notificationType string) bool { @@ -163,36 +184,3 @@ func shouldSkipNotification(notificationType string) bool { return false } } - -// VoidProcessor discards all push notifications without processing them -type VoidProcessor struct{} - -// NewVoidProcessor creates a new void push notification processor -func NewVoidProcessor() *VoidProcessor { - return &VoidProcessor{} -} - -// GetHandler returns nil for void processor since it doesn't maintain handlers -func (v *VoidProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return nil -} - -// RegisterHandler returns an error for void processor since it doesn't maintain handlers -func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) -} - -// UnregisterHandler returns an error for void processor since it doesn't maintain handlers -func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { - return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) -} - -// ProcessPendingNotifications for VoidProcessor does nothing since push notifications -// are only available in RESP3 and this processor is used for RESP2 connections. -// This avoids unnecessary buffer scanning overhead. -func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - // VoidProcessor is used for RESP2 connections where push notifications are not available. - // Since push notifications only exist in RESP3, we can safely skip all processing - // to avoid unnecessary buffer scanning overhead. - return nil -} diff --git a/push/push.go b/push/push.go new file mode 100644 index 00000000..e6adeaa4 --- /dev/null +++ b/push/push.go @@ -0,0 +1,7 @@ +// Package push provides push notifications for Redis. +// This is an EXPERIMENTAL API for handling push notifications from Redis. +// It is not yet stable and may change in the future. +// Although this is in a public package, in its current form public use is not advised. +// Pending push notifications should be processed before executing any readReply from the connection +// as per RESP3 specification push notifications can be sent at any time. +package push diff --git a/push/push_test.go b/push/push_test.go new file mode 100644 index 00000000..0fe7e0f4 --- /dev/null +++ b/push/push_test.go @@ -0,0 +1,1554 @@ +package push + +import ( + "bytes" + "context" + "errors" + "fmt" + "strings" + "testing" + + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/internal/proto" +) + +// TestHandler implements NotificationHandler interface for testing +type TestHandler struct { + name string + handled [][]interface{} + returnError error +} + +func NewTestHandler(name string) *TestHandler { + return &TestHandler{ + name: name, + handled: make([][]interface{}, 0), + } +} + +func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error { + h.handled = append(h.handled, notification) + return h.returnError +} + +func (h *TestHandler) GetHandledNotifications() [][]interface{} { + return h.handled +} + +func (h *TestHandler) SetReturnError(err error) { + h.returnError = err +} + +func (h *TestHandler) Reset() { + h.handled = make([][]interface{}, 0) + h.returnError = nil +} + +// Mock client types for testing +type MockClient struct { + name string +} + +type MockConnPool struct { + name string +} + +type MockPubSub struct { + name string +} + +// TestNotificationHandlerContext tests the handler context implementation +func TestNotificationHandlerContext(t *testing.T) { + t.Run("NewNotificationHandlerContext", func(t *testing.T) { + client := &MockClient{name: "test-client"} + connPool := &MockConnPool{name: "test-pool"} + pubSub := &MockPubSub{name: "test-pubsub"} + conn := &pool.Conn{} + + ctx := NewNotificationHandlerContext(client, connPool, pubSub, conn, true) + if ctx == nil { + t.Error("NewNotificationHandlerContext should not return nil") + } + + if ctx.GetClient() != client { + t.Error("GetClient should return the provided client") + } + + if ctx.GetConnPool() != connPool { + t.Error("GetConnPool should return the provided connection pool") + } + + if ctx.GetPubSub() != pubSub { + t.Error("GetPubSub should return the provided PubSub") + } + + if ctx.GetConn() != conn { + t.Error("GetConn should return the provided connection") + } + + if !ctx.IsBlocking() { + t.Error("IsBlocking should return true") + } + }) + + t.Run("NilValues", func(t *testing.T) { + ctx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + if ctx.GetClient() != nil { + t.Error("GetClient should return nil when client is nil") + } + + if ctx.GetConnPool() != nil { + t.Error("GetConnPool should return nil when connPool is nil") + } + + if ctx.GetPubSub() != nil { + t.Error("GetPubSub should return nil when pubSub is nil") + } + + if ctx.GetConn() != nil { + t.Error("GetConn should return nil when conn is nil") + } + + if ctx.IsBlocking() { + t.Error("IsBlocking should return false") + } + }) +} + +// TestRegistry tests the registry implementation +func TestRegistry(t *testing.T) { + t.Run("NewRegistry", func(t *testing.T) { + registry := NewRegistry() + if registry == nil { + t.Error("NewRegistry should not return nil") + } + + if registry.handlers == nil { + t.Error("Registry handlers map should be initialized") + } + + if registry.protected == nil { + t.Error("Registry protected map should be initialized") + } + }) + + t.Run("RegisterHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test") + + err := registry.RegisterHandler("TEST", handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("GetHandler should return the registered handler") + } + }) + + t.Run("RegisterNilHandler", func(t *testing.T) { + registry := NewRegistry() + + err := registry.RegisterHandler("TEST", nil, false) + if err == nil { + t.Error("RegisterHandler should error when handler is nil") + } + + if !strings.Contains(err.Error(), "handler cannot be nil") { + t.Errorf("Error message should mention nil handler, got: %v", err) + } + }) + + t.Run("RegisterProtectedHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test") + + // Register protected handler + err := registry.RegisterHandler("TEST", handler, true) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + // Try to overwrite any existing handler (protected or not) + newHandler := NewTestHandler("new") + err = registry.RegisterHandler("TEST", newHandler, false) + if err == nil { + t.Error("RegisterHandler should error when trying to overwrite existing handler") + } + + if !strings.Contains(err.Error(), "cannot overwrite existing handler") { + t.Errorf("Error message should mention existing handler, got: %v", err) + } + + // Original handler should still be there + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("Existing handler should not be overwritten") + } + }) + + t.Run("CannotOverwriteExistingHandler", func(t *testing.T) { + registry := NewRegistry() + handler1 := NewTestHandler("test1") + handler2 := NewTestHandler("test2") + + // Register non-protected handler + err := registry.RegisterHandler("TEST", handler1, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + // Try to overwrite with another handler (should fail) + err = registry.RegisterHandler("TEST", handler2, false) + if err == nil { + t.Error("RegisterHandler should error when trying to overwrite existing handler") + } + + if !strings.Contains(err.Error(), "cannot overwrite existing handler") { + t.Errorf("Error message should mention existing handler, got: %v", err) + } + + // Original handler should still be there + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler1 { + t.Error("Existing handler should not be overwritten") + } + }) + + t.Run("GetNonExistentHandler", func(t *testing.T) { + registry := NewRegistry() + + handler := registry.GetHandler("NONEXISTENT") + if handler != nil { + t.Error("GetHandler should return nil for non-existent handler") + } + }) + + t.Run("UnregisterHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test") + + registry.RegisterHandler("TEST", handler, false) + + err := registry.UnregisterHandler("TEST") + if err != nil { + t.Errorf("UnregisterHandler should not error: %v", err) + } + + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != nil { + t.Error("GetHandler should return nil after unregistering") + } + }) + + t.Run("UnregisterProtectedHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test") + + // Register protected handler + registry.RegisterHandler("TEST", handler, true) + + // Try to unregister protected handler + err := registry.UnregisterHandler("TEST") + if err == nil { + t.Error("UnregisterHandler should error for protected handler") + } + + if !strings.Contains(err.Error(), "cannot unregister protected handler") { + t.Errorf("Error message should mention protected handler, got: %v", err) + } + + // Handler should still be there + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("Protected handler should still be registered") + } + }) + + t.Run("UnregisterNonExistentHandler", func(t *testing.T) { + registry := NewRegistry() + + err := registry.UnregisterHandler("NONEXISTENT") + if err != nil { + t.Errorf("UnregisterHandler should not error for non-existent handler: %v", err) + } + }) + + t.Run("CannotOverwriteExistingHandler", func(t *testing.T) { + registry := NewRegistry() + handler1 := NewTestHandler("handler1") + handler2 := NewTestHandler("handler2") + + // Register first handler (non-protected) + err := registry.RegisterHandler("TEST_NOTIFICATION", handler1, false) + if err != nil { + t.Errorf("First RegisterHandler should not error: %v", err) + } + + // Verify first handler is registered + retrievedHandler := registry.GetHandler("TEST_NOTIFICATION") + if retrievedHandler != handler1 { + t.Error("First handler should be registered correctly") + } + + // Attempt to overwrite with second handler (should fail) + err = registry.RegisterHandler("TEST_NOTIFICATION", handler2, false) + if err == nil { + t.Error("RegisterHandler should error when trying to overwrite existing handler") + } + + // Verify error message mentions overwriting + if !strings.Contains(err.Error(), "cannot overwrite existing handler") { + t.Errorf("Error message should mention overwriting existing handler, got: %v", err) + } + + // Verify error message includes the notification name + if !strings.Contains(err.Error(), "TEST_NOTIFICATION") { + t.Errorf("Error message should include notification name, got: %v", err) + } + + // Verify original handler is still there (not overwritten) + retrievedHandler = registry.GetHandler("TEST_NOTIFICATION") + if retrievedHandler != handler1 { + t.Error("Original handler should still be registered (not overwritten)") + } + + // Verify second handler was NOT registered + if retrievedHandler == handler2 { + t.Error("Second handler should NOT be registered") + } + }) + + t.Run("CannotOverwriteProtectedHandler", func(t *testing.T) { + registry := NewRegistry() + protectedHandler := NewTestHandler("protected") + newHandler := NewTestHandler("new") + + // Register protected handler + err := registry.RegisterHandler("PROTECTED_NOTIFICATION", protectedHandler, true) + if err != nil { + t.Errorf("RegisterHandler should not error for protected handler: %v", err) + } + + // Attempt to overwrite protected handler (should fail) + err = registry.RegisterHandler("PROTECTED_NOTIFICATION", newHandler, false) + if err == nil { + t.Error("RegisterHandler should error when trying to overwrite protected handler") + } + + // Verify error message + if !strings.Contains(err.Error(), "cannot overwrite existing handler") { + t.Errorf("Error message should mention overwriting existing handler, got: %v", err) + } + + // Verify protected handler is still there + retrievedHandler := registry.GetHandler("PROTECTED_NOTIFICATION") + if retrievedHandler != protectedHandler { + t.Error("Protected handler should still be registered") + } + }) + + t.Run("CanRegisterDifferentHandlers", func(t *testing.T) { + registry := NewRegistry() + handler1 := NewTestHandler("handler1") + handler2 := NewTestHandler("handler2") + + // Register handlers for different notification names (should succeed) + err := registry.RegisterHandler("NOTIFICATION_1", handler1, false) + if err != nil { + t.Errorf("RegisterHandler should not error for first notification: %v", err) + } + + err = registry.RegisterHandler("NOTIFICATION_2", handler2, true) + if err != nil { + t.Errorf("RegisterHandler should not error for second notification: %v", err) + } + + // Verify both handlers are registered correctly + retrievedHandler1 := registry.GetHandler("NOTIFICATION_1") + if retrievedHandler1 != handler1 { + t.Error("First handler should be registered correctly") + } + + retrievedHandler2 := registry.GetHandler("NOTIFICATION_2") + if retrievedHandler2 != handler2 { + t.Error("Second handler should be registered correctly") + } + }) +} + +// TestProcessor tests the processor implementation +func TestProcessor(t *testing.T) { + t.Run("NewProcessor", func(t *testing.T) { + processor := NewProcessor() + if processor == nil { + t.Error("NewProcessor should not return nil") + } + + if processor.registry == nil { + t.Error("Processor should have a registry") + } + }) + + t.Run("RegisterAndGetHandler", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + + err := processor.RegisterHandler("TEST", handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + retrievedHandler := processor.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("GetHandler should return the registered handler") + } + }) + + t.Run("UnregisterHandler", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + + processor.RegisterHandler("TEST", handler, false) + + err := processor.UnregisterHandler("TEST") + if err != nil { + t.Errorf("UnregisterHandler should not error: %v", err) + } + + retrievedHandler := processor.GetHandler("TEST") + if retrievedHandler != nil { + t.Error("GetHandler should return nil after unregistering") + } + }) + + t.Run("ProcessPendingNotifications_NilReader", func(t *testing.T) { + processor := NewProcessor() + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error with nil reader: %v", err) + } + }) +} + +// TestVoidProcessor tests the void processor implementation +func TestVoidProcessor(t *testing.T) { + t.Run("NewVoidProcessor", func(t *testing.T) { + processor := NewVoidProcessor() + if processor == nil { + t.Error("NewVoidProcessor should not return nil") + } + }) + + t.Run("GetHandler", func(t *testing.T) { + processor := NewVoidProcessor() + handler := processor.GetHandler("TEST") + if handler != nil { + t.Error("VoidProcessor GetHandler should always return nil") + } + }) + + t.Run("RegisterHandler", func(t *testing.T) { + processor := NewVoidProcessor() + handler := NewTestHandler("test") + + err := processor.RegisterHandler("TEST", handler, false) + if err == nil { + t.Error("VoidProcessor RegisterHandler should return error") + } + + if !strings.Contains(err.Error(), "cannot register push notification handler") { + t.Errorf("Error message should mention registration failure, got: %v", err) + } + + if !strings.Contains(err.Error(), "push notifications are disabled") { + t.Errorf("Error message should mention disabled notifications, got: %v", err) + } + }) + + t.Run("UnregisterHandler", func(t *testing.T) { + processor := NewVoidProcessor() + + err := processor.UnregisterHandler("TEST") + if err == nil { + t.Error("VoidProcessor UnregisterHandler should return error") + } + + if !strings.Contains(err.Error(), "cannot unregister push notification handler") { + t.Errorf("Error message should mention unregistration failure, got: %v", err) + } + }) + + t.Run("ProcessPendingNotifications_NilReader", func(t *testing.T) { + processor := NewVoidProcessor() + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) + if err != nil { + t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) + } + }) +} + +// TestShouldSkipNotification tests the notification filtering logic +func TestShouldSkipNotification(t *testing.T) { + testCases := []struct { + name string + notification string + shouldSkip bool + }{ + // Pub/Sub notifications that should be skipped + {"message", "message", true}, + {"pmessage", "pmessage", true}, + {"subscribe", "subscribe", true}, + {"unsubscribe", "unsubscribe", true}, + {"psubscribe", "psubscribe", true}, + {"punsubscribe", "punsubscribe", true}, + {"smessage", "smessage", true}, + {"ssubscribe", "ssubscribe", true}, + {"sunsubscribe", "sunsubscribe", true}, + + // Push notifications that should NOT be skipped + {"MOVING", "MOVING", false}, + {"MIGRATING", "MIGRATING", false}, + {"MIGRATED", "MIGRATED", false}, + {"FAILING_OVER", "FAILING_OVER", false}, + {"FAILED_OVER", "FAILED_OVER", false}, + {"custom", "custom", false}, + {"unknown", "unknown", false}, + {"empty", "", false}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + result := shouldSkipNotification(tc.notification) + if result != tc.shouldSkip { + t.Errorf("shouldSkipNotification(%q) = %v, want %v", tc.notification, result, tc.shouldSkip) + } + }) + } +} + +// TestNotificationHandlerInterface tests that our test handler implements the interface correctly +func TestNotificationHandlerInterface(t *testing.T) { + var _ NotificationHandler = (*TestHandler)(nil) + + handler := NewTestHandler("test") + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + notification := []interface{}{"TEST", "data"} + + err := handler.HandlePushNotification(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("HandlePushNotification should not error: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 1 { + t.Errorf("Expected 1 handled notification, got %d", len(handled)) + } + + if len(handled[0]) != 2 || handled[0][0] != "TEST" || handled[0][1] != "data" { + t.Errorf("Handled notification should match input: %v", handled[0]) + } +} + +// TestNotificationHandlerError tests error handling in handlers +func TestNotificationHandlerError(t *testing.T) { + handler := NewTestHandler("test") + expectedError := errors.New("test error") + handler.SetReturnError(expectedError) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + notification := []interface{}{"TEST", "data"} + + err := handler.HandlePushNotification(ctx, handlerCtx, notification) + if err != expectedError { + t.Errorf("HandlePushNotification should return the set error: got %v, want %v", err, expectedError) + } + + // Reset and test no error + handler.Reset() + err = handler.HandlePushNotification(ctx, handlerCtx, notification) + if err != nil { + t.Errorf("HandlePushNotification should not error after reset: %v", err) + } +} + +// TestRegistryConcurrency tests concurrent access to registry +func TestRegistryConcurrency(t *testing.T) { + registry := NewRegistry() + + // Test concurrent registration and access + done := make(chan bool, 10) + + // Start multiple goroutines registering handlers + for i := 0; i < 5; i++ { + go func(id int) { + handler := NewTestHandler("test") + err := registry.RegisterHandler(fmt.Sprintf("TEST_%d", id), handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + done <- true + }(i) + } + + // Start multiple goroutines reading handlers + for i := 0; i < 5; i++ { + go func(id int) { + registry.GetHandler(fmt.Sprintf("TEST_%d", id)) + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < 10; i++ { + <-done + } +} + +// TestProcessorConcurrency tests concurrent access to processor +func TestProcessorConcurrency(t *testing.T) { + processor := NewProcessor() + + // Test concurrent registration and access + done := make(chan bool, 10) + + // Start multiple goroutines registering handlers + for i := 0; i < 5; i++ { + go func(id int) { + handler := NewTestHandler("test") + err := processor.RegisterHandler(fmt.Sprintf("TEST_%d", id), handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + done <- true + }(i) + } + + // Start multiple goroutines reading handlers + for i := 0; i < 5; i++ { + go func(id int) { + processor.GetHandler(fmt.Sprintf("TEST_%d", id)) + done <- true + }(i) + } + + // Wait for all goroutines to complete + for i := 0; i < 10; i++ { + <-done + } +} + +// TestRegistryEdgeCases tests edge cases for registry +func TestRegistryEdgeCases(t *testing.T) { + t.Run("RegisterHandlerWithEmptyName", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test") + + err := registry.RegisterHandler("", handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error with empty name: %v", err) + } + + retrievedHandler := registry.GetHandler("") + if retrievedHandler != handler { + t.Error("GetHandler should return handler even with empty name") + } + }) + + t.Run("MultipleProtectedHandlers", func(t *testing.T) { + registry := NewRegistry() + handler1 := NewTestHandler("test1") + handler2 := NewTestHandler("test2") + + // Register multiple protected handlers + err := registry.RegisterHandler("TEST1", handler1, true) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + err = registry.RegisterHandler("TEST2", handler2, true) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + // Try to unregister both + err = registry.UnregisterHandler("TEST1") + if err == nil { + t.Error("UnregisterHandler should error for protected handler") + } + + err = registry.UnregisterHandler("TEST2") + if err == nil { + t.Error("UnregisterHandler should error for protected handler") + } + }) + + t.Run("CannotOverwriteAnyExistingHandler", func(t *testing.T) { + registry := NewRegistry() + handler1 := NewTestHandler("test1") + handler2 := NewTestHandler("test2") + + // Register protected handler + err := registry.RegisterHandler("TEST", handler1, true) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + // Try to overwrite with another protected handler (should fail) + err = registry.RegisterHandler("TEST", handler2, true) + if err == nil { + t.Error("RegisterHandler should error when trying to overwrite existing handler") + } + + if !strings.Contains(err.Error(), "cannot overwrite existing handler") { + t.Errorf("Error message should mention existing handler, got: %v", err) + } + + // Original handler should still be there + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler1 { + t.Error("Existing handler should not be overwritten") + } + }) +} + +// TestProcessorEdgeCases tests edge cases for processor +func TestProcessorEdgeCases(t *testing.T) { + t.Run("ProcessorWithNilRegistry", func(t *testing.T) { + // This tests internal consistency - processor should always have a registry + processor := &Processor{registry: nil} + + // This should panic or handle gracefully + defer func() { + if r := recover(); r != nil { + // Expected behavior - accessing nil registry should panic + t.Logf("Expected panic when accessing nil registry: %v", r) + } + }() + + // This will likely panic, which is expected behavior + processor.GetHandler("TEST") + }) + + t.Run("ProcessorRegisterNilHandler", func(t *testing.T) { + processor := NewProcessor() + + err := processor.RegisterHandler("TEST", nil, false) + if err == nil { + t.Error("RegisterHandler should error when handler is nil") + } + }) +} + +// TestVoidProcessorEdgeCases tests edge cases for void processor +func TestVoidProcessorEdgeCases(t *testing.T) { + t.Run("VoidProcessorMultipleOperations", func(t *testing.T) { + processor := NewVoidProcessor() + handler := NewTestHandler("test") + + // Multiple register attempts should all fail + for i := 0; i < 5; i++ { + err := processor.RegisterHandler(fmt.Sprintf("TEST_%d", i), handler, false) + if err == nil { + t.Errorf("VoidProcessor RegisterHandler should always return error") + } + } + + // Multiple unregister attempts should all fail + for i := 0; i < 5; i++ { + err := processor.UnregisterHandler(fmt.Sprintf("TEST_%d", i)) + if err == nil { + t.Errorf("VoidProcessor UnregisterHandler should always return error") + } + } + + // Multiple get attempts should all return nil + for i := 0; i < 5; i++ { + handler := processor.GetHandler(fmt.Sprintf("TEST_%d", i)) + if handler != nil { + t.Errorf("VoidProcessor GetHandler should always return nil") + } + } + }) +} + +// Helper functions to create fake RESP3 protocol data for testing + +// createFakeRESP3PushNotification creates a fake RESP3 push notification buffer +func createFakeRESP3PushNotification(notificationType string, args ...string) *bytes.Buffer { + buf := &bytes.Buffer{} + + // RESP3 Push notification format: >\r\n\r\n + totalElements := 1 + len(args) // notification type + arguments + buf.WriteString(fmt.Sprintf(">%d\r\n", totalElements)) + + // Write notification type as bulk string + buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(notificationType), notificationType)) + + // Write arguments as bulk strings + for _, arg := range args { + buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(arg), arg)) + } + + return buf +} + +// createFakeRESP3Array creates a fake RESP3 array (not push notification) +func createFakeRESP3Array(elements ...string) *bytes.Buffer { + buf := &bytes.Buffer{} + + // RESP3 Array format: *\r\n\r\n + buf.WriteString(fmt.Sprintf("*%d\r\n", len(elements))) + + // Write elements as bulk strings + for _, element := range elements { + buf.WriteString(fmt.Sprintf("$%d\r\n%s\r\n", len(element), element)) + } + + return buf +} + +// createFakeRESP3Error creates a fake RESP3 error +func createFakeRESP3Error(message string) *bytes.Buffer { + buf := &bytes.Buffer{} + buf.WriteString(fmt.Sprintf("-%s\r\n", message)) + return buf +} + +// createMultipleNotifications creates a buffer with multiple notifications +func createMultipleNotifications(notifications ...[]string) *bytes.Buffer { + buf := &bytes.Buffer{} + + for _, notification := range notifications { + if len(notification) == 0 { + continue + } + + notificationType := notification[0] + args := notification[1:] + + // Determine if this should be a push notification or regular array + if shouldSkipNotification(notificationType) { + // Create as push notification (will be skipped) + pushBuf := createFakeRESP3PushNotification(notificationType, args...) + buf.Write(pushBuf.Bytes()) + } else { + // Create as push notification (will be processed) + pushBuf := createFakeRESP3PushNotification(notificationType, args...) + buf.Write(pushBuf.Bytes()) + } + } + + return buf +} + +// TestProcessorWithFakeBuffer tests ProcessPendingNotifications with fake RESP3 data +func TestProcessorWithFakeBuffer(t *testing.T) { + t.Run("ProcessValidPushNotification", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create fake RESP3 push notification + buf := createFakeRESP3PushNotification("MOVING", "slot", "123", "from", "node1", "to", "node2") + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 1 { + t.Errorf("Expected 1 handled notification, got %d", len(handled)) + } + + if len(handled[0]) != 7 || handled[0][0] != "MOVING" { + t.Errorf("Handled notification should match input: %v", handled[0]) + } + + if handled[0][1] != "slot" || handled[0][2] != "123" { + t.Errorf("Notification arguments should match: %v", handled[0]) + } + }) + + t.Run("ProcessSkippedPushNotification", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("message", handler, false) + + // Create fake RESP3 push notification for pub/sub message (should be skipped) + buf := createFakeRESP3PushNotification("message", "channel", "hello world") + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 0 { + t.Errorf("Expected 0 handled notifications (should be skipped), got %d", len(handled)) + } + }) + + t.Run("ProcessNotificationWithoutHandler", func(t *testing.T) { + processor := NewProcessor() + // No handler registered for MOVING + + // Create fake RESP3 push notification + buf := createFakeRESP3PushNotification("MOVING", "slot", "123") + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error when no handler: %v", err) + } + }) + + t.Run("ProcessNotificationWithHandlerError", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + handler.SetReturnError(errors.New("handler error")) + processor.RegisterHandler("MOVING", handler, false) + + // Create fake RESP3 push notification + buf := createFakeRESP3PushNotification("MOVING", "slot", "123") + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error even when handler errors: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 1 { + t.Errorf("Expected 1 handled notification even with error, got %d", len(handled)) + } + }) + + t.Run("ProcessNonPushNotification", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create fake RESP3 array (not push notification) + buf := createFakeRESP3Array("MOVING", "slot", "123") + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 0 { + t.Errorf("Expected 0 handled notifications (not push type), got %d", len(handled)) + } + }) + + t.Run("ProcessMultipleNotifications", func(t *testing.T) { + processor := NewProcessor() + movingHandler := NewTestHandler("moving") + migratingHandler := NewTestHandler("migrating") + processor.RegisterHandler("MOVING", movingHandler, false) + processor.RegisterHandler("MIGRATING", migratingHandler, false) + + // Create buffer with multiple notifications + buf := createMultipleNotifications( + []string{"MOVING", "slot", "123", "from", "node1", "to", "node2"}, + []string{"message", "channel", "data"}, // Should be skipped + []string{"MIGRATING", "slot", "456", "from", "node2", "to", "node3"}, + ) + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error: %v", err) + } + + // Check MOVING handler + movingHandled := movingHandler.GetHandledNotifications() + if len(movingHandled) != 1 { + t.Errorf("Expected 1 MOVING notification, got %d", len(movingHandled)) + } + if len(movingHandled) > 0 && movingHandled[0][0] != "MOVING" { + t.Errorf("Expected MOVING notification, got %v", movingHandled[0][0]) + } + + // Check MIGRATING handler + migratingHandled := migratingHandler.GetHandledNotifications() + if len(migratingHandled) != 1 { + t.Errorf("Expected 1 MIGRATING notification, got %d", len(migratingHandled)) + } + if len(migratingHandled) > 0 && migratingHandled[0][0] != "MIGRATING" { + t.Errorf("Expected MIGRATING notification, got %v", migratingHandled[0][0]) + } + }) + + t.Run("ProcessEmptyNotification", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create fake RESP3 push notification with no elements + buf := &bytes.Buffer{} + buf.WriteString(">0\r\n") // Empty push notification + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should handle empty notification gracefully: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 0 { + t.Errorf("Expected 0 handled notifications for empty notification, got %d", len(handled)) + } + }) + + t.Run("ProcessNotificationWithNonStringType", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create fake RESP3 push notification with integer as first element + buf := &bytes.Buffer{} + buf.WriteString(">2\r\n") // 2 elements + buf.WriteString(":123\r\n") // Integer instead of string + buf.WriteString("$4\r\ndata\r\n") // String data + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should handle non-string type gracefully: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 0 { + t.Errorf("Expected 0 handled notifications for non-string type, got %d", len(handled)) + } + }) +} + +// TestVoidProcessorWithFakeBuffer tests VoidProcessor with fake RESP3 data +func TestVoidProcessorWithFakeBuffer(t *testing.T) { + t.Run("ProcessPushNotifications", func(t *testing.T) { + processor := NewVoidProcessor() + + // Create buffer with multiple push notifications + buf := createMultipleNotifications( + []string{"MOVING", "slot", "123"}, + []string{"MIGRATING", "slot", "456"}, + []string{"FAILED_OVER", "node", "node1"}, + ) + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("VoidProcessor ProcessPendingNotifications should not error: %v", err) + } + + // VoidProcessor should discard all notifications without processing + // We can't directly verify this, but the fact that it doesn't error is good + }) + + t.Run("ProcessSkippedNotifications", func(t *testing.T) { + processor := NewVoidProcessor() + + // Create buffer with pub/sub notifications (should be skipped) + buf := createMultipleNotifications( + []string{"message", "channel", "data"}, + []string{"pmessage", "pattern", "channel", "data"}, + []string{"subscribe", "channel", "1"}, + ) + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("VoidProcessor ProcessPendingNotifications should not error: %v", err) + } + }) + + t.Run("ProcessMixedNotifications", func(t *testing.T) { + processor := NewVoidProcessor() + + // Create buffer with mixed push notifications and regular arrays + buf := &bytes.Buffer{} + + // Add push notification + pushBuf := createFakeRESP3PushNotification("MOVING", "slot", "123") + buf.Write(pushBuf.Bytes()) + + // Add regular array (should stop processing) + arrayBuf := createFakeRESP3Array("SOME", "COMMAND") + buf.Write(arrayBuf.Bytes()) + + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("VoidProcessor ProcessPendingNotifications should not error: %v", err) + } + }) + + t.Run("ProcessInvalidNotificationFormat", func(t *testing.T) { + processor := NewVoidProcessor() + + // Create invalid RESP3 data + buf := &bytes.Buffer{} + buf.WriteString(">1\r\n") // Push notification with 1 element + buf.WriteString("invalid\r\n") // Invalid format (should be $\r\n\r\n) + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + // VoidProcessor should handle errors gracefully + if err != nil { + t.Logf("VoidProcessor handled error gracefully: %v", err) + } + }) +} + +// TestProcessorErrorHandling tests error handling scenarios +func TestProcessorErrorHandling(t *testing.T) { + t.Run("ProcessWithEmptyBuffer", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create empty buffer + buf := &bytes.Buffer{} + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should handle empty buffer gracefully: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 0 { + t.Errorf("Expected 0 handled notifications for empty buffer, got %d", len(handled)) + } + }) + + t.Run("ProcessWithCorruptedData", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create buffer with corrupted RESP3 data + buf := &bytes.Buffer{} + buf.WriteString(">2\r\n") // Says 2 elements + buf.WriteString("$6\r\nMOVING\r\n") // First element OK + buf.WriteString("corrupted") // Second element corrupted (no proper format) + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + // Should handle corruption gracefully + if err != nil { + t.Logf("Processor handled corrupted data gracefully: %v", err) + } + }) + + t.Run("ProcessWithPartialData", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + // Create buffer with partial RESP3 data + buf := &bytes.Buffer{} + buf.WriteString(">2\r\n") // Says 2 elements + buf.WriteString("$6\r\nMOVING\r\n") // First element OK + // Missing second element + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + // Should handle partial data gracefully + if err != nil { + t.Logf("Processor handled partial data gracefully: %v", err) + } + }) +} + +// TestProcessorPerformanceWithFakeData tests performance with realistic data +func TestProcessorPerformanceWithFakeData(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + processor.RegisterHandler("MIGRATING", handler, false) + processor.RegisterHandler("MIGRATED", handler, false) + + // Create buffer with many notifications + notifications := make([][]string, 100) + for i := 0; i < 100; i++ { + switch i % 3 { + case 0: + notifications[i] = []string{"MOVING", "slot", fmt.Sprintf("%d", i), "from", "node1", "to", "node2"} + case 1: + notifications[i] = []string{"MIGRATING", "slot", fmt.Sprintf("%d", i), "from", "node2", "to", "node3"} + case 2: + notifications[i] = []string{"MIGRATED", "slot", fmt.Sprintf("%d", i), "from", "node3", "to", "node1"} + } + } + + buf := createMultipleNotifications(notifications...) + reader := proto.NewReader(buf) + + ctx := context.Background() + handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false) + + err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader) + if err != nil { + t.Errorf("ProcessPendingNotifications should not error with many notifications: %v", err) + } + + handled := handler.GetHandledNotifications() + if len(handled) != 100 { + t.Errorf("Expected 100 handled notifications, got %d", len(handled)) + } +} + +// TestInterfaceCompliance tests that all types implement their interfaces correctly +func TestInterfaceCompliance(t *testing.T) { + // Test that Processor implements NotificationProcessor + var _ NotificationProcessor = (*Processor)(nil) + + // Test that VoidProcessor implements NotificationProcessor + var _ NotificationProcessor = (*VoidProcessor)(nil) + + // Test that pushNotificationHandlerContext implements NotificationHandlerContext + var _ NotificationHandlerContext = (*pushNotificationHandlerContext)(nil) + + // Test that TestHandler implements NotificationHandler + var _ NotificationHandler = (*TestHandler)(nil) + + // Test that error types implement error interface + var _ error = (*HandlerError)(nil) + var _ error = (*ProcessorError)(nil) +} + +// TestErrors tests the error definitions and helper functions +func TestErrors(t *testing.T) { + t.Run("ErrHandlerNil", func(t *testing.T) { + err := ErrHandlerNil + if err == nil { + t.Error("ErrHandlerNil should not be nil") + } + + if err.Error() != "handler cannot be nil" { + t.Errorf("ErrHandlerNil message should be 'handler cannot be nil', got: %s", err.Error()) + } + }) + + t.Run("ErrHandlerExists", func(t *testing.T) { + notificationName := "TEST_NOTIFICATION" + err := ErrHandlerExists(notificationName) + + if err == nil { + t.Error("ErrHandlerExists should not return nil") + } + + expectedMsg := "cannot overwrite existing handler for push notification: TEST_NOTIFICATION" + if err.Error() != expectedMsg { + t.Errorf("ErrHandlerExists message should be '%s', got: %s", expectedMsg, err.Error()) + } + }) + + t.Run("ErrProtectedHandler", func(t *testing.T) { + notificationName := "PROTECTED_NOTIFICATION" + err := ErrProtectedHandler(notificationName) + + if err == nil { + t.Error("ErrProtectedHandler should not return nil") + } + + expectedMsg := "cannot unregister protected handler for push notification: PROTECTED_NOTIFICATION" + if err.Error() != expectedMsg { + t.Errorf("ErrProtectedHandler message should be '%s', got: %s", expectedMsg, err.Error()) + } + }) + + t.Run("ErrVoidProcessorRegister", func(t *testing.T) { + notificationName := "VOID_TEST" + err := ErrVoidProcessorRegister(notificationName) + + if err == nil { + t.Error("ErrVoidProcessorRegister should not return nil") + } + + expectedMsg := "cannot register push notification handler 'VOID_TEST': push notifications are disabled (using void processor)" + if err.Error() != expectedMsg { + t.Errorf("ErrVoidProcessorRegister message should be '%s', got: %s", expectedMsg, err.Error()) + } + }) + + t.Run("ErrVoidProcessorUnregister", func(t *testing.T) { + notificationName := "VOID_TEST" + err := ErrVoidProcessorUnregister(notificationName) + + if err == nil { + t.Error("ErrVoidProcessorUnregister should not return nil") + } + + expectedMsg := "cannot unregister push notification handler 'VOID_TEST': push notifications are disabled (using void processor)" + if err.Error() != expectedMsg { + t.Errorf("ErrVoidProcessorUnregister message should be '%s', got: %s", expectedMsg, err.Error()) + } + }) +} + +// TestHandlerError tests the HandlerError structured error type +func TestHandlerError(t *testing.T) { + t.Run("HandlerErrorWithoutWrappedError", func(t *testing.T) { + err := NewHandlerError("register", "TEST_NOTIFICATION", "handler already exists", nil) + + if err == nil { + t.Error("NewHandlerError should not return nil") + } + + expectedMsg := "handler register failed for 'TEST_NOTIFICATION': handler already exists" + if err.Error() != expectedMsg { + t.Errorf("HandlerError message should be '%s', got: %s", expectedMsg, err.Error()) + } + + if err.Operation != "register" { + t.Errorf("HandlerError Operation should be 'register', got: %s", err.Operation) + } + + if err.PushNotificationName != "TEST_NOTIFICATION" { + t.Errorf("HandlerError PushNotificationName should be 'TEST_NOTIFICATION', got: %s", err.PushNotificationName) + } + + if err.Reason != "handler already exists" { + t.Errorf("HandlerError Reason should be 'handler already exists', got: %s", err.Reason) + } + + if err.Unwrap() != nil { + t.Error("HandlerError Unwrap should return nil when no wrapped error") + } + }) + + t.Run("HandlerErrorWithWrappedError", func(t *testing.T) { + wrappedErr := errors.New("underlying error") + err := NewHandlerError("unregister", "PROTECTED_NOTIFICATION", "protected handler", wrappedErr) + + expectedMsg := "handler unregister failed for 'PROTECTED_NOTIFICATION': protected handler (underlying error)" + if err.Error() != expectedMsg { + t.Errorf("HandlerError message should be '%s', got: %s", expectedMsg, err.Error()) + } + + if err.Unwrap() != wrappedErr { + t.Error("HandlerError Unwrap should return the wrapped error") + } + }) +} + +// TestProcessorError tests the ProcessorError structured error type +func TestProcessorError(t *testing.T) { + t.Run("ProcessorErrorWithoutWrappedError", func(t *testing.T) { + err := NewProcessorError("processor", "process", "invalid notification format", nil) + + if err == nil { + t.Error("NewProcessorError should not return nil") + } + + expectedMsg := "processor process failed: invalid notification format" + if err.Error() != expectedMsg { + t.Errorf("ProcessorError message should be '%s', got: %s", expectedMsg, err.Error()) + } + + if err.ProcessorType != "processor" { + t.Errorf("ProcessorError ProcessorType should be 'processor', got: %s", err.ProcessorType) + } + + if err.Operation != "process" { + t.Errorf("ProcessorError Operation should be 'process', got: %s", err.Operation) + } + + if err.Reason != "invalid notification format" { + t.Errorf("ProcessorError Reason should be 'invalid notification format', got: %s", err.Reason) + } + + if err.Unwrap() != nil { + t.Error("ProcessorError Unwrap should return nil when no wrapped error") + } + }) + + t.Run("ProcessorErrorWithWrappedError", func(t *testing.T) { + wrappedErr := errors.New("network error") + err := NewProcessorError("void_processor", "register", "disabled", wrappedErr) + + expectedMsg := "void_processor register failed: disabled (network error)" + if err.Error() != expectedMsg { + t.Errorf("ProcessorError message should be '%s', got: %s", expectedMsg, err.Error()) + } + + if err.Unwrap() != wrappedErr { + t.Error("ProcessorError Unwrap should return the wrapped error") + } + }) +} + +// TestErrorHelperFunctions tests the error checking helper functions +func TestErrorHelperFunctions(t *testing.T) { + t.Run("IsHandlerNilError", func(t *testing.T) { + // Test with ErrHandlerNil + if !IsHandlerNilError(ErrHandlerNil) { + t.Error("IsHandlerNilError should return true for ErrHandlerNil") + } + + // Test with other error + otherErr := ErrHandlerExists("TEST") + if IsHandlerNilError(otherErr) { + t.Error("IsHandlerNilError should return false for other errors") + } + + // Test with nil + if IsHandlerNilError(nil) { + t.Error("IsHandlerNilError should return false for nil") + } + }) + + t.Run("IsVoidProcessorError", func(t *testing.T) { + // Test with void processor register error + registerErr := ErrVoidProcessorRegister("TEST") + if !IsVoidProcessorError(registerErr) { + t.Error("IsVoidProcessorError should return true for void processor register error") + } + + // Test with void processor unregister error + unregisterErr := ErrVoidProcessorUnregister("TEST") + if !IsVoidProcessorError(unregisterErr) { + t.Error("IsVoidProcessorError should return true for void processor unregister error") + } + + // Test with other error + otherErr := ErrHandlerNil + if IsVoidProcessorError(otherErr) { + t.Error("IsVoidProcessorError should return false for other errors") + } + + // Test with nil + if IsVoidProcessorError(nil) { + t.Error("IsVoidProcessorError should return false for nil") + } + }) +} + +// TestErrorConstants tests the error message constants +func TestErrorConstants(t *testing.T) { + t.Run("ErrorMessageConstants", func(t *testing.T) { + if MsgHandlerNil != "handler cannot be nil" { + t.Errorf("MsgHandlerNil should be 'handler cannot be nil', got: %s", MsgHandlerNil) + } + + if MsgHandlerExists != "cannot overwrite existing handler for push notification: %s" { + t.Errorf("MsgHandlerExists should be 'cannot overwrite existing handler for push notification: %%s', got: %s", MsgHandlerExists) + } + + if MsgProtectedHandler != "cannot unregister protected handler for push notification: %s" { + t.Errorf("MsgProtectedHandler should be 'cannot unregister protected handler for push notification: %%s', got: %s", MsgProtectedHandler) + } + + if MsgVoidProcessorRegister != "cannot register push notification handler '%s': push notifications are disabled (using void processor)" { + t.Errorf("MsgVoidProcessorRegister constant mismatch, got: %s", MsgVoidProcessorRegister) + } + + if MsgVoidProcessorUnregister != "cannot unregister push notification handler '%s': push notifications are disabled (using void processor)" { + t.Errorf("MsgVoidProcessorUnregister constant mismatch, got: %s", MsgVoidProcessorUnregister) + } + }) +} + +// Benchmark tests for performance +func BenchmarkRegistry(b *testing.B) { + registry := NewRegistry() + handler := NewTestHandler("test") + + b.Run("RegisterHandler", func(b *testing.B) { + for i := 0; i < b.N; i++ { + registry.RegisterHandler("TEST", handler, false) + } + }) + + b.Run("GetHandler", func(b *testing.B) { + registry.RegisterHandler("TEST", handler, false) + b.ResetTimer() + for i := 0; i < b.N; i++ { + registry.GetHandler("TEST") + } + }) +} + +func BenchmarkProcessor(b *testing.B) { + processor := NewProcessor() + handler := NewTestHandler("test") + processor.RegisterHandler("MOVING", handler, false) + + b.Run("RegisterHandler", func(b *testing.B) { + for i := 0; i < b.N; i++ { + processor.RegisterHandler("TEST", handler, false) + } + }) + + b.Run("GetHandler", func(b *testing.B) { + for i := 0; i < b.N; i++ { + processor.GetHandler("MOVING") + } + }) +} diff --git a/push/registry.go b/push/registry.go new file mode 100644 index 00000000..a265ae92 --- /dev/null +++ b/push/registry.go @@ -0,0 +1,61 @@ +package push + +import ( + "sync" +) + +// Registry manages push notification handlers +type Registry struct { + mu sync.RWMutex + handlers map[string]NotificationHandler + protected map[string]bool +} + +// NewRegistry creates a new push notification registry +func NewRegistry() *Registry { + return &Registry{ + handlers: make(map[string]NotificationHandler), + protected: make(map[string]bool), + } +} + +// RegisterHandler registers a handler for a specific push notification name +func (r *Registry) RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error { + if handler == nil { + return ErrHandlerNil + } + + r.mu.Lock() + defer r.mu.Unlock() + + // Check if handler already exists + if _, exists := r.protected[pushNotificationName]; exists { + return ErrHandlerExists(pushNotificationName) + } + + r.handlers[pushNotificationName] = handler + r.protected[pushNotificationName] = protected + return nil +} + +// GetHandler returns the handler for a specific push notification name +func (r *Registry) GetHandler(pushNotificationName string) NotificationHandler { + r.mu.RLock() + defer r.mu.RUnlock() + return r.handlers[pushNotificationName] +} + +// UnregisterHandler removes a handler for a specific push notification name +func (r *Registry) UnregisterHandler(pushNotificationName string) error { + r.mu.Lock() + defer r.mu.Unlock() + + // Check if handler is protected + if protected, exists := r.protected[pushNotificationName]; exists && protected { + return ErrProtectedHandler(pushNotificationName) + } + + delete(r.handlers, pushNotificationName) + delete(r.protected, pushNotificationName) + return nil +} diff --git a/push_notification_handler_context.go b/push_notification_handler_context.go deleted file mode 100644 index 03f9affd..00000000 --- a/push_notification_handler_context.go +++ /dev/null @@ -1,125 +0,0 @@ -package redis - -import ( - "github.com/redis/go-redis/v9/internal/pool" -) - -// PushNotificationHandlerContext provides context information about where a push notification was received. -// This interface allows handlers to make informed decisions based on the source of the notification -// with strongly typed access to different client types using concrete types. -type PushNotificationHandlerContext interface { - // GetClient returns the Redis client instance that received the notification. - // Returns nil if no client context is available. - GetClient() interface{} - - // GetClusterClient returns the client as a ClusterClient if it is one. - // Returns nil if the client is not a ClusterClient or no client context is available. - GetClusterClient() *ClusterClient - - // GetSentinelClient returns the client as a SentinelClient if it is one. - // Returns nil if the client is not a SentinelClient or no client context is available. - GetSentinelClient() *SentinelClient - - // GetFailoverClient returns the client as a FailoverClient if it is one. - // Returns nil if the client is not a FailoverClient or no client context is available. - GetFailoverClient() *Client - - // GetRegularClient returns the client as a regular Client if it is one. - // Returns nil if the client is not a regular Client or no client context is available. - GetRegularClient() *Client - - // GetConnPool returns the connection pool from which the connection was obtained. - // Returns nil if no connection pool context is available. - GetConnPool() interface{} - - // GetPubSub returns the PubSub instance that received the notification. - // Returns nil if this is not a PubSub connection. - GetPubSub() *PubSub - - // GetConn returns the specific connection on which the notification was received. - // Returns nil if no connection context is available. - GetConn() *pool.Conn - - // IsBlocking returns true if the notification was received on a blocking connection. - IsBlocking() bool -} - -// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface -type pushNotificationHandlerContext struct { - client interface{} - connPool interface{} - pubSub interface{} - conn *pool.Conn - isBlocking bool -} - -// NewPushNotificationHandlerContext creates a new PushNotificationHandlerContext implementation -func NewPushNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) PushNotificationHandlerContext { - return &pushNotificationHandlerContext{ - client: client, - connPool: connPool, - pubSub: pubSub, - conn: conn, - isBlocking: isBlocking, - } -} - -// GetClient returns the Redis client instance that received the notification -func (h *pushNotificationHandlerContext) GetClient() interface{} { - return h.client -} - -// GetClusterClient returns the client as a ClusterClient if it is one -func (h *pushNotificationHandlerContext) GetClusterClient() *ClusterClient { - if client, ok := h.client.(*ClusterClient); ok { - return client - } - return nil -} - -// GetSentinelClient returns the client as a SentinelClient if it is one -func (h *pushNotificationHandlerContext) GetSentinelClient() *SentinelClient { - if client, ok := h.client.(*SentinelClient); ok { - return client - } - return nil -} - -// GetFailoverClient returns the client as a FailoverClient if it is one -func (h *pushNotificationHandlerContext) GetFailoverClient() *Client { - if client, ok := h.client.(*Client); ok { - return client - } - return nil -} - -// GetRegularClient returns the client as a regular Client if it is one -func (h *pushNotificationHandlerContext) GetRegularClient() *Client { - if client, ok := h.client.(*Client); ok { - return client - } - return nil -} - -// GetConnPool returns the connection pool from which the connection was obtained -func (h *pushNotificationHandlerContext) GetConnPool() interface{} { - return h.connPool -} - -// GetPubSub returns the PubSub instance that received the notification -func (h *pushNotificationHandlerContext) GetPubSub() *PubSub { - if pubSub, ok := h.pubSub.(*PubSub); ok { - return pubSub - } - return nil -} - -// GetConn returns the specific connection on which the notification was received -func (h *pushNotificationHandlerContext) GetConn() *pool.Conn { - return h.conn -} - -// IsBlocking returns true if the notification was received on a blocking connection -func (h *pushNotificationHandlerContext) IsBlocking() bool { - return h.isBlocking -} diff --git a/push_notifications.go b/push_notifications.go index d9666c04..ceffe04a 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -1,9 +1,7 @@ package redis import ( - "context" - - "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/push" ) // Push notification constants for cluster operations @@ -24,147 +22,18 @@ const ( PushNotificationFailedOver = "FAILED_OVER" ) -// PushNotificationHandlerContext is defined in push_notification_handler_context.go - -// PushNotificationHandler defines the interface for push notification handlers. -type PushNotificationHandler interface { - // HandlePushNotification processes a push notification with context information. - // The handlerCtx provides information about the client, connection pool, and connection - // on which the notification was received, allowing handlers to make informed decisions. - // Returns true if the notification was handled, false otherwise. - HandlePushNotification(ctx context.Context, handlerCtx PushNotificationHandlerContext, notification []interface{}) bool +// NewPushNotificationProcessor creates a new push notification processor +// This processor maintains a registry of handlers and processes push notifications +// It is used for RESP3 connections where push notifications are available +func NewPushNotificationProcessor() push.NotificationProcessor { + return push.NewProcessor() } -// NewPushNotificationHandlerContext is defined in push_notification_handler_context.go - -// Registry, Processor, and VoidProcessor are defined in push_notification_processor.go - -// PushNotificationProcessorInterface defines the interface for push notification processors. -type PushNotificationProcessorInterface interface { - GetHandler(pushNotificationName string) PushNotificationHandler - ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error - RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error -} - -// PushNotificationRegistry manages push notification handlers. -type PushNotificationRegistry struct { - registry *Registry -} - -// NewPushNotificationRegistry creates a new push notification registry. -func NewPushNotificationRegistry() *PushNotificationRegistry { - return &PushNotificationRegistry{ - registry: NewRegistry(), - } -} - -// RegisterHandler registers a handler for a specific push notification name. -func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return r.registry.RegisterHandler(pushNotificationName, handler, protected) -} - -// UnregisterHandler removes a handler for a specific push notification name. -func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string) error { - return r.registry.UnregisterHandler(pushNotificationName) -} - -// GetHandler returns the handler for a specific push notification name. -func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { - return r.registry.GetHandler(pushNotificationName) -} - -// GetRegisteredPushNotificationNames returns a list of all registered push notification names. -func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string { - return r.registry.GetRegisteredPushNotificationNames() -} - -// PushNotificationProcessor handles push notifications with a registry of handlers. -type PushNotificationProcessor struct { - processor *Processor -} - -// NewPushNotificationProcessor creates a new push notification processor. -func NewPushNotificationProcessor() *PushNotificationProcessor { - return &PushNotificationProcessor{ - processor: NewProcessor(), - } -} - -// GetHandler returns the handler for a specific push notification name. -func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return p.processor.GetHandler(pushNotificationName) -} - -// RegisterHandler registers a handler for a specific push notification name. -func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return p.processor.RegisterHandler(pushNotificationName, handler, protected) -} - -// UnregisterHandler removes a handler for a specific push notification name. -func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName string) error { - return p.processor.UnregisterHandler(pushNotificationName) -} - -// ProcessPendingNotifications checks for and processes any pending push notifications. -// The handlerCtx provides context about the client, connection pool, and connection. -func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) -} - -// VoidPushNotificationProcessor discards all push notifications without processing them. -type VoidPushNotificationProcessor struct { - processor *VoidProcessor -} - -// NewVoidPushNotificationProcessor creates a new void push notification processor. -func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { - return &VoidPushNotificationProcessor{ - processor: NewVoidProcessor(), - } -} - -// GetHandler returns nil for void processor since it doesn't maintain handlers. -func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return v.processor.GetHandler(pushNotificationName) -} - -// RegisterHandler returns an error for void processor since it doesn't maintain handlers. -func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return v.processor.RegisterHandler(pushNotificationName, handler, protected) -} - -// ProcessPendingNotifications reads and discards any pending push notifications. -func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) -} - -// PushNotificationInfo contains metadata about a push notification. -type PushNotificationInfo struct { - Name string - Args []interface{} -} - -// ParsePushNotificationInfo extracts information from a push notification. -func ParsePushNotificationInfo(notification []interface{}) *PushNotificationInfo { - if len(notification) == 0 { - return nil - } - - name, ok := notification[0].(string) - if !ok { - return nil - } - - return &PushNotificationInfo{ - Name: name, - Args: notification[1:], - } -} - -// String returns a string representation of the push notification info. -func (info *PushNotificationInfo) String() string { - if info == nil { - return "" - } - return info.Name +// NewVoidPushNotificationProcessor creates a new void push notification processor +// This processor does not maintain any handlers and always returns nil for all operations +// It is used for RESP2 connections where push notifications are not available +// It can also be used to disable push notifications for RESP3 connections, where +// it will discard all push notifications without processing them +func NewVoidPushNotificationProcessor() push.NotificationProcessor { + return push.NewVoidProcessor() } diff --git a/redis.go b/redis.go index 205caeec..897f59fa 100644 --- a/redis.go +++ b/redis.go @@ -14,6 +14,7 @@ import ( "github.com/redis/go-redis/v9/internal/hscan" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/push" ) // Scanner internal/hscan.Scanner exposed interface. @@ -209,7 +210,7 @@ type baseClient struct { onClose func() error // hook called when client is closed // Push notification processing - pushProcessor PushNotificationProcessorInterface + pushProcessor push.NotificationProcessor } func (c *baseClient) clone() *baseClient { @@ -880,7 +881,7 @@ func (c *Client) Options() *Options { // initializePushProcessor initializes the push notification processor for any client type. // This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient. -func initializePushProcessor(opt *Options) PushNotificationProcessorInterface { +func initializePushProcessor(opt *Options) push.NotificationProcessor { // Always use custom processor if provided if opt.PushNotificationProcessor != nil { return opt.PushNotificationProcessor @@ -899,18 +900,13 @@ func initializePushProcessor(opt *Options) PushNotificationProcessorInterface { // RegisterPushNotificationHandler registers a handler for a specific push notification name. // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. -func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { +func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error { return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) } -// GetPushNotificationProcessor returns the push notification processor. -func (c *Client) GetPushNotificationProcessor() PushNotificationProcessorInterface { - return c.pushProcessor -} - // GetPushNotificationHandler returns the handler for a specific push notification name. // Returns nil if no handler is registered for the given name. -func (c *Client) GetPushNotificationHandler(pushNotificationName string) PushNotificationHandler { +func (c *Client) GetPushNotificationHandler(pushNotificationName string) push.NotificationHandler { return c.pushProcessor.GetHandler(pushNotificationName) } @@ -1070,15 +1066,10 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error { // RegisterPushNotificationHandler registers a handler for a specific push notification name. // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. -func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { +func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error { return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) } -// GetPushNotificationProcessor returns the push notification processor. -func (c *Conn) GetPushNotificationProcessor() PushNotificationProcessorInterface { - return c.pushProcessor -} - func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { return c.Pipeline().Pipelined(ctx, fn) } @@ -1138,8 +1129,6 @@ func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Contex } // pushNotificationHandlerContext creates a handler context for push notification processing -func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext { - return NewPushNotificationHandlerContext(c, c.connPool, nil, cn, false) +func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext { + return push.NewNotificationHandlerContext(c, c.connPool, nil, cn, false) } - - diff --git a/sentinel.go b/sentinel.go index fa22db7f..76bf1aeb 100644 --- a/sentinel.go +++ b/sentinel.go @@ -16,6 +16,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/rand" + "github.com/redis/go-redis/v9/push" ) //------------------------------------------------------------------------------ @@ -511,21 +512,16 @@ func NewSentinelClient(opt *Options) *SentinelClient { return c } -// GetPushNotificationProcessor returns the push notification processor. -func (c *SentinelClient) GetPushNotificationProcessor() PushNotificationProcessorInterface { - return c.pushProcessor -} - // GetPushNotificationHandler returns the handler for a specific push notification name. // Returns nil if no handler is registered for the given name. -func (c *SentinelClient) GetPushNotificationHandler(pushNotificationName string) PushNotificationHandler { +func (c *SentinelClient) GetPushNotificationHandler(pushNotificationName string) push.NotificationHandler { return c.pushProcessor.GetHandler(pushNotificationName) } // RegisterPushNotificationHandler registers a handler for a specific push notification name. // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. -func (c *SentinelClient) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { +func (c *SentinelClient) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error { return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) }