1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

feat: enforce single handler per notification type

- Change PushNotificationRegistry to allow only one handler per command
- RegisterHandler methods now return error if handler already exists
- Update UnregisterHandler to remove handler by command only
- Update all client methods to return errors for duplicate registrations
- Update comprehensive test suite to verify single handler behavior
- Add specific test for duplicate handler error scenarios

This prevents handler conflicts and ensures predictable notification
routing with clear error handling for registration conflicts.
This commit is contained in:
Nedyalko Dyakov
2025-06-26 20:38:30 +03:00
parent b02eed63b2
commit 1ff0ded0e3
3 changed files with 146 additions and 110 deletions

View File

@ -2,6 +2,7 @@ package redis
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal"
@ -26,27 +27,29 @@ func (f PushNotificationHandlerFunc) HandlePushNotification(ctx context.Context,
// PushNotificationRegistry manages handlers for different types of push notifications. // PushNotificationRegistry manages handlers for different types of push notifications.
type PushNotificationRegistry struct { type PushNotificationRegistry struct {
mu sync.RWMutex mu sync.RWMutex
handlers map[string][]PushNotificationHandler // command -> handlers handlers map[string]PushNotificationHandler // command -> single handler
global []PushNotificationHandler // global handlers for all notifications global []PushNotificationHandler // global handlers for all notifications
} }
// NewPushNotificationRegistry creates a new push notification registry. // NewPushNotificationRegistry creates a new push notification registry.
func NewPushNotificationRegistry() *PushNotificationRegistry { func NewPushNotificationRegistry() *PushNotificationRegistry {
return &PushNotificationRegistry{ return &PushNotificationRegistry{
handlers: make(map[string][]PushNotificationHandler), handlers: make(map[string]PushNotificationHandler),
global: make([]PushNotificationHandler, 0), global: make([]PushNotificationHandler, 0),
} }
} }
// RegisterHandler registers a handler for a specific push notification command. // RegisterHandler registers a handler for a specific push notification command.
func (r *PushNotificationRegistry) RegisterHandler(command string, handler PushNotificationHandler) { // Returns an error if a handler is already registered for this command.
func (r *PushNotificationRegistry) RegisterHandler(command string, handler PushNotificationHandler) error {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
if r.handlers[command] == nil { if _, exists := r.handlers[command]; exists {
r.handlers[command] = make([]PushNotificationHandler, 0) return fmt.Errorf("handler already registered for command: %s", command)
} }
r.handlers[command] = append(r.handlers[command], handler) r.handlers[command] = handler
return nil
} }
// RegisterGlobalHandler registers a handler that will receive all push notifications. // RegisterGlobalHandler registers a handler that will receive all push notifications.
@ -57,19 +60,12 @@ func (r *PushNotificationRegistry) RegisterGlobalHandler(handler PushNotificatio
r.global = append(r.global, handler) r.global = append(r.global, handler)
} }
// UnregisterHandler removes a handler for a specific command. // UnregisterHandler removes the handler for a specific push notification command.
func (r *PushNotificationRegistry) UnregisterHandler(command string, handler PushNotificationHandler) { func (r *PushNotificationRegistry) UnregisterHandler(command string) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
handlers := r.handlers[command] delete(r.handlers, command)
for i, h := range handlers {
// Compare function pointers (this is a simplified approach)
if &h == &handler {
r.handlers[command] = append(handlers[:i], handlers[i+1:]...)
break
}
}
} }
// HandleNotification processes a push notification by calling all registered handlers. // HandleNotification processes a push notification by calling all registered handlers.
@ -96,12 +92,10 @@ func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notif
} }
} }
// Call specific handlers // Call specific handler
if handlers, exists := r.handlers[command]; exists { if handler, exists := r.handlers[command]; exists {
for _, handler := range handlers { if handler.HandlePushNotification(ctx, notification) {
if handler.HandlePushNotification(ctx, notification) { handled = true
handled = true
}
} }
} }
@ -207,8 +201,9 @@ func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Cont
} }
// RegisterHandler is a convenience method to register a handler for a specific command. // RegisterHandler is a convenience method to register a handler for a specific command.
func (p *PushNotificationProcessor) RegisterHandler(command string, handler PushNotificationHandler) { // Returns an error if a handler is already registered for this command.
p.registry.RegisterHandler(command, handler) func (p *PushNotificationProcessor) RegisterHandler(command string, handler PushNotificationHandler) error {
return p.registry.RegisterHandler(command, handler)
} }
// RegisterGlobalHandler is a convenience method to register a global handler. // RegisterGlobalHandler is a convenience method to register a global handler.
@ -217,8 +212,9 @@ func (p *PushNotificationProcessor) RegisterGlobalHandler(handler PushNotificati
} }
// RegisterHandlerFunc is a convenience method to register a function as a handler. // RegisterHandlerFunc is a convenience method to register a function as a handler.
func (p *PushNotificationProcessor) RegisterHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) { // Returns an error if a handler is already registered for this command.
p.registry.RegisterHandler(command, PushNotificationHandlerFunc(handlerFunc)) func (p *PushNotificationProcessor) RegisterHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error {
return p.registry.RegisterHandler(command, PushNotificationHandlerFunc(handlerFunc))
} }
// RegisterGlobalHandlerFunc is a convenience method to register a function as a global handler. // RegisterGlobalHandlerFunc is a convenience method to register a function as a global handler.

View File

@ -29,7 +29,10 @@ func TestPushNotificationRegistry(t *testing.T) {
return true return true
}) })
registry.RegisterHandler("TEST_COMMAND", handler) err := registry.RegisterHandler("TEST_COMMAND", handler)
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
if !registry.HasHandlers() { if !registry.HasHandlers() {
t.Error("Registry should have handlers after registration") t.Error("Registry should have handlers after registration")
@ -80,6 +83,19 @@ func TestPushNotificationRegistry(t *testing.T) {
if !globalHandlerCalled { if !globalHandlerCalled {
t.Error("Global handler should have been called") t.Error("Global handler should have been called")
} }
// Test duplicate handler registration error
duplicateHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true
})
err = registry.RegisterHandler("TEST_COMMAND", duplicateHandler)
if err == nil {
t.Error("Expected error when registering duplicate handler")
}
expectedError := "handler already registered for command: TEST_COMMAND"
if err.Error() != expectedError {
t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
}
} }
func TestPushNotificationProcessor(t *testing.T) { func TestPushNotificationProcessor(t *testing.T) {
@ -92,7 +108,7 @@ func TestPushNotificationProcessor(t *testing.T) {
// Test registering handlers // Test registering handlers
handlerCalled := false handlerCalled := false
processor.RegisterHandlerFunc("CUSTOM_NOTIFICATION", func(ctx context.Context, notification []interface{}) bool { err := processor.RegisterHandlerFunc("CUSTOM_NOTIFICATION", func(ctx context.Context, notification []interface{}) bool {
handlerCalled = true handlerCalled = true
if len(notification) < 2 { if len(notification) < 2 {
t.Error("Expected at least 2 elements in notification") t.Error("Expected at least 2 elements in notification")
@ -104,6 +120,9 @@ func TestPushNotificationProcessor(t *testing.T) {
} }
return true return true
}) })
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
// Test global handler // Test global handler
globalHandlerCalled := false globalHandlerCalled := false
@ -157,10 +176,13 @@ func TestClientPushNotificationIntegration(t *testing.T) {
// Test registering handlers through client // Test registering handlers through client
handlerCalled := false handlerCalled := false
client.RegisterPushNotificationHandlerFunc("CUSTOM_EVENT", func(ctx context.Context, notification []interface{}) bool { err := client.RegisterPushNotificationHandlerFunc("CUSTOM_EVENT", func(ctx context.Context, notification []interface{}) bool {
handlerCalled = true handlerCalled = true
return true return true
}) })
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
// Test global handler through client // Test global handler through client
globalHandlerCalled := false globalHandlerCalled := false
@ -232,10 +254,13 @@ func TestPushNotificationEnabledClient(t *testing.T) {
// Test registering a handler // Test registering a handler
handlerCalled := false handlerCalled := false
client.RegisterPushNotificationHandlerFunc("TEST_NOTIFICATION", func(ctx context.Context, notification []interface{}) bool { err := client.RegisterPushNotificationHandlerFunc("TEST_NOTIFICATION", func(ctx context.Context, notification []interface{}) bool {
handlerCalled = true handlerCalled = true
return true return true
}) })
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
// Test that the handler works // Test that the handler works
registry := processor.GetRegistry() registry := processor.GetRegistry()
@ -318,11 +343,14 @@ func TestPubSubWithGenericPushNotifications(t *testing.T) {
// Register a handler for custom push notifications // Register a handler for custom push notifications
customNotificationReceived := false customNotificationReceived := false
client.RegisterPushNotificationHandlerFunc("CUSTOM_PUBSUB_EVENT", func(ctx context.Context, notification []interface{}) bool { err := client.RegisterPushNotificationHandlerFunc("CUSTOM_PUBSUB_EVENT", func(ctx context.Context, notification []interface{}) bool {
customNotificationReceived = true customNotificationReceived = true
t.Logf("Received custom push notification in PubSub context: %v", notification) t.Logf("Received custom push notification in PubSub context: %v", notification)
return true return true
}) })
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
// Create a PubSub instance // Create a PubSub instance
pubsub := client.Subscribe(context.Background(), "test-channel") pubsub := client.Subscribe(context.Background(), "test-channel")
@ -370,32 +398,28 @@ func TestPushNotificationMessageType(t *testing.T) {
} }
func TestPushNotificationRegistryUnregisterHandler(t *testing.T) { func TestPushNotificationRegistryUnregisterHandler(t *testing.T) {
// Test unregistering handlers (note: current implementation has limitations with function pointer comparison) // Test unregistering handlers
registry := redis.NewPushNotificationRegistry() registry := redis.NewPushNotificationRegistry()
// Register multiple handlers for the same command // Register a handler
handler1Called := false handlerCalled := false
handler1 := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { handler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
handler1Called = true handlerCalled = true
return true return true
}) })
handler2Called := false err := registry.RegisterHandler("TEST_CMD", handler)
handler2 := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { if err != nil {
handler2Called = true t.Fatalf("Failed to register handler: %v", err)
return true }
})
registry.RegisterHandler("TEST_CMD", handler1) // Verify handler is registered
registry.RegisterHandler("TEST_CMD", handler2)
// Verify both handlers are registered
commands := registry.GetRegisteredCommands() commands := registry.GetRegisteredCommands()
if len(commands) != 1 || commands[0] != "TEST_CMD" { if len(commands) != 1 || commands[0] != "TEST_CMD" {
t.Errorf("Expected ['TEST_CMD'], got %v", commands) t.Errorf("Expected ['TEST_CMD'], got %v", commands)
} }
// Test notification handling with both handlers // Test notification handling
ctx := context.Background() ctx := context.Background()
notification := []interface{}{"TEST_CMD", "data"} notification := []interface{}{"TEST_CMD", "data"}
handled := registry.HandleNotification(ctx, notification) handled := registry.HandleNotification(ctx, notification)
@ -403,31 +427,32 @@ func TestPushNotificationRegistryUnregisterHandler(t *testing.T) {
if !handled { if !handled {
t.Error("Notification should have been handled") t.Error("Notification should have been handled")
} }
if !handler1Called || !handler2Called { if !handlerCalled {
t.Error("Both handlers should have been called") t.Error("Handler should have been called")
} }
// Test that UnregisterHandler doesn't panic (even if it doesn't work perfectly) // Test unregistering the handler
registry.UnregisterHandler("TEST_CMD", handler1) registry.UnregisterHandler("TEST_CMD")
registry.UnregisterHandler("NON_EXISTENT", handler2)
// Note: Due to the current implementation using pointer comparison, // Verify handler is unregistered
// unregistration may not work as expected. This test mainly verifies commands = registry.GetRegisteredCommands()
// that the method doesn't panic and the registry remains functional. if len(commands) != 0 {
t.Errorf("Expected no registered commands after unregister, got %v", commands)
// Reset flags and test that handlers still work }
handler1Called = false
handler2Called = false
// Reset flag and test that handler is no longer called
handlerCalled = false
handled = registry.HandleNotification(ctx, notification) handled = registry.HandleNotification(ctx, notification)
if !handled {
t.Error("Notification should still be handled after unregister attempts") if handled {
t.Error("Notification should not be handled after unregistration")
}
if handlerCalled {
t.Error("Handler should not be called after unregistration")
} }
// The registry should still be functional // Test unregistering non-existent handler (should not panic)
if !registry.HasHandlers() { registry.UnregisterHandler("NON_EXISTENT")
t.Error("Registry should still have handlers")
}
} }
func TestPushNotificationRegistryEdgeCases(t *testing.T) { func TestPushNotificationRegistryEdgeCases(t *testing.T) {
@ -453,51 +478,47 @@ func TestPushNotificationRegistryEdgeCases(t *testing.T) {
} }
// Test unregistering non-existent handler // Test unregistering non-existent handler
dummyHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { registry.UnregisterHandler("NON_EXISTENT")
return true
})
registry.UnregisterHandler("NON_EXISTENT", dummyHandler)
// Should not panic // Should not panic
// Test unregistering from empty command // Test unregistering from empty command
registry.UnregisterHandler("EMPTY_CMD", dummyHandler) registry.UnregisterHandler("EMPTY_CMD")
// Should not panic // Should not panic
} }
func TestPushNotificationRegistryMultipleHandlers(t *testing.T) { func TestPushNotificationRegistryDuplicateHandlerError(t *testing.T) {
registry := redis.NewPushNotificationRegistry() registry := redis.NewPushNotificationRegistry()
// Test multiple handlers for the same command // Test that registering duplicate handlers returns an error
handler1Called := false handler1 := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
handler2Called := false
handler3Called := false
registry.RegisterHandler("MULTI_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
handler1Called = true
return true return true
})) })
registry.RegisterHandler("MULTI_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { handler2 := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
handler2Called = true return false
return false // Return false to test that other handlers still get called })
}))
registry.RegisterHandler("MULTI_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { // Register first handler - should succeed
handler3Called = true err := registry.RegisterHandler("DUPLICATE_CMD", handler1)
return true if err != nil {
})) t.Fatalf("First handler registration should succeed: %v", err)
// Test that all handlers are called
ctx := context.Background()
notification := []interface{}{"MULTI_CMD", "data"}
handled := registry.HandleNotification(ctx, notification)
if !handled {
t.Error("Notification should be handled (at least one handler returned true)")
} }
if !handler1Called || !handler2Called || !handler3Called { // Register second handler for same command - should fail
t.Error("All handlers should have been called") err = registry.RegisterHandler("DUPLICATE_CMD", handler2)
if err == nil {
t.Error("Second handler registration should fail")
}
expectedError := "handler already registered for command: DUPLICATE_CMD"
if err.Error() != expectedError {
t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
}
// Verify only one handler is registered
commands := registry.GetRegisteredCommands()
if len(commands) != 1 || commands[0] != "DUPLICATE_CMD" {
t.Errorf("Expected ['DUPLICATE_CMD'], got %v", commands)
} }
} }
@ -514,10 +535,13 @@ func TestPushNotificationRegistryGlobalAndSpecific(t *testing.T) {
})) }))
// Register specific handler // Register specific handler
registry.RegisterHandler("SPECIFIC_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { err := registry.RegisterHandler("SPECIFIC_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
specificCalled = true specificCalled = true
return true return true
})) }))
if err != nil {
t.Fatalf("Failed to register specific handler: %v", err)
}
// Test with specific command // Test with specific command
ctx := context.Background() ctx := context.Background()
@ -602,7 +626,10 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
return true return true
}) })
processor.RegisterHandler("CONV_CMD", handler) err := processor.RegisterHandler("CONV_CMD", handler)
if err != nil {
t.Fatalf("Failed to register handler: %v", err)
}
// Test RegisterGlobalHandler convenience method // Test RegisterGlobalHandler convenience method
globalHandlerCalled := false globalHandlerCalled := false
@ -615,10 +642,13 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
// Test RegisterHandlerFunc convenience method // Test RegisterHandlerFunc convenience method
funcHandlerCalled := false funcHandlerCalled := false
processor.RegisterHandlerFunc("FUNC_CMD", func(ctx context.Context, notification []interface{}) bool { err = processor.RegisterHandlerFunc("FUNC_CMD", func(ctx context.Context, notification []interface{}) bool {
funcHandlerCalled = true funcHandlerCalled = true
return true return true
}) })
if err != nil {
t.Fatalf("Failed to register func handler: %v", err)
}
// Test RegisterGlobalHandlerFunc convenience method // Test RegisterGlobalHandlerFunc convenience method
globalFuncHandlerCalled := false globalFuncHandlerCalled := false
@ -669,18 +699,24 @@ func TestClientPushNotificationEdgeCases(t *testing.T) {
}) })
defer client.Close() defer client.Close()
// These should not panic even when processor is nil // These should not panic even when processor is nil and should return nil error
client.RegisterPushNotificationHandler("TEST", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { err := client.RegisterPushNotificationHandler("TEST", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true return true
})) }))
if err != nil {
t.Errorf("Expected nil error when processor is nil, got: %v", err)
}
client.RegisterGlobalPushNotificationHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { client.RegisterGlobalPushNotificationHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true return true
})) }))
client.RegisterPushNotificationHandlerFunc("TEST_FUNC", func(ctx context.Context, notification []interface{}) bool { err = client.RegisterPushNotificationHandlerFunc("TEST_FUNC", func(ctx context.Context, notification []interface{}) bool {
return true return true
}) })
if err != nil {
t.Errorf("Expected nil error when processor is nil, got: %v", err)
}
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true return true
@ -821,7 +857,7 @@ func TestPushNotificationRegistryConcurrency(t *testing.T) {
defer func() { done <- true }() defer func() { done <- true }()
for j := 0; j < numOperations; j++ { for j := 0; j < numOperations; j++ {
// Register handler // Register handler (ignore errors in concurrency test)
command := fmt.Sprintf("CMD_%d_%d", id, j) command := fmt.Sprintf("CMD_%d_%d", id, j)
registry.RegisterHandler(command, redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { registry.RegisterHandler(command, redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
return true return true
@ -876,7 +912,7 @@ func TestPushNotificationProcessorConcurrency(t *testing.T) {
defer func() { done <- true }() defer func() { done <- true }()
for j := 0; j < numOperations; j++ { for j := 0; j < numOperations; j++ {
// Register handlers // Register handlers (ignore errors in concurrency test)
command := fmt.Sprintf("PROC_CMD_%d_%d", id, j) command := fmt.Sprintf("PROC_CMD_%d_%d", id, j)
processor.RegisterHandlerFunc(command, func(ctx context.Context, notification []interface{}) bool { processor.RegisterHandlerFunc(command, func(ctx context.Context, notification []interface{}) bool {
return true return true
@ -930,7 +966,7 @@ func TestPushNotificationClientConcurrency(t *testing.T) {
defer func() { done <- true }() defer func() { done <- true }()
for j := 0; j < numOperations; j++ { for j := 0; j < numOperations; j++ {
// Register handlers concurrently // Register handlers concurrently (ignore errors in concurrency test)
command := fmt.Sprintf("CLIENT_CMD_%d_%d", id, j) command := fmt.Sprintf("CLIENT_CMD_%d_%d", id, j)
client.RegisterPushNotificationHandlerFunc(command, func(ctx context.Context, notification []interface{}) bool { client.RegisterPushNotificationHandlerFunc(command, func(ctx context.Context, notification []interface{}) bool {
return true return true

View File

@ -814,10 +814,12 @@ func (c *Client) initializePushProcessor() {
} }
// RegisterPushNotificationHandler registers a handler for a specific push notification command. // RegisterPushNotificationHandler registers a handler for a specific push notification command.
func (c *Client) RegisterPushNotificationHandler(command string, handler PushNotificationHandler) { // Returns an error if a handler is already registered for this command.
func (c *Client) RegisterPushNotificationHandler(command string, handler PushNotificationHandler) error {
if c.pushProcessor != nil { if c.pushProcessor != nil {
c.pushProcessor.RegisterHandler(command, handler) return c.pushProcessor.RegisterHandler(command, handler)
} }
return nil
} }
// RegisterGlobalPushNotificationHandler registers a handler that will receive all push notifications. // RegisterGlobalPushNotificationHandler registers a handler that will receive all push notifications.
@ -828,10 +830,12 @@ func (c *Client) RegisterGlobalPushNotificationHandler(handler PushNotificationH
} }
// RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command. // RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command.
func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) { // Returns an error if a handler is already registered for this command.
func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error {
if c.pushProcessor != nil { if c.pushProcessor != nil {
c.pushProcessor.RegisterHandlerFunc(command, handlerFunc) return c.pushProcessor.RegisterHandlerFunc(command, handlerFunc)
} }
return nil
} }
// RegisterGlobalPushNotificationHandlerFunc registers a function as a global handler for all push notifications. // RegisterGlobalPushNotificationHandlerFunc registers a function as a global handler for all push notifications.