1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-28 06:42:00 +03:00

refactor(push): completly change the package structure

This commit is contained in:
Nedyalko Dyakov
2025-07-05 02:52:40 +03:00
parent b4d0ff15fb
commit 84123b1331
13 changed files with 1987 additions and 394 deletions

View File

@ -15,6 +15,7 @@ import (
"github.com/redis/go-redis/v9/auth" "github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/push"
) )
// Limiter is the interface of a rate limiter or a circuit breaker. // Limiter is the interface of a rate limiter or a circuit breaker.
@ -222,7 +223,7 @@ type Options struct {
// PushNotificationProcessor is the processor for handling push notifications. // PushNotificationProcessor is the processor for handling push notifications.
// If nil, a default processor will be created for RESP3 connections. // If nil, a default processor will be created for RESP3 connections.
PushNotificationProcessor PushNotificationProcessorInterface PushNotificationProcessor push.NotificationProcessor
} }
func (opt *Options) init() { func (opt *Options) init() {

View File

@ -10,6 +10,7 @@ import (
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/push"
) )
// PubSub implements Pub/Sub commands as described in // PubSub implements Pub/Sub commands as described in
@ -40,7 +41,7 @@ type PubSub struct {
allCh *channel allCh *channel
// Push notification processor for handling generic push notifications // Push notification processor for handling generic push notifications
pushProcessor PushNotificationProcessorInterface pushProcessor push.NotificationProcessor
} }
func (c *PubSub) init() { func (c *PubSub) init() {
@ -551,14 +552,13 @@ func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, c
return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd) return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
} }
func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext { func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext {
// PubSub doesn't have a client or connection pool, so we pass nil for those // PubSub doesn't have a client or connection pool, so we pass nil for those
// PubSub connections are blocking // PubSub connections are blocking
return NewPushNotificationHandlerContext(nil, nil, c, cn, true) return push.HandlerContext{}
return push.NewNotificationHandlerContext(nil, nil, c, cn, true)
} }
type ChannelOption func(c *channel) type ChannelOption func(c *channel)
// WithChannelSize specifies the Go chan size that is used to buffer incoming messages. // WithChannelSize specifies the Go chan size that is used to buffer incoming messages.

150
push/errors.go Normal file
View File

@ -0,0 +1,150 @@
package push
import (
"errors"
"fmt"
"strings"
)
// Push notification error definitions
// This file contains all error types and messages used by the push notification system
// Common error variables for reuse
var (
// ErrHandlerNil is returned when attempting to register a nil handler
ErrHandlerNil = errors.New("handler cannot be nil")
)
// Registry errors
// ErrHandlerExists creates an error for when attempting to overwrite an existing handler
func ErrHandlerExists(pushNotificationName string) error {
return fmt.Errorf("cannot overwrite existing handler for push notification: %s", pushNotificationName)
}
// ErrProtectedHandler creates an error for when attempting to unregister a protected handler
func ErrProtectedHandler(pushNotificationName string) error {
return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName)
}
// VoidProcessor errors
// ErrVoidProcessorRegister creates an error for when attempting to register a handler on void processor
func ErrVoidProcessorRegister(pushNotificationName string) error {
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}
// ErrVoidProcessorUnregister creates an error for when attempting to unregister a handler on void processor
func ErrVoidProcessorUnregister(pushNotificationName string) error {
return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}
// Error message constants for consistency
const (
// Error message templates
MsgHandlerNil = "handler cannot be nil"
MsgHandlerExists = "cannot overwrite existing handler for push notification: %s"
MsgProtectedHandler = "cannot unregister protected handler for push notification: %s"
MsgVoidProcessorRegister = "cannot register push notification handler '%s': push notifications are disabled (using void processor)"
MsgVoidProcessorUnregister = "cannot unregister push notification handler '%s': push notifications are disabled (using void processor)"
)
// Error type definitions for advanced error handling
// HandlerError represents errors related to handler operations
type HandlerError struct {
Operation string // "register", "unregister", "get"
PushNotificationName string
Reason string
Err error
}
func (e *HandlerError) Error() string {
if e.Err != nil {
return fmt.Sprintf("handler %s failed for '%s': %s (%v)", e.Operation, e.PushNotificationName, e.Reason, e.Err)
}
return fmt.Sprintf("handler %s failed for '%s': %s", e.Operation, e.PushNotificationName, e.Reason)
}
func (e *HandlerError) Unwrap() error {
return e.Err
}
// NewHandlerError creates a new HandlerError
func NewHandlerError(operation, pushNotificationName, reason string, err error) *HandlerError {
return &HandlerError{
Operation: operation,
PushNotificationName: pushNotificationName,
Reason: reason,
Err: err,
}
}
// ProcessorError represents errors related to processor operations
type ProcessorError struct {
ProcessorType string // "processor", "void_processor"
Operation string // "process", "register", "unregister"
Reason string
Err error
}
func (e *ProcessorError) Error() string {
if e.Err != nil {
return fmt.Sprintf("%s %s failed: %s (%v)", e.ProcessorType, e.Operation, e.Reason, e.Err)
}
return fmt.Sprintf("%s %s failed: %s", e.ProcessorType, e.Operation, e.Reason)
}
func (e *ProcessorError) Unwrap() error {
return e.Err
}
// NewProcessorError creates a new ProcessorError
func NewProcessorError(processorType, operation, reason string, err error) *ProcessorError {
return &ProcessorError{
ProcessorType: processorType,
Operation: operation,
Reason: reason,
Err: err,
}
}
// Helper functions for common error scenarios
// IsHandlerNilError checks if an error is due to a nil handler
func IsHandlerNilError(err error) bool {
return errors.Is(err, ErrHandlerNil)
}
// IsHandlerExistsError checks if an error is due to attempting to overwrite an existing handler
func IsHandlerExistsError(err error) bool {
if err == nil {
return false
}
return fmt.Sprintf("%v", err) == fmt.Sprintf(MsgHandlerExists, extractNotificationName(err))
}
// IsProtectedHandlerError checks if an error is due to attempting to unregister a protected handler
func IsProtectedHandlerError(err error) bool {
if err == nil {
return false
}
return fmt.Sprintf("%v", err) == fmt.Sprintf(MsgProtectedHandler, extractNotificationName(err))
}
// IsVoidProcessorError checks if an error is due to void processor operations
func IsVoidProcessorError(err error) bool {
if err == nil {
return false
}
errStr := err.Error()
return strings.Contains(errStr, "push notifications are disabled (using void processor)")
}
// extractNotificationName attempts to extract the notification name from error messages
// This is a helper function for error type checking
func extractNotificationName(err error) string {
// This is a simplified implementation - in practice, you might want more sophisticated parsing
// For now, we return a placeholder since the exact extraction logic depends on the error format
return "unknown"
}

14
push/handler.go Normal file
View File

@ -0,0 +1,14 @@
package push
import (
"context"
)
// NotificationHandler defines the interface for push notification handlers.
type NotificationHandler interface {
// HandlePushNotification processes a push notification with context information.
// The handlerCtx provides information about the client, connection pool, and connection
// on which the notification was received, allowing handlers to make informed decisions.
// Returns an error if the notification could not be handled.
HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error
}

89
push/handler_context.go Normal file
View File

@ -0,0 +1,89 @@
package push
import (
"github.com/redis/go-redis/v9/internal/pool"
)
// NotificationHandlerContext provides context information about where a push notification was received.
// This interface allows handlers to make informed decisions based on the source of the notification
// with strongly typed access to different client types using concrete types.
type NotificationHandlerContext interface {
// GetClient returns the Redis client instance that received the notification.
// Returns nil if no client context is available.
// It is interface to both allow for future expansion and to avoid
// circular dependencies. The developer is responsible for type assertion.
// It can be one of the following types:
// - *redis.Client
// - *redis.ClusterClient
// - *redis.Conn
GetClient() interface{}
// GetConnPool returns the connection pool from which the connection was obtained.
// Returns nil if no connection pool context is available.
// It is interface to both allow for future expansion and to avoid
// circular dependencies. The developer is responsible for type assertion.
// It can be one of the following types:
// - *pool.ConnPool
// - *pool.SingleConnPool
// - *pool.StickyConnPool
GetConnPool() interface{}
// GetPubSub returns the PubSub instance that received the notification.
// Returns nil if this is not a PubSub connection.
// It is interface to both allow for future expansion and to avoid
// circular dependencies. The developer is responsible for type assertion.
// It can be one of the following types:
// - *redis.PubSub
GetPubSub() interface{}
// GetConn returns the specific connection on which the notification was received.
// Returns nil if no connection context is available.
GetConn() *pool.Conn
// IsBlocking returns true if the notification was received on a blocking connection.
IsBlocking() bool
}
// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface
type pushNotificationHandlerContext struct {
client interface{}
connPool interface{}
pubSub interface{}
conn *pool.Conn
isBlocking bool
}
// NewNotificationHandlerContext creates a new push.NotificationHandlerContext instance
func NewNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) NotificationHandlerContext {
return &pushNotificationHandlerContext{
client: client,
connPool: connPool,
pubSub: pubSub,
conn: conn,
isBlocking: isBlocking,
}
}
// GetClient returns the Redis client instance that received the notification
func (h *pushNotificationHandlerContext) GetClient() interface{} {
return h.client
}
// GetConnPool returns the connection pool from which the connection was obtained
func (h *pushNotificationHandlerContext) GetConnPool() interface{} {
return h.connPool
}
func (h *pushNotificationHandlerContext) GetPubSub() interface{} {
return h.pubSub
}
// GetConn returns the specific connection on which the notification was received
func (h *pushNotificationHandlerContext) GetConn() *pool.Conn {
return h.conn
}
// IsBlocking returns true if the notification was received on a blocking connection
func (h *pushNotificationHandlerContext) IsBlocking() bool {
return h.isBlocking
}

View File

@ -1,67 +1,22 @@
package redis package push
import ( import (
"context" "context"
"fmt"
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
) )
// Registry manages push notification handlers // NotificationProcessor defines the interface for push notification processors.
type Registry struct { type NotificationProcessor interface {
handlers map[string]PushNotificationHandler // GetHandler returns the handler for a specific push notification name.
protected map[string]bool GetHandler(pushNotificationName string) NotificationHandler
} // ProcessPendingNotifications checks for and processes any pending push notifications.
ProcessPendingNotifications(ctx context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error
// NewRegistry creates a new push notification registry // RegisterHandler registers a handler for a specific push notification name.
func NewRegistry() *Registry { RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error
return &Registry{ // UnregisterHandler removes a handler for a specific push notification name.
handlers: make(map[string]PushNotificationHandler), UnregisterHandler(pushNotificationName string) error
protected: make(map[string]bool),
}
}
// RegisterHandler registers a handler for a specific push notification name
func (r *Registry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
if handler == nil {
return fmt.Errorf("handler cannot be nil")
}
// Check if handler already exists and is protected
if existingProtected, exists := r.protected[pushNotificationName]; exists && existingProtected {
return fmt.Errorf("cannot overwrite protected handler for push notification: %s", pushNotificationName)
}
r.handlers[pushNotificationName] = handler
r.protected[pushNotificationName] = protected
return nil
}
// GetHandler returns the handler for a specific push notification name
func (r *Registry) GetHandler(pushNotificationName string) PushNotificationHandler {
return r.handlers[pushNotificationName]
}
// UnregisterHandler removes a handler for a specific push notification name
func (r *Registry) UnregisterHandler(pushNotificationName string) error {
// Check if handler is protected
if protected, exists := r.protected[pushNotificationName]; exists && protected {
return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName)
}
delete(r.handlers, pushNotificationName)
delete(r.protected, pushNotificationName)
return nil
}
// GetRegisteredPushNotificationNames returns all registered push notification names
func (r *Registry) GetRegisteredPushNotificationNames() []string {
names := make([]string, 0, len(r.handlers))
for name := range r.handlers {
names = append(names, name)
}
return names
} }
// Processor handles push notifications with a registry of handlers // Processor handles push notifications with a registry of handlers
@ -77,12 +32,12 @@ func NewProcessor() *Processor {
} }
// GetHandler returns the handler for a specific push notification name // GetHandler returns the handler for a specific push notification name
func (p *Processor) GetHandler(pushNotificationName string) PushNotificationHandler { func (p *Processor) GetHandler(pushNotificationName string) NotificationHandler {
return p.registry.GetHandler(pushNotificationName) return p.registry.GetHandler(pushNotificationName)
} }
// RegisterHandler registers a handler for a specific push notification name // RegisterHandler registers a handler for a specific push notification name
func (p *Processor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (p *Processor) RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error {
return p.registry.RegisterHandler(pushNotificationName, handler, protected) return p.registry.RegisterHandler(pushNotificationName, handler, protected)
} }
@ -92,7 +47,7 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error {
} }
// ProcessPendingNotifications checks for and processes any pending push notifications // ProcessPendingNotifications checks for and processes any pending push notifications
func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error { func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error {
if rd == nil { if rd == nil {
return nil return nil
} }
@ -135,7 +90,10 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
// Get the handler for this notification type // Get the handler for this notification type
if handler := p.registry.GetHandler(notificationType); handler != nil { if handler := p.registry.GetHandler(notificationType); handler != nil {
// Handle the notification // Handle the notification
handler.HandlePushNotification(ctx, handlerCtx, notification) err := handler.HandlePushNotification(ctx, handlerCtx, notification)
if err != nil {
internal.Logger.Printf(ctx, "push: error handling push notification: %v", err)
}
} }
} }
} }
@ -144,6 +102,69 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx
return nil return nil
} }
// VoidProcessor discards all push notifications without processing them
type VoidProcessor struct{}
// NewVoidProcessor creates a new void push notification processor
func NewVoidProcessor() *VoidProcessor {
return &VoidProcessor{}
}
// GetHandler returns nil for void processor since it doesn't maintain handlers
func (v *VoidProcessor) GetHandler(_ string) NotificationHandler {
return nil
}
// RegisterHandler returns an error for void processor since it doesn't maintain handlers
func (v *VoidProcessor) RegisterHandler(pushNotificationName string, _ NotificationHandler, _ bool) error {
return ErrVoidProcessorRegister(pushNotificationName)
}
// UnregisterHandler returns an error for void processor since it doesn't maintain handlers
func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
return ErrVoidProcessorUnregister(pushNotificationName)
}
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
// are only available in RESP3 and this processor is used for RESP2 connections.
// This avoids unnecessary buffer scanning overhead.
func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ NotificationHandlerContext, rd *proto.Reader) error {
// read and discard all push notifications
if rd != nil {
for {
replyType, err := rd.PeekReplyType()
if err != nil {
// No more data available or error reading
break
}
// Only process push notifications (arrays starting with >)
if replyType != proto.RespPush {
break
}
// see if we should skip this notification
notificationName, err := rd.PeekPushNotificationName()
if err != nil {
break
}
if shouldSkipNotification(notificationName) {
// discard the notification
if err := rd.DiscardNext(); err != nil {
break
}
continue
}
// Read the push notification
_, err = rd.ReadReply()
if err != nil {
return nil
}
}
}
return nil
}
// shouldSkipNotification checks if a notification type should be ignored by the push notification // shouldSkipNotification checks if a notification type should be ignored by the push notification
// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). // processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.).
func shouldSkipNotification(notificationType string) bool { func shouldSkipNotification(notificationType string) bool {
@ -163,36 +184,3 @@ func shouldSkipNotification(notificationType string) bool {
return false return false
} }
} }
// VoidProcessor discards all push notifications without processing them
type VoidProcessor struct{}
// NewVoidProcessor creates a new void push notification processor
func NewVoidProcessor() *VoidProcessor {
return &VoidProcessor{}
}
// GetHandler returns nil for void processor since it doesn't maintain handlers
func (v *VoidProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
return nil
}
// RegisterHandler returns an error for void processor since it doesn't maintain handlers
func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}
// UnregisterHandler returns an error for void processor since it doesn't maintain handlers
func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
}
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
// are only available in RESP3 and this processor is used for RESP2 connections.
// This avoids unnecessary buffer scanning overhead.
func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
// VoidProcessor is used for RESP2 connections where push notifications are not available.
// Since push notifications only exist in RESP3, we can safely skip all processing
// to avoid unnecessary buffer scanning overhead.
return nil
}

7
push/push.go Normal file
View File

@ -0,0 +1,7 @@
// Package push provides push notifications for Redis.
// This is an EXPERIMENTAL API for handling push notifications from Redis.
// It is not yet stable and may change in the future.
// Although this is in a public package, in its current form public use is not advised.
// Pending push notifications should be processed before executing any readReply from the connection
// as per RESP3 specification push notifications can be sent at any time.
package push

1554
push/push_test.go Normal file

File diff suppressed because it is too large Load Diff

61
push/registry.go Normal file
View File

@ -0,0 +1,61 @@
package push
import (
"sync"
)
// Registry manages push notification handlers
type Registry struct {
mu sync.RWMutex
handlers map[string]NotificationHandler
protected map[string]bool
}
// NewRegistry creates a new push notification registry
func NewRegistry() *Registry {
return &Registry{
handlers: make(map[string]NotificationHandler),
protected: make(map[string]bool),
}
}
// RegisterHandler registers a handler for a specific push notification name
func (r *Registry) RegisterHandler(pushNotificationName string, handler NotificationHandler, protected bool) error {
if handler == nil {
return ErrHandlerNil
}
r.mu.Lock()
defer r.mu.Unlock()
// Check if handler already exists
if _, exists := r.protected[pushNotificationName]; exists {
return ErrHandlerExists(pushNotificationName)
}
r.handlers[pushNotificationName] = handler
r.protected[pushNotificationName] = protected
return nil
}
// GetHandler returns the handler for a specific push notification name
func (r *Registry) GetHandler(pushNotificationName string) NotificationHandler {
r.mu.RLock()
defer r.mu.RUnlock()
return r.handlers[pushNotificationName]
}
// UnregisterHandler removes a handler for a specific push notification name
func (r *Registry) UnregisterHandler(pushNotificationName string) error {
r.mu.Lock()
defer r.mu.Unlock()
// Check if handler is protected
if protected, exists := r.protected[pushNotificationName]; exists && protected {
return ErrProtectedHandler(pushNotificationName)
}
delete(r.handlers, pushNotificationName)
delete(r.protected, pushNotificationName)
return nil
}

View File

@ -1,125 +0,0 @@
package redis
import (
"github.com/redis/go-redis/v9/internal/pool"
)
// PushNotificationHandlerContext provides context information about where a push notification was received.
// This interface allows handlers to make informed decisions based on the source of the notification
// with strongly typed access to different client types using concrete types.
type PushNotificationHandlerContext interface {
// GetClient returns the Redis client instance that received the notification.
// Returns nil if no client context is available.
GetClient() interface{}
// GetClusterClient returns the client as a ClusterClient if it is one.
// Returns nil if the client is not a ClusterClient or no client context is available.
GetClusterClient() *ClusterClient
// GetSentinelClient returns the client as a SentinelClient if it is one.
// Returns nil if the client is not a SentinelClient or no client context is available.
GetSentinelClient() *SentinelClient
// GetFailoverClient returns the client as a FailoverClient if it is one.
// Returns nil if the client is not a FailoverClient or no client context is available.
GetFailoverClient() *Client
// GetRegularClient returns the client as a regular Client if it is one.
// Returns nil if the client is not a regular Client or no client context is available.
GetRegularClient() *Client
// GetConnPool returns the connection pool from which the connection was obtained.
// Returns nil if no connection pool context is available.
GetConnPool() interface{}
// GetPubSub returns the PubSub instance that received the notification.
// Returns nil if this is not a PubSub connection.
GetPubSub() *PubSub
// GetConn returns the specific connection on which the notification was received.
// Returns nil if no connection context is available.
GetConn() *pool.Conn
// IsBlocking returns true if the notification was received on a blocking connection.
IsBlocking() bool
}
// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface
type pushNotificationHandlerContext struct {
client interface{}
connPool interface{}
pubSub interface{}
conn *pool.Conn
isBlocking bool
}
// NewPushNotificationHandlerContext creates a new PushNotificationHandlerContext implementation
func NewPushNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) PushNotificationHandlerContext {
return &pushNotificationHandlerContext{
client: client,
connPool: connPool,
pubSub: pubSub,
conn: conn,
isBlocking: isBlocking,
}
}
// GetClient returns the Redis client instance that received the notification
func (h *pushNotificationHandlerContext) GetClient() interface{} {
return h.client
}
// GetClusterClient returns the client as a ClusterClient if it is one
func (h *pushNotificationHandlerContext) GetClusterClient() *ClusterClient {
if client, ok := h.client.(*ClusterClient); ok {
return client
}
return nil
}
// GetSentinelClient returns the client as a SentinelClient if it is one
func (h *pushNotificationHandlerContext) GetSentinelClient() *SentinelClient {
if client, ok := h.client.(*SentinelClient); ok {
return client
}
return nil
}
// GetFailoverClient returns the client as a FailoverClient if it is one
func (h *pushNotificationHandlerContext) GetFailoverClient() *Client {
if client, ok := h.client.(*Client); ok {
return client
}
return nil
}
// GetRegularClient returns the client as a regular Client if it is one
func (h *pushNotificationHandlerContext) GetRegularClient() *Client {
if client, ok := h.client.(*Client); ok {
return client
}
return nil
}
// GetConnPool returns the connection pool from which the connection was obtained
func (h *pushNotificationHandlerContext) GetConnPool() interface{} {
return h.connPool
}
// GetPubSub returns the PubSub instance that received the notification
func (h *pushNotificationHandlerContext) GetPubSub() *PubSub {
if pubSub, ok := h.pubSub.(*PubSub); ok {
return pubSub
}
return nil
}
// GetConn returns the specific connection on which the notification was received
func (h *pushNotificationHandlerContext) GetConn() *pool.Conn {
return h.conn
}
// IsBlocking returns true if the notification was received on a blocking connection
func (h *pushNotificationHandlerContext) IsBlocking() bool {
return h.isBlocking
}

View File

@ -1,9 +1,7 @@
package redis package redis
import ( import (
"context" "github.com/redis/go-redis/v9/push"
"github.com/redis/go-redis/v9/internal/proto"
) )
// Push notification constants for cluster operations // Push notification constants for cluster operations
@ -24,147 +22,18 @@ const (
PushNotificationFailedOver = "FAILED_OVER" PushNotificationFailedOver = "FAILED_OVER"
) )
// PushNotificationHandlerContext is defined in push_notification_handler_context.go // NewPushNotificationProcessor creates a new push notification processor
// This processor maintains a registry of handlers and processes push notifications
// PushNotificationHandler defines the interface for push notification handlers. // It is used for RESP3 connections where push notifications are available
type PushNotificationHandler interface { func NewPushNotificationProcessor() push.NotificationProcessor {
// HandlePushNotification processes a push notification with context information. return push.NewProcessor()
// The handlerCtx provides information about the client, connection pool, and connection
// on which the notification was received, allowing handlers to make informed decisions.
// Returns true if the notification was handled, false otherwise.
HandlePushNotification(ctx context.Context, handlerCtx PushNotificationHandlerContext, notification []interface{}) bool
} }
// NewPushNotificationHandlerContext is defined in push_notification_handler_context.go // NewVoidPushNotificationProcessor creates a new void push notification processor
// This processor does not maintain any handlers and always returns nil for all operations
// Registry, Processor, and VoidProcessor are defined in push_notification_processor.go // It is used for RESP2 connections where push notifications are not available
// It can also be used to disable push notifications for RESP3 connections, where
// PushNotificationProcessorInterface defines the interface for push notification processors. // it will discard all push notifications without processing them
type PushNotificationProcessorInterface interface { func NewVoidPushNotificationProcessor() push.NotificationProcessor {
GetHandler(pushNotificationName string) PushNotificationHandler return push.NewVoidProcessor()
ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error
RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error
}
// PushNotificationRegistry manages push notification handlers.
type PushNotificationRegistry struct {
registry *Registry
}
// NewPushNotificationRegistry creates a new push notification registry.
func NewPushNotificationRegistry() *PushNotificationRegistry {
return &PushNotificationRegistry{
registry: NewRegistry(),
}
}
// RegisterHandler registers a handler for a specific push notification name.
func (r *PushNotificationRegistry) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
return r.registry.RegisterHandler(pushNotificationName, handler, protected)
}
// UnregisterHandler removes a handler for a specific push notification name.
func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string) error {
return r.registry.UnregisterHandler(pushNotificationName)
}
// GetHandler returns the handler for a specific push notification name.
func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler {
return r.registry.GetHandler(pushNotificationName)
}
// GetRegisteredPushNotificationNames returns a list of all registered push notification names.
func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string {
return r.registry.GetRegisteredPushNotificationNames()
}
// PushNotificationProcessor handles push notifications with a registry of handlers.
type PushNotificationProcessor struct {
processor *Processor
}
// NewPushNotificationProcessor creates a new push notification processor.
func NewPushNotificationProcessor() *PushNotificationProcessor {
return &PushNotificationProcessor{
processor: NewProcessor(),
}
}
// GetHandler returns the handler for a specific push notification name.
func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
return p.processor.GetHandler(pushNotificationName)
}
// RegisterHandler registers a handler for a specific push notification name.
func (p *PushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
return p.processor.RegisterHandler(pushNotificationName, handler, protected)
}
// UnregisterHandler removes a handler for a specific push notification name.
func (p *PushNotificationProcessor) UnregisterHandler(pushNotificationName string) error {
return p.processor.UnregisterHandler(pushNotificationName)
}
// ProcessPendingNotifications checks for and processes any pending push notifications.
// The handlerCtx provides context about the client, connection pool, and connection.
func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
return p.processor.ProcessPendingNotifications(ctx, handlerCtx, rd)
}
// VoidPushNotificationProcessor discards all push notifications without processing them.
type VoidPushNotificationProcessor struct {
processor *VoidProcessor
}
// NewVoidPushNotificationProcessor creates a new void push notification processor.
func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor {
return &VoidPushNotificationProcessor{
processor: NewVoidProcessor(),
}
}
// GetHandler returns nil for void processor since it doesn't maintain handlers.
func (v *VoidPushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
return v.processor.GetHandler(pushNotificationName)
}
// RegisterHandler returns an error for void processor since it doesn't maintain handlers.
func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
return v.processor.RegisterHandler(pushNotificationName, handler, protected)
}
// ProcessPendingNotifications reads and discards any pending push notifications.
func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error {
return v.processor.ProcessPendingNotifications(ctx, handlerCtx, rd)
}
// PushNotificationInfo contains metadata about a push notification.
type PushNotificationInfo struct {
Name string
Args []interface{}
}
// ParsePushNotificationInfo extracts information from a push notification.
func ParsePushNotificationInfo(notification []interface{}) *PushNotificationInfo {
if len(notification) == 0 {
return nil
}
name, ok := notification[0].(string)
if !ok {
return nil
}
return &PushNotificationInfo{
Name: name,
Args: notification[1:],
}
}
// String returns a string representation of the push notification info.
func (info *PushNotificationInfo) String() string {
if info == nil {
return "<nil>"
}
return info.Name
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/redis/go-redis/v9/internal/hscan" "github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/push"
) )
// Scanner internal/hscan.Scanner exposed interface. // Scanner internal/hscan.Scanner exposed interface.
@ -209,7 +210,7 @@ type baseClient struct {
onClose func() error // hook called when client is closed onClose func() error // hook called when client is closed
// Push notification processing // Push notification processing
pushProcessor PushNotificationProcessorInterface pushProcessor push.NotificationProcessor
} }
func (c *baseClient) clone() *baseClient { func (c *baseClient) clone() *baseClient {
@ -880,7 +881,7 @@ func (c *Client) Options() *Options {
// initializePushProcessor initializes the push notification processor for any client type. // initializePushProcessor initializes the push notification processor for any client type.
// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient. // This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient.
func initializePushProcessor(opt *Options) PushNotificationProcessorInterface { func initializePushProcessor(opt *Options) push.NotificationProcessor {
// Always use custom processor if provided // Always use custom processor if provided
if opt.PushNotificationProcessor != nil { if opt.PushNotificationProcessor != nil {
return opt.PushNotificationProcessor return opt.PushNotificationProcessor
@ -899,18 +900,13 @@ func initializePushProcessor(opt *Options) PushNotificationProcessorInterface {
// RegisterPushNotificationHandler registers a handler for a specific push notification name. // RegisterPushNotificationHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name. // Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered. // If protected is true, the handler cannot be unregistered.
func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error {
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
} }
// GetPushNotificationProcessor returns the push notification processor.
func (c *Client) GetPushNotificationProcessor() PushNotificationProcessorInterface {
return c.pushProcessor
}
// GetPushNotificationHandler returns the handler for a specific push notification name. // GetPushNotificationHandler returns the handler for a specific push notification name.
// Returns nil if no handler is registered for the given name. // Returns nil if no handler is registered for the given name.
func (c *Client) GetPushNotificationHandler(pushNotificationName string) PushNotificationHandler { func (c *Client) GetPushNotificationHandler(pushNotificationName string) push.NotificationHandler {
return c.pushProcessor.GetHandler(pushNotificationName) return c.pushProcessor.GetHandler(pushNotificationName)
} }
@ -1070,15 +1066,10 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
// RegisterPushNotificationHandler registers a handler for a specific push notification name. // RegisterPushNotificationHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name. // Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered. // If protected is true, the handler cannot be unregistered.
func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error {
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
} }
// GetPushNotificationProcessor returns the push notification processor.
func (c *Conn) GetPushNotificationProcessor() PushNotificationProcessorInterface {
return c.pushProcessor
}
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) { func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(ctx, fn) return c.Pipeline().Pipelined(ctx, fn)
} }
@ -1138,8 +1129,6 @@ func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Contex
} }
// pushNotificationHandlerContext creates a handler context for push notification processing // pushNotificationHandlerContext creates a handler context for push notification processing
func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) PushNotificationHandlerContext { func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext {
return NewPushNotificationHandlerContext(c, c.connPool, nil, cn, false) return push.NewNotificationHandlerContext(c, c.connPool, nil, cn, false)
} }

View File

@ -16,6 +16,7 @@ import (
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/rand" "github.com/redis/go-redis/v9/internal/rand"
"github.com/redis/go-redis/v9/push"
) )
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@ -511,21 +512,16 @@ func NewSentinelClient(opt *Options) *SentinelClient {
return c return c
} }
// GetPushNotificationProcessor returns the push notification processor.
func (c *SentinelClient) GetPushNotificationProcessor() PushNotificationProcessorInterface {
return c.pushProcessor
}
// GetPushNotificationHandler returns the handler for a specific push notification name. // GetPushNotificationHandler returns the handler for a specific push notification name.
// Returns nil if no handler is registered for the given name. // Returns nil if no handler is registered for the given name.
func (c *SentinelClient) GetPushNotificationHandler(pushNotificationName string) PushNotificationHandler { func (c *SentinelClient) GetPushNotificationHandler(pushNotificationName string) push.NotificationHandler {
return c.pushProcessor.GetHandler(pushNotificationName) return c.pushProcessor.GetHandler(pushNotificationName)
} }
// RegisterPushNotificationHandler registers a handler for a specific push notification name. // RegisterPushNotificationHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name. // Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered. // If protected is true, the handler cannot be unregistered.
func (c *SentinelClient) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { func (c *SentinelClient) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error {
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
} }