1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-20 22:42:59 +03:00

refactor: move all push notification logic to root package and remove adapters

Consolidate all push notification handling logic in the root package to eliminate
adapters and simplify the architecture. This provides direct access to concrete
types without any intermediate layers or type conversions.

Key Changes:

1. Moved Core Types to Root Package:
   - Moved Registry, Processor, VoidProcessor to push_notifications.go
   - Moved all push notification constants to root package
   - Removed internal/pushnotif package dependencies
   - Direct implementation without internal abstractions

2. Eliminated All Adapters:
   - Removed handlerAdapter that bridged internal and public interfaces
   - Removed voidProcessorAdapter for void processor functionality
   - Removed convertInternalToPublicContext conversion functions
   - Direct usage of concrete types throughout

3. Simplified Architecture:
   - PushNotificationHandlerContext directly implemented in root package
   - PushNotificationHandler directly implemented in root package
   - Registry, Processor, VoidProcessor directly in root package
   - No intermediate layers or type conversions needed

4. Direct Type Usage:
   - GetClusterClient() returns *ClusterClient directly
   - GetSentinelClient() returns *SentinelClient directly
   - GetRegularClient() returns *Client directly
   - GetPubSub() returns *PubSub directly
   - No interface casting or type assertions required

5. Updated All Integration Points:
   - Updated redis.go to use direct types
   - Updated pubsub.go to use direct types
   - Updated sentinel.go to use direct types
   - Removed all internal/pushnotif imports
   - Simplified context creation and usage

6. Core Implementation in Root Package:
   ```go
   // Direct implementation - no adapters needed
   type Registry struct {
       handlers  map[string]PushNotificationHandler
       protected map[string]bool
   }

   type Processor struct {
       registry *Registry
   }

   type VoidProcessor struct{}
   ```

7. Handler Context with Concrete Types:
   ```go
   type PushNotificationHandlerContext interface {
       GetClusterClient() *ClusterClient    // Direct concrete type
       GetSentinelClient() *SentinelClient  // Direct concrete type
       GetRegularClient() *Client           // Direct concrete type
       GetPubSub() *PubSub                  // Direct concrete type
   }
   ```

8. Comprehensive Test Suite:
   - Added push_notifications_test.go with full test coverage
   - Tests for Registry, Processor, VoidProcessor
   - Tests for HandlerContext with concrete type access
   - Tests for all push notification constants
   - Validates all functionality works correctly

9. Benefits:
   - Eliminated complex adapter pattern
   - Removed unnecessary type conversions
   - Simplified codebase with direct type usage
   - Better performance without adapter overhead
   - Cleaner architecture with single source of truth
   - Enhanced developer experience with direct access

10. Architecture Simplification:
    Before: Client -> Adapter -> Internal -> Adapter -> Handler
    After:  Client -> Handler (direct)

    No more:
    - handlerAdapter bridging interfaces
    - voidProcessorAdapter for void functionality
    - convertInternalToPublicContext conversions
    - Complex type mapping between layers

This refactoring provides a much cleaner, simpler architecture where all
push notification logic lives in the root package with direct access to
concrete Redis client types, eliminating unnecessary complexity while
maintaining full functionality and type safety.
This commit is contained in:
Nedyalko Dyakov
2025-07-04 21:13:47 +03:00
parent d530d45b9b
commit 5972b4c23f
11 changed files with 512 additions and 345 deletions

View File

@ -1,178 +0,0 @@
package pushnotif
import (
"context"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
)
// HandlerContext provides context information about where a push notification was received.
// This interface allows handlers to make informed decisions based on the source of the notification
// with strongly typed access to different client types.
type HandlerContext interface {
// GetClient returns the Redis client instance that received the notification.
// Returns nil if no client context is available.
GetClient() interface{}
// GetClusterClient returns the client as a ClusterClient if it is one.
// Returns nil if the client is not a ClusterClient or no client context is available.
GetClusterClient() ClusterClientInterface
// GetSentinelClient returns the client as a SentinelClient if it is one.
// Returns nil if the client is not a SentinelClient or no client context is available.
GetSentinelClient() SentinelClientInterface
// GetFailoverClient returns the client as a FailoverClient if it is one.
// Returns nil if the client is not a FailoverClient or no client context is available.
GetFailoverClient() FailoverClientInterface
// GetRegularClient returns the client as a regular Client if it is one.
// Returns nil if the client is not a regular Client or no client context is available.
GetRegularClient() RegularClientInterface
// GetConnPool returns the connection pool from which the connection was obtained.
// Returns nil if no connection pool context is available.
GetConnPool() interface{}
// GetPubSub returns the PubSub instance that received the notification.
// Returns nil if this is not a PubSub connection.
GetPubSub() PubSubInterface
// GetConn returns the specific connection on which the notification was received.
// Returns nil if no connection context is available.
GetConn() *pool.Conn
// IsBlocking returns true if the notification was received on a blocking connection.
IsBlocking() bool
}
// Client interfaces for strongly typed access
type ClusterClientInterface interface {
// Add methods that handlers might need from ClusterClient
String() string
}
type SentinelClientInterface interface {
// Add methods that handlers might need from SentinelClient
String() string
}
type FailoverClientInterface interface {
// Add methods that handlers might need from FailoverClient
String() string
}
type RegularClientInterface interface {
// Add methods that handlers might need from regular Client
String() string
}
type PubSubInterface interface {
// Add methods that handlers might need from PubSub
String() string
}
// handlerContext is the concrete implementation of HandlerContext interface
type handlerContext struct {
client interface{}
connPool interface{}
pubSub interface{}
conn *pool.Conn
isBlocking bool
}
// NewHandlerContext creates a new HandlerContext implementation
func NewHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) HandlerContext {
return &handlerContext{
client: client,
connPool: connPool,
pubSub: pubSub,
conn: conn,
isBlocking: isBlocking,
}
}
// GetClient returns the Redis client instance that received the notification
func (h *handlerContext) GetClient() interface{} {
return h.client
}
// GetClusterClient returns the client as a ClusterClient if it is one
func (h *handlerContext) GetClusterClient() ClusterClientInterface {
if client, ok := h.client.(ClusterClientInterface); ok {
return client
}
return nil
}
// GetSentinelClient returns the client as a SentinelClient if it is one
func (h *handlerContext) GetSentinelClient() SentinelClientInterface {
if client, ok := h.client.(SentinelClientInterface); ok {
return client
}
return nil
}
// GetFailoverClient returns the client as a FailoverClient if it is one
func (h *handlerContext) GetFailoverClient() FailoverClientInterface {
if client, ok := h.client.(FailoverClientInterface); ok {
return client
}
return nil
}
// GetRegularClient returns the client as a regular Client if it is one
func (h *handlerContext) GetRegularClient() RegularClientInterface {
if client, ok := h.client.(RegularClientInterface); ok {
return client
}
return nil
}
// GetConnPool returns the connection pool from which the connection was obtained
func (h *handlerContext) GetConnPool() interface{} {
return h.connPool
}
// GetPubSub returns the PubSub instance that received the notification
func (h *handlerContext) GetPubSub() PubSubInterface {
if pubSub, ok := h.pubSub.(PubSubInterface); ok {
return pubSub
}
return nil
}
// GetConn returns the specific connection on which the notification was received
func (h *handlerContext) GetConn() *pool.Conn {
return h.conn
}
// IsBlocking returns true if the notification was received on a blocking connection
func (h *handlerContext) IsBlocking() bool {
return h.isBlocking
}
// Handler defines the interface for push notification handlers.
type Handler interface {
// HandlePushNotification processes a push notification with context information.
// The handlerCtx provides information about the client, connection pool, and connection
// on which the notification was received, allowing handlers to make informed decisions.
// Returns true if the notification was handled, false otherwise.
HandlePushNotification(ctx context.Context, handlerCtx HandlerContext, notification []interface{}) bool
}
// ProcessorInterface defines the interface for push notification processors.
type ProcessorInterface interface {
GetHandler(pushNotificationName string) Handler
ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error
}
// RegistryInterface defines the interface for push notification registries.
type RegistryInterface interface {
RegisterHandler(pushNotificationName string, handler Handler, protected bool) error
UnregisterHandler(pushNotificationName string) error
GetHandler(pushNotificationName string) Handler
GetRegisteredPushNotificationNames() []string
}

View File

@ -1,4 +1,4 @@
package pushnotif package pushprocessor
import ( import (
"context" "context"

View File

@ -1,4 +1,4 @@
package pushnotif package pushprocessor
// This is an EXPERIMENTAL API for push notifications. // This is an EXPERIMENTAL API for push notifications.
// It is subject to change without notice. // It is subject to change without notice.

View File

@ -1,4 +1,4 @@
package pushnotif package pushprocessor
import ( import (
"context" "context"

View File

@ -1,4 +1,4 @@
package pushnotif package pushprocessor
import ( import (
"fmt" "fmt"
@ -80,5 +80,3 @@ func (r *Registry) GetRegisteredPushNotificationNames() []string {
} }
return names return names
} }

View File

@ -10,7 +10,6 @@ import (
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif"
) )
// PubSub implements Pub/Sub commands as described in // PubSub implements Pub/Sub commands as described in
@ -549,27 +548,16 @@ func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, c
// Create handler context with client, connection pool, and connection information // Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn) handlerCtx := c.pushNotificationHandlerContext(cn)
// Convert internal context to public context for the processor return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
publicCtx := convertInternalToPublicContext(handlerCtx)
return c.pushProcessor.ProcessPendingNotifications(ctx, publicCtx, rd)
} }
func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext { func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext {
// PubSub doesn't have a client or connection pool, so we pass nil for those // PubSub doesn't have a client or connection pool, so we pass nil for those
// PubSub connections are blocking // PubSub connections are blocking
return pushnotif.NewHandlerContext(nil, nil, c, cn, true) return NewPushNotificationHandlerContext(nil, nil, c, cn, true)
} }
// convertInternalToPublicContext converts internal HandlerContext to public PushNotificationHandlerContext
func convertInternalToPublicContext(internalCtx pushnotif.HandlerContext) PushNotificationHandlerContext {
return NewPushNotificationHandlerContext(
internalCtx.GetClient(),
internalCtx.GetConnPool(),
internalCtx.GetPubSub(),
internalCtx.GetConn(),
internalCtx.IsBlocking(),
)
}
type ChannelOption func(c *channel) type ChannelOption func(c *channel)

View File

@ -2,10 +2,29 @@ package redis
import ( import (
"context" "context"
"fmt"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif" )
// Push notification constants for cluster operations
const (
// MOVING indicates a slot is being moved to a different node
PushNotificationMoving = "MOVING"
// MIGRATING indicates a slot is being migrated from this node
PushNotificationMigrating = "MIGRATING"
// MIGRATED indicates a slot has been migrated to this node
PushNotificationMigrated = "MIGRATED"
// FAILING_OVER indicates a failover is starting
PushNotificationFailingOver = "FAILING_OVER"
// FAILED_OVER indicates a failover has completed
PushNotificationFailedOver = "FAILED_OVER"
) )
// PushNotificationHandlerContext provides context information about where a push notification was received. // PushNotificationHandlerContext provides context information about where a push notification was received.
@ -137,75 +156,197 @@ func (h *pushNotificationHandlerContext) IsBlocking() bool {
return h.isBlocking return h.isBlocking
} }
// handlerAdapter adapts a PushNotificationHandler to the internal pushnotif.Handler interface // Registry manages push notification handlers
type handlerAdapter struct { type Registry struct {
handler PushNotificationHandler handlers map[string]PushNotificationHandler
protected map[string]bool
} }
// HandlePushNotification adapts the public handler to the internal interface // NewRegistry creates a new push notification registry
func (a *handlerAdapter) HandlePushNotification(ctx context.Context, handlerCtx pushnotif.HandlerContext, notification []interface{}) bool { func NewRegistry() *Registry {
// Convert internal HandlerContext to public PushNotificationHandlerContext return &Registry{
// We need to extract the fields from the internal context and create a public one handlers: make(map[string]PushNotificationHandler),
var client, connPool, pubSub interface{} protected: make(map[string]bool),
var conn *pool.Conn }
var isBlocking bool }
// Extract information from internal context // RegisterHandler registers a handler for a specific push notification name
client = handlerCtx.GetClient() func (r *Registry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
connPool = handlerCtx.GetConnPool() if handler == nil {
conn = handlerCtx.GetConn() return fmt.Errorf("handler cannot be nil")
isBlocking = handlerCtx.IsBlocking()
// Try to get PubSub if available
if handlerCtx.GetPubSub() != nil {
pubSub = handlerCtx.GetPubSub()
} }
// Create public context // Check if handler already exists and is protected
publicCtx := NewPushNotificationHandlerContext(client, connPool, pubSub, conn, isBlocking) if existingProtected, exists := r.protected[pushNotificationName]; exists && existingProtected {
return fmt.Errorf("cannot overwrite protected handler for push notification: %s", pushNotificationName)
// Call the public handler
return a.handler.HandlePushNotification(ctx, publicCtx, notification)
}
// contextAdapter converts internal HandlerContext to public PushNotificationHandlerContext
// voidProcessorAdapter adapts a VoidProcessor to the public interface
type voidProcessorAdapter struct {
processor *pushnotif.VoidProcessor
}
// NewVoidProcessorAdapter creates a new void processor adapter
func NewVoidProcessorAdapter() PushNotificationProcessorInterface {
return &voidProcessorAdapter{
processor: pushnotif.NewVoidProcessor(),
} }
r.handlers[pushNotificationName] = handler
r.protected[pushNotificationName] = protected
return nil
}
// GetHandler returns the handler for a specific push notification name
func (r *Registry) GetHandler(pushNotificationName string) PushNotificationHandler {
return r.handlers[pushNotificationName]
}
// UnregisterHandler removes a handler for a specific push notification name
func (r *Registry) UnregisterHandler(pushNotificationName string) error {
// Check if handler is protected
if protected, exists := r.protected[pushNotificationName]; exists && protected {
return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName)
}
delete(r.handlers, pushNotificationName)
delete(r.protected, pushNotificationName)
return nil
}
// GetRegisteredPushNotificationNames returns all registered push notification names
func (r *Registry) GetRegisteredPushNotificationNames() []string {
names := make([]string, 0, len(r.handlers))
for name := range r.handlers {
names = append(names, name)
}
return names
}
// Processor handles push notifications with a registry of handlers
type Processor struct {
registry *Registry
}
// NewProcessor creates a new push notification processor
func NewProcessor() *Processor {
return &Processor{
registry: NewRegistry(),
}
}
// GetHandler returns the handler for a specific push notification name
func (p *Processor) GetHandler(pushNotificationName string) PushNotificationHandler {
return p.registry.GetHandler(pushNotificationName)
}
// RegisterHandler registers a handler for a specific push notification name
func (p *Processor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
return p.registry.RegisterHandler(pushNotificationName, handler, protected)
}
// UnregisterHandler removes a handler for a specific push notification name
func (p *Processor) UnregisterHandler(pushNotificationName string) error {
return p.registry.UnregisterHandler(pushNotificationName)
}
// ProcessPendingNotifications checks for and processes any pending push notifications
func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
if rd == nil {
return nil
}
for {
// Check if there's data available to read
replyType, err := rd.PeekReplyType()
if err != nil {
// No more data available or error reading
break
}
// Only process push notifications (arrays starting with >)
if replyType != proto.RespPush {
break
}
// Read the push notification
reply, err := rd.ReadReply()
if err != nil {
internal.Logger.Printf(ctx, "push: error reading push notification: %v", err)
break
}
// Convert to slice of interfaces
notification, ok := reply.([]interface{})
if !ok {
continue
}
// Handle the notification directly
if len(notification) > 0 {
// Extract the notification type (first element)
if notificationType, ok := notification[0].(string); ok {
// Skip notifications that should be handled by other systems
if shouldSkipNotification(notificationType) {
continue
}
// Get the handler for this notification type
if handler := p.registry.GetHandler(notificationType); handler != nil {
// Handle the notification
handler.HandlePushNotification(ctx, handlerCtx, notification)
}
}
}
}
return nil
}
// shouldSkipNotification checks if a notification type should be ignored by the push notification
// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.).
func shouldSkipNotification(notificationType string) bool {
switch notificationType {
// Pub/Sub notifications - handled by pub/sub system
case "message", // Regular pub/sub message
"pmessage", // Pattern pub/sub message
"subscribe", // Subscription confirmation
"unsubscribe", // Unsubscription confirmation
"psubscribe", // Pattern subscription confirmation
"punsubscribe", // Pattern unsubscription confirmation
"smessage", // Sharded pub/sub message (Redis 7.0+)
"ssubscribe", // Sharded subscription confirmation
"sunsubscribe": // Sharded unsubscription confirmation
return true
default:
return false
}
}
// VoidProcessor discards all push notifications without processing them
type VoidProcessor struct{}
// NewVoidProcessor creates a new void push notification processor
func NewVoidProcessor() *VoidProcessor {
return &VoidProcessor{}
} }
// GetHandler returns nil for void processor since it doesn't maintain handlers // GetHandler returns nil for void processor since it doesn't maintain handlers
func (v *voidProcessorAdapter) GetHandler(pushNotificationName string) PushNotificationHandler { func (v *VoidProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
return nil return nil
} }
// RegisterHandler returns an error for void processor since it doesn't maintain handlers // RegisterHandler returns an error for void processor since it doesn't maintain handlers
func (v *voidProcessorAdapter) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
// Void processor doesn't support handlers return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
return v.processor.RegisterHandler(pushNotificationName, nil, protected)
} }
// ProcessPendingNotifications reads and discards any pending push notifications // UnregisterHandler returns an error for void processor since it doesn't maintain handlers
func (v *voidProcessorAdapter) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
// Convert public context to internal context return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
internalCtx := pushnotif.NewHandlerContext(
handlerCtx.GetClient(),
handlerCtx.GetConnPool(),
handlerCtx.GetPubSub(),
handlerCtx.GetConn(),
handlerCtx.IsBlocking(),
)
return v.processor.ProcessPendingNotifications(ctx, internalCtx, rd)
} }
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
// are only available in RESP3 and this processor is used for RESP2 connections.
// This avoids unnecessary buffer scanning overhead.
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
// VoidProcessor is used for RESP2 connections where push notifications are not available.
// Since push notifications only exist in RESP3, we can safely skip all processing
// to avoid unnecessary buffer scanning overhead.
return nil
}
// PushNotificationProcessorInterface defines the interface for push notification processors. // PushNotificationProcessorInterface defines the interface for push notification processors.
type PushNotificationProcessorInterface interface { type PushNotificationProcessorInterface interface {
GetHandler(pushNotificationName string) PushNotificationHandler GetHandler(pushNotificationName string) PushNotificationHandler
@ -215,21 +356,19 @@ type PushNotificationProcessorInterface interface {
// PushNotificationRegistry manages push notification handlers. // PushNotificationRegistry manages push notification handlers.
type PushNotificationRegistry struct { type PushNotificationRegistry struct {
registry *pushnotif.Registry registry *Registry
} }
// NewPushNotificationRegistry creates a new push notification registry. // NewPushNotificationRegistry creates a new push notification registry.
func NewPushNotificationRegistry() *PushNotificationRegistry { func NewPushNotificationRegistry() *PushNotificationRegistry {
return &PushNotificationRegistry{ return &PushNotificationRegistry{
registry: pushnotif.NewRegistry(), registry: NewRegistry(),
} }
} }
// RegisterHandler registers a handler for a specific push notification name. // RegisterHandler registers a handler for a specific push notification name.
func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
// Wrap the public handler in an adapter for the internal interface return r.registry.RegisterHandler(pushNotificationName, handler, protected)
adapter := &handlerAdapter{handler: handler}
return r.registry.RegisterHandler(pushNotificationName, adapter, protected)
} }
// UnregisterHandler removes a handler for a specific push notification name. // UnregisterHandler removes a handler for a specific push notification name.
@ -239,18 +378,7 @@ func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string
// GetHandler returns the handler for a specific push notification name. // GetHandler returns the handler for a specific push notification name.
func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler {
internalHandler := r.registry.GetHandler(pushNotificationName) return r.registry.GetHandler(pushNotificationName)
if internalHandler == nil {
return nil
}
// If it's our adapter, return the original handler
if adapter, ok := internalHandler.(*handlerAdapter); ok {
return adapter.handler
}
// This shouldn't happen in normal usage, but handle it gracefully
return nil
} }
// GetRegisteredPushNotificationNames returns a list of all registered push notification names. // GetRegisteredPushNotificationNames returns a list of all registered push notification names.
@ -260,37 +388,24 @@ func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string
// PushNotificationProcessor handles push notifications with a registry of handlers. // PushNotificationProcessor handles push notifications with a registry of handlers.
type PushNotificationProcessor struct { type PushNotificationProcessor struct {
processor *pushnotif.Processor processor *Processor
} }
// NewPushNotificationProcessor creates a new push notification processor. // NewPushNotificationProcessor creates a new push notification processor.
func NewPushNotificationProcessor() *PushNotificationProcessor { func NewPushNotificationProcessor() *PushNotificationProcessor {
return &PushNotificationProcessor{ return &PushNotificationProcessor{
processor: pushnotif.NewProcessor(), processor: NewProcessor(),
} }
} }
// GetHandler returns the handler for a specific push notification name. // GetHandler returns the handler for a specific push notification name.
func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
internalHandler := p.processor.GetHandler(pushNotificationName) return p.processor.GetHandler(pushNotificationName)
if internalHandler == nil {
return nil
}
// If it's our adapter, return the original handler
if adapter, ok := internalHandler.(*handlerAdapter); ok {
return adapter.handler
}
// This shouldn't happen in normal usage, but handle it gracefully
return nil
} }
// RegisterHandler registers a handler for a specific push notification name. // RegisterHandler registers a handler for a specific push notification name.
func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
// Wrap the public handler in an adapter for the internal interface return p.processor.RegisterHandler(pushNotificationName, handler, protected)
adapter := &handlerAdapter{handler: handler}
return p.processor.RegisterHandler(pushNotificationName, adapter, protected)
} }
// UnregisterHandler removes a handler for a specific push notification name. // UnregisterHandler removes a handler for a specific push notification name.
@ -301,61 +416,36 @@ func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName strin
// ProcessPendingNotifications checks for and processes any pending push notifications. // ProcessPendingNotifications checks for and processes any pending push notifications.
// The handlerCtx provides context about the client, connection pool, and connection. // The handlerCtx provides context about the client, connection pool, and connection.
func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
// Convert public context to internal context return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd)
internalCtx := pushnotif.NewHandlerContext(
handlerCtx.GetClient(),
handlerCtx.GetConnPool(),
handlerCtx.GetPubSub(),
handlerCtx.GetConn(),
handlerCtx.IsBlocking(),
)
return p.processor.ProcessPendingNotifications(ctx, internalCtx, rd)
} }
// VoidPushNotificationProcessor discards all push notifications without processing them. // VoidPushNotificationProcessor discards all push notifications without processing them.
type VoidPushNotificationProcessor struct { type VoidPushNotificationProcessor struct {
processor *pushnotif.VoidProcessor processor *VoidProcessor
} }
// NewVoidPushNotificationProcessor creates a new void push notification processor. // NewVoidPushNotificationProcessor creates a new void push notification processor.
func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor {
return &VoidPushNotificationProcessor{ return &VoidPushNotificationProcessor{
processor: pushnotif.NewVoidProcessor(), processor: NewVoidProcessor(),
} }
} }
// GetHandler returns nil for void processor since it doesn't maintain handlers. // GetHandler returns nil for void processor since it doesn't maintain handlers.
func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
return nil return v.processor.GetHandler(pushNotificationName)
} }
// RegisterHandler returns an error for void processor since it doesn't maintain handlers. // RegisterHandler returns an error for void processor since it doesn't maintain handlers.
func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
return v.processor.RegisterHandler(pushNotificationName, nil, protected) return v.processor.RegisterHandler(pushNotificationName, handler, protected)
} }
// ProcessPendingNotifications reads and discards any pending push notifications. // ProcessPendingNotifications reads and discards any pending push notifications.
func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
// Convert public context to internal context return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd)
internalCtx := pushnotif.NewHandlerContext(
handlerCtx.GetClient(),
handlerCtx.GetConnPool(),
handlerCtx.GetPubSub(),
handlerCtx.GetConn(),
handlerCtx.IsBlocking(),
)
return v.processor.ProcessPendingNotifications(ctx, internalCtx, rd)
} }
// Redis Cluster push notification names
const (
PushNotificationMoving = "MOVING"
PushNotificationMigrating = "MIGRATING"
PushNotificationMigrated = "MIGRATED"
PushNotificationFailingOver = "FAILING_OVER"
PushNotificationFailedOver = "FAILED_OVER"
)
// PushNotificationInfo contains metadata about a push notification. // PushNotificationInfo contains metadata about a push notification.
type PushNotificationInfo struct { type PushNotificationInfo struct {
Name string Name string

242
push_notifications_test.go Normal file
View File

@ -0,0 +1,242 @@
package redis
import (
"context"
"testing"
"github.com/redis/go-redis/v9/internal/pool"
)
// TestHandler implements PushNotificationHandler interface for testing
type TestHandler struct {
name string
handled [][]interface{}
returnValue bool
}
func NewTestHandler(name string, returnValue bool) *TestHandler {
return &TestHandler{
name: name,
handled: make([][]interface{}, 0),
returnValue: returnValue,
}
}
func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx PushNotificationHandlerContext, notification []interface{}) bool {
h.handled = append(h.handled, notification)
return h.returnValue
}
func (h *TestHandler) GetHandledNotifications() [][]interface{} {
return h.handled
}
func (h *TestHandler) Reset() {
h.handled = make([][]interface{}, 0)
}
func TestPushNotificationRegistry(t *testing.T) {
t.Run("NewRegistry", func(t *testing.T) {
registry := NewRegistry()
if registry == nil {
t.Error("NewRegistry should not return nil")
}
if len(registry.GetRegisteredPushNotificationNames()) != 0 {
t.Error("New registry should have no registered handlers")
}
})
t.Run("RegisterHandler", func(t *testing.T) {
registry := NewRegistry()
handler := NewTestHandler("test", true)
err := registry.RegisterHandler("TEST", handler, false)
if err != nil {
t.Errorf("RegisterHandler should not error: %v", err)
}
retrievedHandler := registry.GetHandler("TEST")
if retrievedHandler != handler {
t.Error("GetHandler should return the registered handler")
}
})
t.Run("UnregisterHandler", func(t *testing.T) {
registry := NewRegistry()
handler := NewTestHandler("test", true)
registry.RegisterHandler("TEST", handler, false)
err := registry.UnregisterHandler("TEST")
if err != nil {
t.Errorf("UnregisterHandler should not error: %v", err)
}
retrievedHandler := registry.GetHandler("TEST")
if retrievedHandler != nil {
t.Error("GetHandler should return nil after unregistering")
}
})
t.Run("ProtectedHandler", func(t *testing.T) {
registry := NewRegistry()
handler := NewTestHandler("test", true)
// Register protected handler
err := registry.RegisterHandler("TEST", handler, true)
if err != nil {
t.Errorf("RegisterHandler should not error: %v", err)
}
// Try to unregister protected handler
err = registry.UnregisterHandler("TEST")
if err == nil {
t.Error("UnregisterHandler should error for protected handler")
}
// Handler should still be there
retrievedHandler := registry.GetHandler("TEST")
if retrievedHandler != handler {
t.Error("Protected handler should still be registered")
}
})
}
func TestPushNotificationProcessor(t *testing.T) {
t.Run("NewProcessor", func(t *testing.T) {
processor := NewProcessor()
if processor == nil {
t.Error("NewProcessor should not return nil")
}
})
t.Run("RegisterAndGetHandler", func(t *testing.T) {
processor := NewProcessor()
handler := NewTestHandler("test", true)
err := processor.RegisterHandler("TEST", handler, false)
if err != nil {
t.Errorf("RegisterHandler should not error: %v", err)
}
retrievedHandler := processor.GetHandler("TEST")
if retrievedHandler != handler {
t.Error("GetHandler should return the registered handler")
}
})
}
func TestVoidProcessor(t *testing.T) {
t.Run("NewVoidProcessor", func(t *testing.T) {
processor := NewVoidProcessor()
if processor == nil {
t.Error("NewVoidProcessor should not return nil")
}
})
t.Run("GetHandler", func(t *testing.T) {
processor := NewVoidProcessor()
handler := processor.GetHandler("TEST")
if handler != nil {
t.Error("VoidProcessor GetHandler should always return nil")
}
})
t.Run("RegisterHandler", func(t *testing.T) {
processor := NewVoidProcessor()
handler := NewTestHandler("test", true)
err := processor.RegisterHandler("TEST", handler, false)
if err == nil {
t.Error("VoidProcessor RegisterHandler should return error")
}
})
t.Run("ProcessPendingNotifications", func(t *testing.T) {
processor := NewVoidProcessor()
ctx := context.Background()
handlerCtx := NewPushNotificationHandlerContext(nil, nil, nil, nil, false)
// VoidProcessor should always succeed and do nothing
err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
if err != nil {
t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err)
}
})
}
func TestPushNotificationHandlerContext(t *testing.T) {
t.Run("NewHandlerContext", func(t *testing.T) {
client := &Client{}
connPool := &pool.ConnPool{}
pubSub := &PubSub{}
conn := &pool.Conn{}
ctx := NewPushNotificationHandlerContext(client, connPool, pubSub, conn, true)
if ctx == nil {
t.Error("NewPushNotificationHandlerContext should not return nil")
}
if ctx.GetClient() != client {
t.Error("GetClient should return the provided client")
}
if ctx.GetConnPool() != connPool {
t.Error("GetConnPool should return the provided connection pool")
}
if ctx.GetPubSub() != pubSub {
t.Error("GetPubSub should return the provided PubSub")
}
if ctx.GetConn() != conn {
t.Error("GetConn should return the provided connection")
}
if !ctx.IsBlocking() {
t.Error("IsBlocking should return true")
}
})
t.Run("TypedGetters", func(t *testing.T) {
client := &Client{}
ctx := NewPushNotificationHandlerContext(client, nil, nil, nil, false)
// Test regular client getter
regularClient := ctx.GetRegularClient()
if regularClient != client {
t.Error("GetRegularClient should return the client when it's a regular client")
}
// Test cluster client getter (should be nil for regular client)
clusterClient := ctx.GetClusterClient()
if clusterClient != nil {
t.Error("GetClusterClient should return nil when client is not a cluster client")
}
})
}
func TestPushNotificationConstants(t *testing.T) {
t.Run("Constants", func(t *testing.T) {
if PushNotificationMoving != "MOVING" {
t.Error("PushNotificationMoving should be 'MOVING'")
}
if PushNotificationMigrating != "MIGRATING" {
t.Error("PushNotificationMigrating should be 'MIGRATING'")
}
if PushNotificationMigrated != "MIGRATED" {
t.Error("PushNotificationMigrated should be 'MIGRATED'")
}
if PushNotificationFailingOver != "FAILING_OVER" {
t.Error("PushNotificationFailingOver should be 'FAILING_OVER'")
}
if PushNotificationFailedOver != "FAILED_OVER" {
t.Error("PushNotificationFailedOver should be 'FAILED_OVER'")
}
})
}

32
pushnotif/types.go Normal file
View File

@ -0,0 +1,32 @@
package pushnotif
import (
"context"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushprocessor"
)
// PushProcessorInterface defines the interface for push notification processors.
type PushProcessorInterface interface {
GetHandler(pushNotificationName string) PushNotificationHandler
ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error
RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error
}
// RegistryInterface defines the interface for push notification registries.
type RegistryInterface interface {
RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error
UnregisterHandler(pushNotificationName string) error
GetHandler(pushNotificationName string) PushNotificationHandler
GetRegisteredPushNotificationNames() []string
}
// NewProcessor creates a new push notification processor.
func NewProcessor() PushProcessorInterface {
return pushprocessor.NewProcessor()
}
// NewVoidProcessor creates a new void push notification processor.
func NewVoidProcessor() PushProcessorInterface {
return pushprocessor.NewVoidProcessor()
}

View File

@ -14,7 +14,6 @@ import (
"github.com/redis/go-redis/v9/internal/hscan" "github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif"
) )
// Scanner internal/hscan.Scanner exposed interface. // Scanner internal/hscan.Scanner exposed interface.
@ -1122,9 +1121,7 @@ func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn
return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { return cn.WithReader(ctx, 0, func(rd *proto.Reader) error {
// Create handler context with client, connection pool, and connection information // Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn) handlerCtx := c.pushNotificationHandlerContext(cn)
// Convert internal context to public context for the processor return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
publicCtx := convertInternalToPublicContext(handlerCtx)
return c.pushProcessor.ProcessPendingNotifications(ctx, publicCtx, rd)
}) })
} }
@ -1137,14 +1134,12 @@ func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Contex
// Create handler context with client, connection pool, and connection information // Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn) handlerCtx := c.pushNotificationHandlerContext(cn)
// Convert internal context to public context for the processor return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
publicCtx := convertInternalToPublicContext(handlerCtx)
return c.pushProcessor.ProcessPendingNotifications(ctx, publicCtx, rd)
} }
// pushNotificationHandlerContext creates a handler context for push notification processing // pushNotificationHandlerContext creates a handler context for push notification processing
func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext { func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext {
return pushnotif.NewHandlerContext(c, c.connPool, nil, cn, false) return NewPushNotificationHandlerContext(c, c.connPool, nil, cn, false)
} }

View File

@ -500,7 +500,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
// Initialize push notification processor using shared helper // Initialize push notification processor using shared helper
// Use void processor for Sentinel clients // Use void processor for Sentinel clients
c.pushProcessor = NewVoidProcessorAdapter() c.pushProcessor = NewVoidPushNotificationProcessor()
c.initHooks(hooks{ c.initHooks(hooks{
dial: c.baseClient.dial, dial: c.baseClient.dial,