1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-18 00:20:57 +03:00

feat: remove global handlers and enable push notifications by default

- Remove all global push notification handler functionality
- Simplify registry to support only single handler per notification type
- Enable push notifications by default for RESP3 connections
- Update comprehensive test suite to remove global handler tests
- Update demo to show multiple specific handlers instead of global handlers
- Always respect custom processors regardless of PushNotifications flag

Push notifications are now automatically enabled for RESP3 and each
notification type has a single dedicated handler for predictable behavior.
This commit is contained in:
Nedyalko Dyakov
2025-06-26 21:03:19 +03:00
parent 1ff0ded0e3
commit e6e2cead66
5 changed files with 51 additions and 213 deletions

View File

@ -18,8 +18,8 @@ func main() {
// Example 2: Custom push notification handlers // Example 2: Custom push notification handlers
customHandlersExample() customHandlersExample()
// Example 3: Global push notification handlers // Example 3: Multiple specific handlers
globalHandlersExample() multipleSpecificHandlersExample()
// Example 4: Custom push notifications // Example 4: Custom push notifications
customPushNotificationExample() customPushNotificationExample()
@ -95,8 +95,8 @@ func customHandlersExample() {
fmt.Println(" - SYSTEM_ALERT: Handles system alert notifications") fmt.Println(" - SYSTEM_ALERT: Handles system alert notifications")
} }
func globalHandlersExample() { func multipleSpecificHandlersExample() {
fmt.Println("\n=== Global Push Notification Handler Example ===") fmt.Println("\n=== Multiple Specific Handlers Example ===")
client := redis.NewClient(&redis.Options{ client := redis.NewClient(&redis.Options{
Addr: "localhost:6379", Addr: "localhost:6379",
@ -105,25 +105,21 @@ func globalHandlersExample() {
}) })
defer client.Close() defer client.Close()
// Register a global handler that receives ALL push notifications // Register specific handlers
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
if len(notification) > 0 {
command := notification[0]
fmt.Printf("📡 Global handler received: %v (args: %d)\n", command, len(notification)-1)
}
return true
})
// Register specific handlers as well
client.RegisterPushNotificationHandlerFunc("SPECIFIC_EVENT", func(ctx context.Context, notification []interface{}) bool { client.RegisterPushNotificationHandlerFunc("SPECIFIC_EVENT", func(ctx context.Context, notification []interface{}) bool {
fmt.Printf("🎯 Specific handler for SPECIFIC_EVENT: %v\n", notification) fmt.Printf("🎯 Specific handler for SPECIFIC_EVENT: %v\n", notification)
return true return true
}) })
fmt.Println("✅ Global and specific handlers registered:") client.RegisterPushNotificationHandlerFunc("ANOTHER_EVENT", func(ctx context.Context, notification []interface{}) bool {
fmt.Println(" - Global handler will receive ALL push notifications") fmt.Printf("🎯 Specific handler for ANOTHER_EVENT: %v\n", notification)
fmt.Println(" - Specific handler will receive only SPECIFIC_EVENT notifications") return true
fmt.Println(" - Both handlers will be called for SPECIFIC_EVENT notifications") })
fmt.Println("✅ Specific handlers registered:")
fmt.Println(" - SPECIFIC_EVENT handler will receive only SPECIFIC_EVENT notifications")
fmt.Println(" - ANOTHER_EVENT handler will receive only ANOTHER_EVENT notifications")
fmt.Println(" - Each notification type has a single dedicated handler")
} }
func customPushNotificationExample() { func customPushNotificationExample() {
@ -143,24 +139,9 @@ func customPushNotificationExample() {
return true return true
}) })
// Register a global handler to monitor all notifications
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
if len(notification) > 0 {
command := notification[0]
switch command {
case "MOVING", "MIGRATING", "MIGRATED":
fmt.Printf("🔄 Cluster notification: %v\n", command)
default:
fmt.Printf("📨 Other notification: %v\n", command)
}
}
return true
})
fmt.Println("✅ Custom push notifications enabled:") fmt.Println("✅ Custom push notifications enabled:")
fmt.Println(" - MOVING, MIGRATING, MIGRATED notifications → Cluster handlers")
fmt.Println(" - APPLICATION_EVENT notifications → Custom handler") fmt.Println(" - APPLICATION_EVENT notifications → Custom handler")
fmt.Println(" - All notifications → Global monitoring handler") fmt.Println(" - Each notification type has a single dedicated handler")
} }
func multipleNotificationTypesExample() { func multipleNotificationTypesExample() {

View File

@ -221,7 +221,11 @@ type Options struct {
// When enabled, the client will process RESP3 push notifications and // When enabled, the client will process RESP3 push notifications and
// route them to registered handlers. // route them to registered handlers.
// //
// default: false // For RESP3 connections (Protocol: 3), push notifications are automatically enabled.
// To disable push notifications for RESP3, use Protocol: 2 instead.
// For RESP2 connections, push notifications are not available.
//
// default: automatically enabled for RESP3, disabled for RESP2
PushNotifications bool PushNotifications bool
// PushNotificationProcessor is the processor for handling push notifications. // PushNotificationProcessor is the processor for handling push notifications.

View File

@ -28,14 +28,12 @@ func (f PushNotificationHandlerFunc) HandlePushNotification(ctx context.Context,
type PushNotificationRegistry struct { type PushNotificationRegistry struct {
mu sync.RWMutex mu sync.RWMutex
handlers map[string]PushNotificationHandler // command -> single handler handlers map[string]PushNotificationHandler // command -> single handler
global []PushNotificationHandler // global handlers for all notifications
} }
// NewPushNotificationRegistry creates a new push notification registry. // NewPushNotificationRegistry creates a new push notification registry.
func NewPushNotificationRegistry() *PushNotificationRegistry { func NewPushNotificationRegistry() *PushNotificationRegistry {
return &PushNotificationRegistry{ return &PushNotificationRegistry{
handlers: make(map[string]PushNotificationHandler), handlers: make(map[string]PushNotificationHandler),
global: make([]PushNotificationHandler, 0),
} }
} }
@ -52,14 +50,6 @@ func (r *PushNotificationRegistry) RegisterHandler(command string, handler PushN
return nil return nil
} }
// RegisterGlobalHandler registers a handler that will receive all push notifications.
func (r *PushNotificationRegistry) RegisterGlobalHandler(handler PushNotificationHandler) {
r.mu.Lock()
defer r.mu.Unlock()
r.global = append(r.global, handler)
}
// UnregisterHandler removes the handler for a specific push notification command. // UnregisterHandler removes the handler for a specific push notification command.
func (r *PushNotificationRegistry) UnregisterHandler(command string) { func (r *PushNotificationRegistry) UnregisterHandler(command string) {
r.mu.Lock() r.mu.Lock()
@ -68,7 +58,7 @@ func (r *PushNotificationRegistry) UnregisterHandler(command string) {
delete(r.handlers, command) delete(r.handlers, command)
} }
// HandleNotification processes a push notification by calling all registered handlers. // HandleNotification processes a push notification by calling the registered handler.
func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool { func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool {
if len(notification) == 0 { if len(notification) == 0 {
return false return false
@ -83,23 +73,12 @@ func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notif
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
handled := false
// Call global handlers first
for _, handler := range r.global {
if handler.HandlePushNotification(ctx, notification) {
handled = true
}
}
// Call specific handler // Call specific handler
if handler, exists := r.handlers[command]; exists { if handler, exists := r.handlers[command]; exists {
if handler.HandlePushNotification(ctx, notification) { return handler.HandlePushNotification(ctx, notification)
handled = true
}
} }
return handled return false
} }
// GetRegisteredCommands returns a list of commands that have registered handlers. // GetRegisteredCommands returns a list of commands that have registered handlers.
@ -114,12 +93,12 @@ func (r *PushNotificationRegistry) GetRegisteredCommands() []string {
return commands return commands
} }
// HasHandlers returns true if there are any handlers registered (global or specific). // HasHandlers returns true if there are any handlers registered.
func (r *PushNotificationRegistry) HasHandlers() bool { func (r *PushNotificationRegistry) HasHandlers() bool {
r.mu.RLock() r.mu.RLock()
defer r.mu.RUnlock() defer r.mu.RUnlock()
return len(r.global) > 0 || len(r.handlers) > 0 return len(r.handlers) > 0
} }
// PushNotificationProcessor handles the processing of push notifications from Redis. // PushNotificationProcessor handles the processing of push notifications from Redis.
@ -206,22 +185,12 @@ func (p *PushNotificationProcessor) RegisterHandler(command string, handler Push
return p.registry.RegisterHandler(command, handler) return p.registry.RegisterHandler(command, handler)
} }
// RegisterGlobalHandler is a convenience method to register a global handler.
func (p *PushNotificationProcessor) RegisterGlobalHandler(handler PushNotificationHandler) {
p.registry.RegisterGlobalHandler(handler)
}
// RegisterHandlerFunc is a convenience method to register a function as a handler. // RegisterHandlerFunc is a convenience method to register a function as a handler.
// Returns an error if a handler is already registered for this command. // Returns an error if a handler is already registered for this command.
func (p *PushNotificationProcessor) RegisterHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error { func (p *PushNotificationProcessor) RegisterHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error {
return p.registry.RegisterHandler(command, PushNotificationHandlerFunc(handlerFunc)) return p.registry.RegisterHandler(command, PushNotificationHandlerFunc(handlerFunc))
} }
// RegisterGlobalHandlerFunc is a convenience method to register a function as a global handler.
func (p *PushNotificationProcessor) RegisterGlobalHandlerFunc(handlerFunc func(ctx context.Context, notification []interface{}) bool) {
p.registry.RegisterGlobalHandler(PushNotificationHandlerFunc(handlerFunc))
}
// Common push notification commands // Common push notification commands
const ( const (
// Redis Cluster notifications // Redis Cluster notifications

View File

@ -56,34 +56,6 @@ func TestPushNotificationRegistry(t *testing.T) {
t.Error("Handler should have been called") t.Error("Handler should have been called")
} }
// Test global handler
globalHandlerCalled := false
globalHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
globalHandlerCalled = true
return true
})
registry.RegisterGlobalHandler(globalHandler)
// Reset flags
handlerCalled = false
globalHandlerCalled = false
// Handle notification again
handled = registry.HandleNotification(ctx, notification)
if !handled {
t.Error("Notification should have been handled")
}
if !handlerCalled {
t.Error("Specific handler should have been called")
}
if !globalHandlerCalled {
t.Error("Global handler should have been called")
}
// Test duplicate handler registration error // Test duplicate handler registration error
duplicateHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { duplicateHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true return true
@ -124,13 +96,6 @@ func TestPushNotificationProcessor(t *testing.T) {
t.Fatalf("Failed to register handler: %v", err) t.Fatalf("Failed to register handler: %v", err)
} }
// Test global handler
globalHandlerCalled := false
processor.RegisterGlobalHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
globalHandlerCalled = true
return true
})
// Simulate handling a notification // Simulate handling a notification
ctx := context.Background() ctx := context.Background()
notification := []interface{}{"CUSTOM_NOTIFICATION", "data"} notification := []interface{}{"CUSTOM_NOTIFICATION", "data"}
@ -144,10 +109,6 @@ func TestPushNotificationProcessor(t *testing.T) {
t.Error("Specific handler should have been called") t.Error("Specific handler should have been called")
} }
if !globalHandlerCalled {
t.Error("Global handler should have been called")
}
// Test disabling processor // Test disabling processor
processor.SetEnabled(false) processor.SetEnabled(false)
if processor.IsEnabled() { if processor.IsEnabled() {
@ -184,13 +145,6 @@ func TestClientPushNotificationIntegration(t *testing.T) {
t.Fatalf("Failed to register handler: %v", err) t.Fatalf("Failed to register handler: %v", err)
} }
// Test global handler through client
globalHandlerCalled := false
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
globalHandlerCalled = true
return true
})
// Simulate notification handling // Simulate notification handling
ctx := context.Background() ctx := context.Background()
notification := []interface{}{"CUSTOM_EVENT", "test_data"} notification := []interface{}{"CUSTOM_EVENT", "test_data"}
@ -203,10 +157,6 @@ func TestClientPushNotificationIntegration(t *testing.T) {
if !handlerCalled { if !handlerCalled {
t.Error("Custom handler should have been called") t.Error("Custom handler should have been called")
} }
if !globalHandlerCalled {
t.Error("Global handler should have been called")
}
} }
func TestClientWithoutPushNotifications(t *testing.T) { func TestClientWithoutPushNotifications(t *testing.T) {
@ -224,13 +174,12 @@ func TestClientWithoutPushNotifications(t *testing.T) {
} }
// Registering handlers should not panic // Registering handlers should not panic
client.RegisterPushNotificationHandlerFunc("TEST", func(ctx context.Context, notification []interface{}) bool { err := client.RegisterPushNotificationHandlerFunc("TEST", func(ctx context.Context, notification []interface{}) bool {
return true
})
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true return true
}) })
if err != nil {
t.Errorf("Expected nil error when processor is nil, got: %v", err)
}
} }
func TestPushNotificationEnabledClient(t *testing.T) { func TestPushNotificationEnabledClient(t *testing.T) {
@ -522,18 +471,11 @@ func TestPushNotificationRegistryDuplicateHandlerError(t *testing.T) {
} }
} }
func TestPushNotificationRegistryGlobalAndSpecific(t *testing.T) { func TestPushNotificationRegistrySpecificHandlerOnly(t *testing.T) {
registry := redis.NewPushNotificationRegistry() registry := redis.NewPushNotificationRegistry()
globalCalled := false
specificCalled := false specificCalled := false
// Register global handler
registry.RegisterGlobalHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
globalCalled = true
return true
}))
// Register specific handler // Register specific handler
err := registry.RegisterHandler("SPECIFIC_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { err := registry.RegisterHandler("SPECIFIC_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
specificCalled = true specificCalled = true
@ -552,28 +494,19 @@ func TestPushNotificationRegistryGlobalAndSpecific(t *testing.T) {
t.Error("Notification should be handled") t.Error("Notification should be handled")
} }
if !globalCalled {
t.Error("Global handler should be called")
}
if !specificCalled { if !specificCalled {
t.Error("Specific handler should be called") t.Error("Specific handler should be called")
} }
// Reset flags // Reset flag
globalCalled = false
specificCalled = false specificCalled = false
// Test with non-specific command // Test with non-specific command - should not be handled
notification = []interface{}{"OTHER_CMD", "data"} notification = []interface{}{"OTHER_CMD", "data"}
handled = registry.HandleNotification(ctx, notification) handled = registry.HandleNotification(ctx, notification)
if !handled { if handled {
t.Error("Notification should be handled by global handler") t.Error("Notification should not be handled without specific handler")
}
if !globalCalled {
t.Error("Global handler should be called for any command")
} }
if specificCalled { if specificCalled {
@ -631,15 +564,6 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
t.Fatalf("Failed to register handler: %v", err) t.Fatalf("Failed to register handler: %v", err)
} }
// Test RegisterGlobalHandler convenience method
globalHandlerCalled := false
globalHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
globalHandlerCalled = true
return true
})
processor.RegisterGlobalHandler(globalHandler)
// Test RegisterHandlerFunc convenience method // Test RegisterHandlerFunc convenience method
funcHandlerCalled := false funcHandlerCalled := false
err = processor.RegisterHandlerFunc("FUNC_CMD", func(ctx context.Context, notification []interface{}) bool { err = processor.RegisterHandlerFunc("FUNC_CMD", func(ctx context.Context, notification []interface{}) bool {
@ -650,14 +574,7 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
t.Fatalf("Failed to register func handler: %v", err) t.Fatalf("Failed to register func handler: %v", err)
} }
// Test RegisterGlobalHandlerFunc convenience method // Test that handlers work
globalFuncHandlerCalled := false
processor.RegisterGlobalHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
globalFuncHandlerCalled = true
return true
})
// Test that all handlers work
ctx := context.Background() ctx := context.Background()
// Test specific handler // Test specific handler
@ -668,15 +585,13 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
t.Error("Notification should be handled") t.Error("Notification should be handled")
} }
if !handlerCalled || !globalHandlerCalled || !globalFuncHandlerCalled { if !handlerCalled {
t.Error("Handler, global handler, and global func handler should all be called") t.Error("Handler should be called")
} }
// Reset flags // Reset flags
handlerCalled = false handlerCalled = false
globalHandlerCalled = false
funcHandlerCalled = false funcHandlerCalled = false
globalFuncHandlerCalled = false
// Test func handler // Test func handler
notification = []interface{}{"FUNC_CMD", "data"} notification = []interface{}{"FUNC_CMD", "data"}
@ -686,8 +601,8 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
t.Error("Notification should be handled") t.Error("Notification should be handled")
} }
if !funcHandlerCalled || !globalHandlerCalled || !globalFuncHandlerCalled { if !funcHandlerCalled {
t.Error("Func handler, global handler, and global func handler should all be called") t.Error("Func handler should be called")
} }
} }
@ -707,10 +622,6 @@ func TestClientPushNotificationEdgeCases(t *testing.T) {
t.Errorf("Expected nil error when processor is nil, got: %v", err) t.Errorf("Expected nil error when processor is nil, got: %v", err)
} }
client.RegisterGlobalPushNotificationHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true
}))
err = client.RegisterPushNotificationHandlerFunc("TEST_FUNC", func(ctx context.Context, notification []interface{}) bool { err = client.RegisterPushNotificationHandlerFunc("TEST_FUNC", func(ctx context.Context, notification []interface{}) bool {
return true return true
}) })
@ -718,10 +629,6 @@ func TestClientPushNotificationEdgeCases(t *testing.T) {
t.Errorf("Expected nil error when processor is nil, got: %v", err) t.Errorf("Expected nil error when processor is nil, got: %v", err)
} }
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true
})
// GetPushNotificationProcessor should return nil // GetPushNotificationProcessor should return nil
processor := client.GetPushNotificationProcessor() processor := client.GetPushNotificationProcessor()
if processor != nil { if processor != nil {
@ -867,13 +774,6 @@ func TestPushNotificationRegistryConcurrency(t *testing.T) {
notification := []interface{}{command, "data"} notification := []interface{}{command, "data"}
registry.HandleNotification(context.Background(), notification) registry.HandleNotification(context.Background(), notification)
// Register global handler occasionally
if j%10 == 0 {
registry.RegisterGlobalHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true
}))
}
// Check registry state // Check registry state
registry.HasHandlers() registry.HasHandlers()
registry.GetRegisteredCommands() registry.GetRegisteredCommands()
@ -972,13 +872,6 @@ func TestPushNotificationClientConcurrency(t *testing.T) {
return true return true
}) })
// Register global handlers occasionally
if j%5 == 0 {
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true
})
}
// Access processor // Access processor
processor := client.GetPushNotificationProcessor() processor := client.GetPushNotificationProcessor()
if processor != nil { if processor != nil {

View File

@ -755,6 +755,12 @@ func NewClient(opt *Options) *Client {
} }
opt.init() opt.init()
// Enable push notifications by default for RESP3
// Only override if no custom processor is provided
if opt.Protocol == 3 && opt.PushNotificationProcessor == nil {
opt.PushNotifications = true
}
c := Client{ c := Client{
baseClient: &baseClient{ baseClient: &baseClient{
opt: opt, opt: opt,
@ -803,15 +809,14 @@ func (c *Client) Options() *Options {
// initializePushProcessor initializes the push notification processor. // initializePushProcessor initializes the push notification processor.
func (c *Client) initializePushProcessor() { func (c *Client) initializePushProcessor() {
// Initialize push processor if enabled // Always use custom processor if provided
if c.opt.PushNotifications {
if c.opt.PushNotificationProcessor != nil { if c.opt.PushNotificationProcessor != nil {
c.pushProcessor = c.opt.PushNotificationProcessor c.pushProcessor = c.opt.PushNotificationProcessor
} else { } else if c.opt.PushNotifications {
// Create default processor only if push notifications are enabled
c.pushProcessor = NewPushNotificationProcessor(true) c.pushProcessor = NewPushNotificationProcessor(true)
} }
} }
}
// RegisterPushNotificationHandler registers a handler for a specific push notification command. // RegisterPushNotificationHandler registers a handler for a specific push notification command.
// Returns an error if a handler is already registered for this command. // Returns an error if a handler is already registered for this command.
@ -822,13 +827,6 @@ func (c *Client) RegisterPushNotificationHandler(command string, handler PushNot
return nil return nil
} }
// RegisterGlobalPushNotificationHandler registers a handler that will receive all push notifications.
func (c *Client) RegisterGlobalPushNotificationHandler(handler PushNotificationHandler) {
if c.pushProcessor != nil {
c.pushProcessor.RegisterGlobalHandler(handler)
}
}
// RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command. // RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command.
// Returns an error if a handler is already registered for this command. // Returns an error if a handler is already registered for this command.
func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error { func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error {
@ -838,13 +836,6 @@ func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc
return nil return nil
} }
// RegisterGlobalPushNotificationHandlerFunc registers a function as a global handler for all push notifications.
func (c *Client) RegisterGlobalPushNotificationHandlerFunc(handlerFunc func(ctx context.Context, notification []interface{}) bool) {
if c.pushProcessor != nil {
c.pushProcessor.RegisterGlobalHandlerFunc(handlerFunc)
}
}
// GetPushNotificationProcessor returns the push notification processor. // GetPushNotificationProcessor returns the push notification processor.
func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor { func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor {
return c.pushProcessor return c.pushProcessor