diff --git a/internal/pushnotif/types.go b/internal/pushnotif/types.go deleted file mode 100644 index 7f4c657a..00000000 --- a/internal/pushnotif/types.go +++ /dev/null @@ -1,178 +0,0 @@ -package pushnotif - -import ( - "context" - - "github.com/redis/go-redis/v9/internal/pool" - "github.com/redis/go-redis/v9/internal/proto" -) - -// HandlerContext provides context information about where a push notification was received. -// This interface allows handlers to make informed decisions based on the source of the notification -// with strongly typed access to different client types. -type HandlerContext interface { - // GetClient returns the Redis client instance that received the notification. - // Returns nil if no client context is available. - GetClient() interface{} - - // GetClusterClient returns the client as a ClusterClient if it is one. - // Returns nil if the client is not a ClusterClient or no client context is available. - GetClusterClient() ClusterClientInterface - - // GetSentinelClient returns the client as a SentinelClient if it is one. - // Returns nil if the client is not a SentinelClient or no client context is available. - GetSentinelClient() SentinelClientInterface - - // GetFailoverClient returns the client as a FailoverClient if it is one. - // Returns nil if the client is not a FailoverClient or no client context is available. - GetFailoverClient() FailoverClientInterface - - // GetRegularClient returns the client as a regular Client if it is one. - // Returns nil if the client is not a regular Client or no client context is available. - GetRegularClient() RegularClientInterface - - // GetConnPool returns the connection pool from which the connection was obtained. - // Returns nil if no connection pool context is available. - GetConnPool() interface{} - - // GetPubSub returns the PubSub instance that received the notification. - // Returns nil if this is not a PubSub connection. - GetPubSub() PubSubInterface - - // GetConn returns the specific connection on which the notification was received. - // Returns nil if no connection context is available. - GetConn() *pool.Conn - - // IsBlocking returns true if the notification was received on a blocking connection. - IsBlocking() bool -} - -// Client interfaces for strongly typed access -type ClusterClientInterface interface { - // Add methods that handlers might need from ClusterClient - String() string -} - -type SentinelClientInterface interface { - // Add methods that handlers might need from SentinelClient - String() string -} - -type FailoverClientInterface interface { - // Add methods that handlers might need from FailoverClient - String() string -} - -type RegularClientInterface interface { - // Add methods that handlers might need from regular Client - String() string -} - -type PubSubInterface interface { - // Add methods that handlers might need from PubSub - String() string -} - -// handlerContext is the concrete implementation of HandlerContext interface -type handlerContext struct { - client interface{} - connPool interface{} - pubSub interface{} - conn *pool.Conn - isBlocking bool -} - -// NewHandlerContext creates a new HandlerContext implementation -func NewHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) HandlerContext { - return &handlerContext{ - client: client, - connPool: connPool, - pubSub: pubSub, - conn: conn, - isBlocking: isBlocking, - } -} - -// GetClient returns the Redis client instance that received the notification -func (h *handlerContext) GetClient() interface{} { - return h.client -} - -// GetClusterClient returns the client as a ClusterClient if it is one -func (h *handlerContext) GetClusterClient() ClusterClientInterface { - if client, ok := h.client.(ClusterClientInterface); ok { - return client - } - return nil -} - -// GetSentinelClient returns the client as a SentinelClient if it is one -func (h *handlerContext) GetSentinelClient() SentinelClientInterface { - if client, ok := h.client.(SentinelClientInterface); ok { - return client - } - return nil -} - -// GetFailoverClient returns the client as a FailoverClient if it is one -func (h *handlerContext) GetFailoverClient() FailoverClientInterface { - if client, ok := h.client.(FailoverClientInterface); ok { - return client - } - return nil -} - -// GetRegularClient returns the client as a regular Client if it is one -func (h *handlerContext) GetRegularClient() RegularClientInterface { - if client, ok := h.client.(RegularClientInterface); ok { - return client - } - return nil -} - -// GetConnPool returns the connection pool from which the connection was obtained -func (h *handlerContext) GetConnPool() interface{} { - return h.connPool -} - -// GetPubSub returns the PubSub instance that received the notification -func (h *handlerContext) GetPubSub() PubSubInterface { - if pubSub, ok := h.pubSub.(PubSubInterface); ok { - return pubSub - } - return nil -} - -// GetConn returns the specific connection on which the notification was received -func (h *handlerContext) GetConn() *pool.Conn { - return h.conn -} - -// IsBlocking returns true if the notification was received on a blocking connection -func (h *handlerContext) IsBlocking() bool { - return h.isBlocking -} - -// Handler defines the interface for push notification handlers. -type Handler interface { - // HandlePushNotification processes a push notification with context information. - // The handlerCtx provides information about the client, connection pool, and connection - // on which the notification was received, allowing handlers to make informed decisions. - // Returns true if the notification was handled, false otherwise. - HandlePushNotification(ctx context.Context, handlerCtx HandlerContext, notification []interface{}) bool -} - -// ProcessorInterface defines the interface for push notification processors. -type ProcessorInterface interface { - GetHandler(pushNotificationName string) Handler - ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error - RegisterHandler(pushNotificationName string, handler Handler, protected bool) error -} - -// RegistryInterface defines the interface for push notification registries. -type RegistryInterface interface { - RegisterHandler(pushNotificationName string, handler Handler, protected bool) error - UnregisterHandler(pushNotificationName string) error - GetHandler(pushNotificationName string) Handler - GetRegisteredPushNotificationNames() []string -} diff --git a/internal/pushnotif/processor.go b/internal/pushprocessor/processor.go similarity index 87% rename from internal/pushnotif/processor.go rename to internal/pushprocessor/processor.go index d3982427..87028aaf 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushprocessor/processor.go @@ -1,4 +1,4 @@ -package pushnotif +package pushprocessor import ( "context" @@ -114,7 +114,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx func shouldSkipNotification(notificationType string) bool { switch notificationType { // Pub/Sub notifications - handled by pub/sub system - case "message", // Regular pub/sub message + case "message", // Regular pub/sub message "pmessage", // Pattern pub/sub message "subscribe", // Subscription confirmation "unsubscribe", // Unsubscription confirmation @@ -124,27 +124,27 @@ func shouldSkipNotification(notificationType string) bool { "ssubscribe", // Sharded subscription confirmation "sunsubscribe", // Sharded unsubscription confirmation - // Stream notifications - handled by stream consumers + // Stream notifications - handled by stream consumers "xread-from", // Stream reading notifications "xreadgroup-from", // Stream consumer group notifications - // Client tracking notifications - handled by client tracking system + // Client tracking notifications - handled by client tracking system "invalidate", // Client-side caching invalidation - // Keyspace notifications - handled by keyspace notification subscribers - // Note: Keyspace notifications typically have prefixes like "__keyspace@0__:" or "__keyevent@0__:" - // but we'll handle the base notification types here - "expired", // Key expiration events - "evicted", // Key eviction events - "set", // Key set events - "del", // Key deletion events - "rename", // Key rename events - "move", // Key move events - "copy", // Key copy events - "restore", // Key restore events - "sort", // Sort operation events - "flushdb", // Database flush events - "flushall": // All databases flush events + // Keyspace notifications - handled by keyspace notification subscribers + // Note: Keyspace notifications typically have prefixes like "__keyspace@0__:" or "__keyevent@0__:" + // but we'll handle the base notification types here + "expired", // Key expiration events + "evicted", // Key eviction events + "set", // Key set events + "del", // Key deletion events + "rename", // Key rename events + "move", // Key move events + "copy", // Key copy events + "restore", // Key restore events + "sort", // Sort operation events + "flushdb", // Database flush events + "flushall": // All databases flush events return true default: return false diff --git a/internal/pushnotif/pushnotif.go b/internal/pushprocessor/pushprocessor.go similarity index 95% rename from internal/pushnotif/pushnotif.go rename to internal/pushprocessor/pushprocessor.go index 42910775..19c3014f 100644 --- a/internal/pushnotif/pushnotif.go +++ b/internal/pushprocessor/pushprocessor.go @@ -1,4 +1,4 @@ -package pushnotif +package pushprocessor // This is an EXPERIMENTAL API for push notifications. // It is subject to change without notice. diff --git a/internal/pushnotif/pushnotif_test.go b/internal/pushprocessor/pushprocessor_test.go similarity index 99% rename from internal/pushnotif/pushnotif_test.go rename to internal/pushprocessor/pushprocessor_test.go index 54d08679..7d35969b 100644 --- a/internal/pushnotif/pushnotif_test.go +++ b/internal/pushprocessor/pushprocessor_test.go @@ -1,4 +1,4 @@ -package pushnotif +package pushprocessor import ( "context" diff --git a/internal/pushnotif/registry.go b/internal/pushprocessor/registry.go similarity index 99% rename from internal/pushnotif/registry.go rename to internal/pushprocessor/registry.go index eb3ebfbd..9aaa4714 100644 --- a/internal/pushnotif/registry.go +++ b/internal/pushprocessor/registry.go @@ -1,4 +1,4 @@ -package pushnotif +package pushprocessor import ( "fmt" @@ -80,5 +80,3 @@ func (r *Registry) GetRegisteredPushNotificationNames() []string { } return names } - - diff --git a/pubsub.go b/pubsub.go index b252530e..243c3979 100644 --- a/pubsub.go +++ b/pubsub.go @@ -10,7 +10,6 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" - "github.com/redis/go-redis/v9/internal/pushnotif" ) // PubSub implements Pub/Sub commands as described in @@ -549,27 +548,16 @@ func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, c // Create handler context with client, connection pool, and connection information handlerCtx := c.pushNotificationHandlerContext(cn) - // Convert internal context to public context for the processor - publicCtx := convertInternalToPublicContext(handlerCtx) - return c.pushProcessor.ProcessPendingNotifications(ctx, publicCtx, rd) + return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) } -func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext { +func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext { // PubSub doesn't have a client or connection pool, so we pass nil for those // PubSub connections are blocking - return pushnotif.NewHandlerContext(nil, nil, c, cn, true) + return NewPushNotificationHandlerContext(nil, nil, c, cn, true) } -// convertInternalToPublicContext converts internal HandlerContext to public PushNotificationHandlerContext -func convertInternalToPublicContext(internalCtx pushnotif.HandlerContext) PushNotificationHandlerContext { - return NewPushNotificationHandlerContext( - internalCtx.GetClient(), - internalCtx.GetConnPool(), - internalCtx.GetPubSub(), - internalCtx.GetConn(), - internalCtx.IsBlocking(), - ) -} + type ChannelOption func(c *channel) diff --git a/push_notifications.go b/push_notifications.go index 6b150769..9d2ed2cc 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -2,10 +2,29 @@ package redis import ( "context" + "fmt" + "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" - "github.com/redis/go-redis/v9/internal/pushnotif" +) + +// Push notification constants for cluster operations +const ( + // MOVING indicates a slot is being moved to a different node + PushNotificationMoving = "MOVING" + + // MIGRATING indicates a slot is being migrated from this node + PushNotificationMigrating = "MIGRATING" + + // MIGRATED indicates a slot has been migrated to this node + PushNotificationMigrated = "MIGRATED" + + // FAILING_OVER indicates a failover is starting + PushNotificationFailingOver = "FAILING_OVER" + + // FAILED_OVER indicates a failover has completed + PushNotificationFailedOver = "FAILED_OVER" ) // PushNotificationHandlerContext provides context information about where a push notification was received. @@ -137,75 +156,197 @@ func (h *pushNotificationHandlerContext) IsBlocking() bool { return h.isBlocking } -// handlerAdapter adapts a PushNotificationHandler to the internal pushnotif.Handler interface -type handlerAdapter struct { - handler PushNotificationHandler +// Registry manages push notification handlers +type Registry struct { + handlers map[string]PushNotificationHandler + protected map[string]bool } -// HandlePushNotification adapts the public handler to the internal interface -func (a *handlerAdapter) HandlePushNotification(ctx context.Context, handlerCtx pushnotif.HandlerContext, notification []interface{}) bool { - // Convert internal HandlerContext to public PushNotificationHandlerContext - // We need to extract the fields from the internal context and create a public one - var client, connPool, pubSub interface{} - var conn *pool.Conn - var isBlocking bool +// NewRegistry creates a new push notification registry +func NewRegistry() *Registry { + return &Registry{ + handlers: make(map[string]PushNotificationHandler), + protected: make(map[string]bool), + } +} - // Extract information from internal context - client = handlerCtx.GetClient() - connPool = handlerCtx.GetConnPool() - conn = handlerCtx.GetConn() - isBlocking = handlerCtx.IsBlocking() - - // Try to get PubSub if available - if handlerCtx.GetPubSub() != nil { - pubSub = handlerCtx.GetPubSub() +// RegisterHandler registers a handler for a specific push notification name +func (r *Registry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + if handler == nil { + return fmt.Errorf("handler cannot be nil") } - // Create public context - publicCtx := NewPushNotificationHandlerContext(client, connPool, pubSub, conn, isBlocking) - - // Call the public handler - return a.handler.HandlePushNotification(ctx, publicCtx, notification) -} - -// contextAdapter converts internal HandlerContext to public PushNotificationHandlerContext - -// voidProcessorAdapter adapts a VoidProcessor to the public interface -type voidProcessorAdapter struct { - processor *pushnotif.VoidProcessor -} - -// NewVoidProcessorAdapter creates a new void processor adapter -func NewVoidProcessorAdapter() PushNotificationProcessorInterface { - return &voidProcessorAdapter{ - processor: pushnotif.NewVoidProcessor(), + // Check if handler already exists and is protected + if existingProtected, exists := r.protected[pushNotificationName]; exists && existingProtected { + return fmt.Errorf("cannot overwrite protected handler for push notification: %s", pushNotificationName) } + + r.handlers[pushNotificationName] = handler + r.protected[pushNotificationName] = protected + return nil +} + +// GetHandler returns the handler for a specific push notification name +func (r *Registry) GetHandler(pushNotificationName string) PushNotificationHandler { + return r.handlers[pushNotificationName] +} + +// UnregisterHandler removes a handler for a specific push notification name +func (r *Registry) UnregisterHandler(pushNotificationName string) error { + // Check if handler is protected + if protected, exists := r.protected[pushNotificationName]; exists && protected { + return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) + } + + delete(r.handlers, pushNotificationName) + delete(r.protected, pushNotificationName) + return nil +} + +// GetRegisteredPushNotificationNames returns all registered push notification names +func (r *Registry) GetRegisteredPushNotificationNames() []string { + names := make([]string, 0, len(r.handlers)) + for name := range r.handlers { + names = append(names, name) + } + return names +} + +// Processor handles push notifications with a registry of handlers +type Processor struct { + registry *Registry +} + +// NewProcessor creates a new push notification processor +func NewProcessor() *Processor { + return &Processor{ + registry: NewRegistry(), + } +} + +// GetHandler returns the handler for a specific push notification name +func (p *Processor) GetHandler(pushNotificationName string) PushNotificationHandler { + return p.registry.GetHandler(pushNotificationName) +} + +// RegisterHandler registers a handler for a specific push notification name +func (p *Processor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return p.registry.RegisterHandler(pushNotificationName, handler, protected) +} + +// UnregisterHandler removes a handler for a specific push notification name +func (p *Processor) UnregisterHandler(pushNotificationName string) error { + return p.registry.UnregisterHandler(pushNotificationName) +} + +// ProcessPendingNotifications checks for and processes any pending push notifications +func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { + if rd == nil { + return nil + } + + for { + // Check if there's data available to read + replyType, err := rd.PeekReplyType() + if err != nil { + // No more data available or error reading + break + } + + // Only process push notifications (arrays starting with >) + if replyType != proto.RespPush { + break + } + + // Read the push notification + reply, err := rd.ReadReply() + if err != nil { + internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) + break + } + + // Convert to slice of interfaces + notification, ok := reply.([]interface{}) + if !ok { + continue + } + + // Handle the notification directly + if len(notification) > 0 { + // Extract the notification type (first element) + if notificationType, ok := notification[0].(string); ok { + // Skip notifications that should be handled by other systems + if shouldSkipNotification(notificationType) { + continue + } + + // Get the handler for this notification type + if handler := p.registry.GetHandler(notificationType); handler != nil { + // Handle the notification + handler.HandlePushNotification(ctx, handlerCtx, notification) + } + } + } + } + + return nil +} + +// shouldSkipNotification checks if a notification type should be ignored by the push notification +// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). +func shouldSkipNotification(notificationType string) bool { + switch notificationType { + // Pub/Sub notifications - handled by pub/sub system + case "message", // Regular pub/sub message + "pmessage", // Pattern pub/sub message + "subscribe", // Subscription confirmation + "unsubscribe", // Unsubscription confirmation + "psubscribe", // Pattern subscription confirmation + "punsubscribe", // Pattern unsubscription confirmation + "smessage", // Sharded pub/sub message (Redis 7.0+) + "ssubscribe", // Sharded subscription confirmation + "sunsubscribe": // Sharded unsubscription confirmation + return true + default: + return false + } +} + +// VoidProcessor discards all push notifications without processing them +type VoidProcessor struct{} + +// NewVoidProcessor creates a new void push notification processor +func NewVoidProcessor() *VoidProcessor { + return &VoidProcessor{} } // GetHandler returns nil for void processor since it doesn't maintain handlers -func (v *voidProcessorAdapter) GetHandler(pushNotificationName string) PushNotificationHandler { +func (v *VoidProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { return nil } // RegisterHandler returns an error for void processor since it doesn't maintain handlers -func (v *voidProcessorAdapter) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - // Void processor doesn't support handlers - return v.processor.RegisterHandler(pushNotificationName, nil, protected) +func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) } -// ProcessPendingNotifications reads and discards any pending push notifications -func (v *voidProcessorAdapter) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - // Convert public context to internal context - internalCtx := pushnotif.NewHandlerContext( - handlerCtx.GetClient(), - handlerCtx.GetConnPool(), - handlerCtx.GetPubSub(), - handlerCtx.GetConn(), - handlerCtx.IsBlocking(), - ) - return v.processor.ProcessPendingNotifications(ctx, internalCtx, rd) +// UnregisterHandler returns an error for void processor since it doesn't maintain handlers +func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { + return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) } +// ProcessPendingNotifications for VoidProcessor does nothing since push notifications +// are only available in RESP3 and this processor is used for RESP2 connections. +// This avoids unnecessary buffer scanning overhead. +func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { + // VoidProcessor is used for RESP2 connections where push notifications are not available. + // Since push notifications only exist in RESP3, we can safely skip all processing + // to avoid unnecessary buffer scanning overhead. + return nil +} + + + // PushNotificationProcessorInterface defines the interface for push notification processors. type PushNotificationProcessorInterface interface { GetHandler(pushNotificationName string) PushNotificationHandler @@ -215,21 +356,19 @@ type PushNotificationProcessorInterface interface { // PushNotificationRegistry manages push notification handlers. type PushNotificationRegistry struct { - registry *pushnotif.Registry + registry *Registry } // NewPushNotificationRegistry creates a new push notification registry. func NewPushNotificationRegistry() *PushNotificationRegistry { return &PushNotificationRegistry{ - registry: pushnotif.NewRegistry(), + registry: NewRegistry(), } } // RegisterHandler registers a handler for a specific push notification name. func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - // Wrap the public handler in an adapter for the internal interface - adapter := &handlerAdapter{handler: handler} - return r.registry.RegisterHandler(pushNotificationName, adapter, protected) + return r.registry.RegisterHandler(pushNotificationName, handler, protected) } // UnregisterHandler removes a handler for a specific push notification name. @@ -239,18 +378,7 @@ func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string // GetHandler returns the handler for a specific push notification name. func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { - internalHandler := r.registry.GetHandler(pushNotificationName) - if internalHandler == nil { - return nil - } - - // If it's our adapter, return the original handler - if adapter, ok := internalHandler.(*handlerAdapter); ok { - return adapter.handler - } - - // This shouldn't happen in normal usage, but handle it gracefully - return nil + return r.registry.GetHandler(pushNotificationName) } // GetRegisteredPushNotificationNames returns a list of all registered push notification names. @@ -260,37 +388,24 @@ func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string // PushNotificationProcessor handles push notifications with a registry of handlers. type PushNotificationProcessor struct { - processor *pushnotif.Processor + processor *Processor } // NewPushNotificationProcessor creates a new push notification processor. func NewPushNotificationProcessor() *PushNotificationProcessor { return &PushNotificationProcessor{ - processor: pushnotif.NewProcessor(), + processor: NewProcessor(), } } // GetHandler returns the handler for a specific push notification name. func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - internalHandler := p.processor.GetHandler(pushNotificationName) - if internalHandler == nil { - return nil - } - - // If it's our adapter, return the original handler - if adapter, ok := internalHandler.(*handlerAdapter); ok { - return adapter.handler - } - - // This shouldn't happen in normal usage, but handle it gracefully - return nil + return p.processor.GetHandler(pushNotificationName) } // RegisterHandler registers a handler for a specific push notification name. func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - // Wrap the public handler in an adapter for the internal interface - adapter := &handlerAdapter{handler: handler} - return p.processor.RegisterHandler(pushNotificationName, adapter, protected) + return p.processor.RegisterHandler(pushNotificationName, handler, protected) } // UnregisterHandler removes a handler for a specific push notification name. @@ -301,61 +416,36 @@ func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName strin // ProcessPendingNotifications checks for and processes any pending push notifications. // The handlerCtx provides context about the client, connection pool, and connection. func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - // Convert public context to internal context - internalCtx := pushnotif.NewHandlerContext( - handlerCtx.GetClient(), - handlerCtx.GetConnPool(), - handlerCtx.GetPubSub(), - handlerCtx.GetConn(), - handlerCtx.IsBlocking(), - ) - return p.processor.ProcessPendingNotifications(ctx, internalCtx, rd) + return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) } // VoidPushNotificationProcessor discards all push notifications without processing them. type VoidPushNotificationProcessor struct { - processor *pushnotif.VoidProcessor + processor *VoidProcessor } // NewVoidPushNotificationProcessor creates a new void push notification processor. func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { return &VoidPushNotificationProcessor{ - processor: pushnotif.NewVoidProcessor(), + processor: NewVoidProcessor(), } } // GetHandler returns nil for void processor since it doesn't maintain handlers. func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - return nil + return v.processor.GetHandler(pushNotificationName) } // RegisterHandler returns an error for void processor since it doesn't maintain handlers. func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - return v.processor.RegisterHandler(pushNotificationName, nil, protected) + return v.processor.RegisterHandler(pushNotificationName, handler, protected) } // ProcessPendingNotifications reads and discards any pending push notifications. func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { - // Convert public context to internal context - internalCtx := pushnotif.NewHandlerContext( - handlerCtx.GetClient(), - handlerCtx.GetConnPool(), - handlerCtx.GetPubSub(), - handlerCtx.GetConn(), - handlerCtx.IsBlocking(), - ) - return v.processor.ProcessPendingNotifications(ctx, internalCtx, rd) + return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd) } -// Redis Cluster push notification names -const ( - PushNotificationMoving = "MOVING" - PushNotificationMigrating = "MIGRATING" - PushNotificationMigrated = "MIGRATED" - PushNotificationFailingOver = "FAILING_OVER" - PushNotificationFailedOver = "FAILED_OVER" -) - // PushNotificationInfo contains metadata about a push notification. type PushNotificationInfo struct { Name string diff --git a/push_notifications_test.go b/push_notifications_test.go new file mode 100644 index 00000000..06137f2c --- /dev/null +++ b/push_notifications_test.go @@ -0,0 +1,242 @@ +package redis + +import ( + "context" + "testing" + + "github.com/redis/go-redis/v9/internal/pool" +) + +// TestHandler implements PushNotificationHandler interface for testing +type TestHandler struct { + name string + handled [][]interface{} + returnValue bool +} + +func NewTestHandler(name string, returnValue bool) *TestHandler { + return &TestHandler{ + name: name, + handled: make([][]interface{}, 0), + returnValue: returnValue, + } +} + +func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx PushNotificationHandlerContext, notification []interface{}) bool { + h.handled = append(h.handled, notification) + return h.returnValue +} + +func (h *TestHandler) GetHandledNotifications() [][]interface{} { + return h.handled +} + +func (h *TestHandler) Reset() { + h.handled = make([][]interface{}, 0) +} + +func TestPushNotificationRegistry(t *testing.T) { + t.Run("NewRegistry", func(t *testing.T) { + registry := NewRegistry() + if registry == nil { + t.Error("NewRegistry should not return nil") + } + + if len(registry.GetRegisteredPushNotificationNames()) != 0 { + t.Error("New registry should have no registered handlers") + } + }) + + t.Run("RegisterHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test", true) + + err := registry.RegisterHandler("TEST", handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("GetHandler should return the registered handler") + } + }) + + t.Run("UnregisterHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test", true) + + registry.RegisterHandler("TEST", handler, false) + + err := registry.UnregisterHandler("TEST") + if err != nil { + t.Errorf("UnregisterHandler should not error: %v", err) + } + + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != nil { + t.Error("GetHandler should return nil after unregistering") + } + }) + + t.Run("ProtectedHandler", func(t *testing.T) { + registry := NewRegistry() + handler := NewTestHandler("test", true) + + // Register protected handler + err := registry.RegisterHandler("TEST", handler, true) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + // Try to unregister protected handler + err = registry.UnregisterHandler("TEST") + if err == nil { + t.Error("UnregisterHandler should error for protected handler") + } + + // Handler should still be there + retrievedHandler := registry.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("Protected handler should still be registered") + } + }) +} + +func TestPushNotificationProcessor(t *testing.T) { + t.Run("NewProcessor", func(t *testing.T) { + processor := NewProcessor() + if processor == nil { + t.Error("NewProcessor should not return nil") + } + }) + + t.Run("RegisterAndGetHandler", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test", true) + + err := processor.RegisterHandler("TEST", handler, false) + if err != nil { + t.Errorf("RegisterHandler should not error: %v", err) + } + + retrievedHandler := processor.GetHandler("TEST") + if retrievedHandler != handler { + t.Error("GetHandler should return the registered handler") + } + }) +} + +func TestVoidProcessor(t *testing.T) { + t.Run("NewVoidProcessor", func(t *testing.T) { + processor := NewVoidProcessor() + if processor == nil { + t.Error("NewVoidProcessor should not return nil") + } + }) + + t.Run("GetHandler", func(t *testing.T) { + processor := NewVoidProcessor() + handler := processor.GetHandler("TEST") + if handler != nil { + t.Error("VoidProcessor GetHandler should always return nil") + } + }) + + t.Run("RegisterHandler", func(t *testing.T) { + processor := NewVoidProcessor() + handler := NewTestHandler("test", true) + + err := processor.RegisterHandler("TEST", handler, false) + if err == nil { + t.Error("VoidProcessor RegisterHandler should return error") + } + }) + + t.Run("ProcessPendingNotifications", func(t *testing.T) { + processor := NewVoidProcessor() + ctx := context.Background() + handlerCtx := NewPushNotificationHandlerContext(nil, nil, nil, nil, false) + + // VoidProcessor should always succeed and do nothing + err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) + if err != nil { + t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) + } + }) +} + +func TestPushNotificationHandlerContext(t *testing.T) { + t.Run("NewHandlerContext", func(t *testing.T) { + client := &Client{} + connPool := &pool.ConnPool{} + pubSub := &PubSub{} + conn := &pool.Conn{} + + ctx := NewPushNotificationHandlerContext(client, connPool, pubSub, conn, true) + if ctx == nil { + t.Error("NewPushNotificationHandlerContext should not return nil") + } + + if ctx.GetClient() != client { + t.Error("GetClient should return the provided client") + } + + if ctx.GetConnPool() != connPool { + t.Error("GetConnPool should return the provided connection pool") + } + + if ctx.GetPubSub() != pubSub { + t.Error("GetPubSub should return the provided PubSub") + } + + if ctx.GetConn() != conn { + t.Error("GetConn should return the provided connection") + } + + if !ctx.IsBlocking() { + t.Error("IsBlocking should return true") + } + }) + + t.Run("TypedGetters", func(t *testing.T) { + client := &Client{} + ctx := NewPushNotificationHandlerContext(client, nil, nil, nil, false) + + // Test regular client getter + regularClient := ctx.GetRegularClient() + if regularClient != client { + t.Error("GetRegularClient should return the client when it's a regular client") + } + + // Test cluster client getter (should be nil for regular client) + clusterClient := ctx.GetClusterClient() + if clusterClient != nil { + t.Error("GetClusterClient should return nil when client is not a cluster client") + } + }) +} + +func TestPushNotificationConstants(t *testing.T) { + t.Run("Constants", func(t *testing.T) { + if PushNotificationMoving != "MOVING" { + t.Error("PushNotificationMoving should be 'MOVING'") + } + + if PushNotificationMigrating != "MIGRATING" { + t.Error("PushNotificationMigrating should be 'MIGRATING'") + } + + if PushNotificationMigrated != "MIGRATED" { + t.Error("PushNotificationMigrated should be 'MIGRATED'") + } + + if PushNotificationFailingOver != "FAILING_OVER" { + t.Error("PushNotificationFailingOver should be 'FAILING_OVER'") + } + + if PushNotificationFailedOver != "FAILED_OVER" { + t.Error("PushNotificationFailedOver should be 'FAILED_OVER'") + } + }) +} diff --git a/pushnotif/types.go b/pushnotif/types.go new file mode 100644 index 00000000..ea7621f1 --- /dev/null +++ b/pushnotif/types.go @@ -0,0 +1,32 @@ +package pushnotif + +import ( + "context" + "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/internal/pushprocessor" +) + +// PushProcessorInterface defines the interface for push notification processors. +type PushProcessorInterface interface { + GetHandler(pushNotificationName string) PushNotificationHandler + ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error + RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error +} + +// RegistryInterface defines the interface for push notification registries. +type RegistryInterface interface { + RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error + UnregisterHandler(pushNotificationName string) error + GetHandler(pushNotificationName string) PushNotificationHandler + GetRegisteredPushNotificationNames() []string +} + +// NewProcessor creates a new push notification processor. +func NewProcessor() PushProcessorInterface { + return pushprocessor.NewProcessor() +} + +// NewVoidProcessor creates a new void push notification processor. +func NewVoidProcessor() PushProcessorInterface { + return pushprocessor.NewVoidProcessor() +} diff --git a/redis.go b/redis.go index 9a06af7b..205caeec 100644 --- a/redis.go +++ b/redis.go @@ -14,7 +14,6 @@ import ( "github.com/redis/go-redis/v9/internal/hscan" "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/proto" - "github.com/redis/go-redis/v9/internal/pushnotif" ) // Scanner internal/hscan.Scanner exposed interface. @@ -1122,9 +1121,7 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { // Create handler context with client, connection pool, and connection information handlerCtx := c.pushNotificationHandlerContext(cn) - // Convert internal context to public context for the processor - publicCtx := convertInternalToPublicContext(handlerCtx) - return c.pushProcessor.ProcessPendingNotifications(ctx, publicCtx, rd) + return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) }) } @@ -1137,14 +1134,12 @@ func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Contex // Create handler context with client, connection pool, and connection information handlerCtx := c.pushNotificationHandlerContext(cn) - // Convert internal context to public context for the processor - publicCtx := convertInternalToPublicContext(handlerCtx) - return c.pushProcessor.ProcessPendingNotifications(ctx, publicCtx, rd) + return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) } // pushNotificationHandlerContext creates a handler context for push notification processing -func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext { - return pushnotif.NewHandlerContext(c, c.connPool, nil, cn, false) +func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext { + return NewPushNotificationHandlerContext(c, c.connPool, nil, cn, false) } diff --git a/sentinel.go b/sentinel.go index d970306f..fa22db7f 100644 --- a/sentinel.go +++ b/sentinel.go @@ -500,7 +500,7 @@ func NewSentinelClient(opt *Options) *SentinelClient { // Initialize push notification processor using shared helper // Use void processor for Sentinel clients - c.pushProcessor = NewVoidProcessorAdapter() + c.pushProcessor = NewVoidPushNotificationProcessor() c.initHooks(hooks{ dial: c.baseClient.dial,