diff --git a/example/push-notification-demo/main.go b/example/push-notification-demo/main.go index b3b6804a..9c845aee 100644 --- a/example/push-notification-demo/main.go +++ b/example/push-notification-demo/main.go @@ -18,8 +18,8 @@ func main() { // Example 2: Custom push notification handlers customHandlersExample() - // Example 3: Global push notification handlers - globalHandlersExample() + // Example 3: Multiple specific handlers + multipleSpecificHandlersExample() // Example 4: Custom push notifications customPushNotificationExample() @@ -95,8 +95,8 @@ func customHandlersExample() { fmt.Println(" - SYSTEM_ALERT: Handles system alert notifications") } -func globalHandlersExample() { - fmt.Println("\n=== Global Push Notification Handler Example ===") +func multipleSpecificHandlersExample() { + fmt.Println("\n=== Multiple Specific Handlers Example ===") client := redis.NewClient(&redis.Options{ Addr: "localhost:6379", @@ -105,25 +105,21 @@ func globalHandlersExample() { }) defer client.Close() - // Register a global handler that receives ALL push notifications - client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - if len(notification) > 0 { - command := notification[0] - fmt.Printf("📡 Global handler received: %v (args: %d)\n", command, len(notification)-1) - } - return true - }) - - // Register specific handlers as well + // Register specific handlers client.RegisterPushNotificationHandlerFunc("SPECIFIC_EVENT", func(ctx context.Context, notification []interface{}) bool { fmt.Printf("🎯 Specific handler for SPECIFIC_EVENT: %v\n", notification) return true }) - fmt.Println("✅ Global and specific handlers registered:") - fmt.Println(" - Global handler will receive ALL push notifications") - fmt.Println(" - Specific handler will receive only SPECIFIC_EVENT notifications") - fmt.Println(" - Both handlers will be called for SPECIFIC_EVENT notifications") + client.RegisterPushNotificationHandlerFunc("ANOTHER_EVENT", func(ctx context.Context, notification []interface{}) bool { + fmt.Printf("🎯 Specific handler for ANOTHER_EVENT: %v\n", notification) + return true + }) + + fmt.Println("✅ Specific handlers registered:") + fmt.Println(" - SPECIFIC_EVENT handler will receive only SPECIFIC_EVENT notifications") + fmt.Println(" - ANOTHER_EVENT handler will receive only ANOTHER_EVENT notifications") + fmt.Println(" - Each notification type has a single dedicated handler") } func customPushNotificationExample() { @@ -143,24 +139,9 @@ func customPushNotificationExample() { return true }) - // Register a global handler to monitor all notifications - client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - if len(notification) > 0 { - command := notification[0] - switch command { - case "MOVING", "MIGRATING", "MIGRATED": - fmt.Printf("🔄 Cluster notification: %v\n", command) - default: - fmt.Printf("📨 Other notification: %v\n", command) - } - } - return true - }) - fmt.Println("✅ Custom push notifications enabled:") - fmt.Println(" - MOVING, MIGRATING, MIGRATED notifications → Cluster handlers") fmt.Println(" - APPLICATION_EVENT notifications → Custom handler") - fmt.Println(" - All notifications → Global monitoring handler") + fmt.Println(" - Each notification type has a single dedicated handler") } func multipleNotificationTypesExample() { diff --git a/options.go b/options.go index f2fb13fd..02c1cb94 100644 --- a/options.go +++ b/options.go @@ -221,7 +221,11 @@ type Options struct { // When enabled, the client will process RESP3 push notifications and // route them to registered handlers. // - // default: false + // For RESP3 connections (Protocol: 3), push notifications are automatically enabled. + // To disable push notifications for RESP3, use Protocol: 2 instead. + // For RESP2 connections, push notifications are not available. + // + // default: automatically enabled for RESP3, disabled for RESP2 PushNotifications bool // PushNotificationProcessor is the processor for handling push notifications. diff --git a/push_notifications.go b/push_notifications.go index cc1bae90..ec251ed2 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -28,14 +28,12 @@ func (f PushNotificationHandlerFunc) HandlePushNotification(ctx context.Context, type PushNotificationRegistry struct { mu sync.RWMutex handlers map[string]PushNotificationHandler // command -> single handler - global []PushNotificationHandler // global handlers for all notifications } // NewPushNotificationRegistry creates a new push notification registry. func NewPushNotificationRegistry() *PushNotificationRegistry { return &PushNotificationRegistry{ handlers: make(map[string]PushNotificationHandler), - global: make([]PushNotificationHandler, 0), } } @@ -52,14 +50,6 @@ func (r *PushNotificationRegistry) RegisterHandler(command string, handler PushN return nil } -// RegisterGlobalHandler registers a handler that will receive all push notifications. -func (r *PushNotificationRegistry) RegisterGlobalHandler(handler PushNotificationHandler) { - r.mu.Lock() - defer r.mu.Unlock() - - r.global = append(r.global, handler) -} - // UnregisterHandler removes the handler for a specific push notification command. func (r *PushNotificationRegistry) UnregisterHandler(command string) { r.mu.Lock() @@ -68,7 +58,7 @@ func (r *PushNotificationRegistry) UnregisterHandler(command string) { delete(r.handlers, command) } -// HandleNotification processes a push notification by calling all registered handlers. +// HandleNotification processes a push notification by calling the registered handler. func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool { if len(notification) == 0 { return false @@ -83,23 +73,12 @@ func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notif r.mu.RLock() defer r.mu.RUnlock() - handled := false - - // Call global handlers first - for _, handler := range r.global { - if handler.HandlePushNotification(ctx, notification) { - handled = true - } - } - // Call specific handler if handler, exists := r.handlers[command]; exists { - if handler.HandlePushNotification(ctx, notification) { - handled = true - } + return handler.HandlePushNotification(ctx, notification) } - return handled + return false } // GetRegisteredCommands returns a list of commands that have registered handlers. @@ -114,12 +93,12 @@ func (r *PushNotificationRegistry) GetRegisteredCommands() []string { return commands } -// HasHandlers returns true if there are any handlers registered (global or specific). +// HasHandlers returns true if there are any handlers registered. func (r *PushNotificationRegistry) HasHandlers() bool { r.mu.RLock() defer r.mu.RUnlock() - return len(r.global) > 0 || len(r.handlers) > 0 + return len(r.handlers) > 0 } // PushNotificationProcessor handles the processing of push notifications from Redis. @@ -206,22 +185,12 @@ func (p *PushNotificationProcessor) RegisterHandler(command string, handler Push return p.registry.RegisterHandler(command, handler) } -// RegisterGlobalHandler is a convenience method to register a global handler. -func (p *PushNotificationProcessor) RegisterGlobalHandler(handler PushNotificationHandler) { - p.registry.RegisterGlobalHandler(handler) -} - // RegisterHandlerFunc is a convenience method to register a function as a handler. // Returns an error if a handler is already registered for this command. func (p *PushNotificationProcessor) RegisterHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error { return p.registry.RegisterHandler(command, PushNotificationHandlerFunc(handlerFunc)) } -// RegisterGlobalHandlerFunc is a convenience method to register a function as a global handler. -func (p *PushNotificationProcessor) RegisterGlobalHandlerFunc(handlerFunc func(ctx context.Context, notification []interface{}) bool) { - p.registry.RegisterGlobalHandler(PushNotificationHandlerFunc(handlerFunc)) -} - // Common push notification commands const ( // Redis Cluster notifications diff --git a/push_notifications_test.go b/push_notifications_test.go index 2f868584..46f8b089 100644 --- a/push_notifications_test.go +++ b/push_notifications_test.go @@ -56,34 +56,6 @@ func TestPushNotificationRegistry(t *testing.T) { t.Error("Handler should have been called") } - // Test global handler - globalHandlerCalled := false - globalHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - globalHandlerCalled = true - return true - }) - - registry.RegisterGlobalHandler(globalHandler) - - // Reset flags - handlerCalled = false - globalHandlerCalled = false - - // Handle notification again - handled = registry.HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should have been handled") - } - - if !handlerCalled { - t.Error("Specific handler should have been called") - } - - if !globalHandlerCalled { - t.Error("Global handler should have been called") - } - // Test duplicate handler registration error duplicateHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { return true @@ -124,13 +96,6 @@ func TestPushNotificationProcessor(t *testing.T) { t.Fatalf("Failed to register handler: %v", err) } - // Test global handler - globalHandlerCalled := false - processor.RegisterGlobalHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - globalHandlerCalled = true - return true - }) - // Simulate handling a notification ctx := context.Background() notification := []interface{}{"CUSTOM_NOTIFICATION", "data"} @@ -144,10 +109,6 @@ func TestPushNotificationProcessor(t *testing.T) { t.Error("Specific handler should have been called") } - if !globalHandlerCalled { - t.Error("Global handler should have been called") - } - // Test disabling processor processor.SetEnabled(false) if processor.IsEnabled() { @@ -184,13 +145,6 @@ func TestClientPushNotificationIntegration(t *testing.T) { t.Fatalf("Failed to register handler: %v", err) } - // Test global handler through client - globalHandlerCalled := false - client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - globalHandlerCalled = true - return true - }) - // Simulate notification handling ctx := context.Background() notification := []interface{}{"CUSTOM_EVENT", "test_data"} @@ -203,10 +157,6 @@ func TestClientPushNotificationIntegration(t *testing.T) { if !handlerCalled { t.Error("Custom handler should have been called") } - - if !globalHandlerCalled { - t.Error("Global handler should have been called") - } } func TestClientWithoutPushNotifications(t *testing.T) { @@ -224,13 +174,12 @@ func TestClientWithoutPushNotifications(t *testing.T) { } // Registering handlers should not panic - client.RegisterPushNotificationHandlerFunc("TEST", func(ctx context.Context, notification []interface{}) bool { - return true - }) - - client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { + err := client.RegisterPushNotificationHandlerFunc("TEST", func(ctx context.Context, notification []interface{}) bool { return true }) + if err != nil { + t.Errorf("Expected nil error when processor is nil, got: %v", err) + } } func TestPushNotificationEnabledClient(t *testing.T) { @@ -522,18 +471,11 @@ func TestPushNotificationRegistryDuplicateHandlerError(t *testing.T) { } } -func TestPushNotificationRegistryGlobalAndSpecific(t *testing.T) { +func TestPushNotificationRegistrySpecificHandlerOnly(t *testing.T) { registry := redis.NewPushNotificationRegistry() - globalCalled := false specificCalled := false - // Register global handler - registry.RegisterGlobalHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - globalCalled = true - return true - })) - // Register specific handler err := registry.RegisterHandler("SPECIFIC_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { specificCalled = true @@ -552,28 +494,19 @@ func TestPushNotificationRegistryGlobalAndSpecific(t *testing.T) { t.Error("Notification should be handled") } - if !globalCalled { - t.Error("Global handler should be called") - } - if !specificCalled { t.Error("Specific handler should be called") } - // Reset flags - globalCalled = false + // Reset flag specificCalled = false - // Test with non-specific command + // Test with non-specific command - should not be handled notification = []interface{}{"OTHER_CMD", "data"} handled = registry.HandleNotification(ctx, notification) - if !handled { - t.Error("Notification should be handled by global handler") - } - - if !globalCalled { - t.Error("Global handler should be called for any command") + if handled { + t.Error("Notification should not be handled without specific handler") } if specificCalled { @@ -631,15 +564,6 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) { t.Fatalf("Failed to register handler: %v", err) } - // Test RegisterGlobalHandler convenience method - globalHandlerCalled := false - globalHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - globalHandlerCalled = true - return true - }) - - processor.RegisterGlobalHandler(globalHandler) - // Test RegisterHandlerFunc convenience method funcHandlerCalled := false err = processor.RegisterHandlerFunc("FUNC_CMD", func(ctx context.Context, notification []interface{}) bool { @@ -650,14 +574,7 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) { t.Fatalf("Failed to register func handler: %v", err) } - // Test RegisterGlobalHandlerFunc convenience method - globalFuncHandlerCalled := false - processor.RegisterGlobalHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - globalFuncHandlerCalled = true - return true - }) - - // Test that all handlers work + // Test that handlers work ctx := context.Background() // Test specific handler @@ -668,15 +585,13 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) { t.Error("Notification should be handled") } - if !handlerCalled || !globalHandlerCalled || !globalFuncHandlerCalled { - t.Error("Handler, global handler, and global func handler should all be called") + if !handlerCalled { + t.Error("Handler should be called") } // Reset flags handlerCalled = false - globalHandlerCalled = false funcHandlerCalled = false - globalFuncHandlerCalled = false // Test func handler notification = []interface{}{"FUNC_CMD", "data"} @@ -686,8 +601,8 @@ func TestPushNotificationProcessorConvenienceMethods(t *testing.T) { t.Error("Notification should be handled") } - if !funcHandlerCalled || !globalHandlerCalled || !globalFuncHandlerCalled { - t.Error("Func handler, global handler, and global func handler should all be called") + if !funcHandlerCalled { + t.Error("Func handler should be called") } } @@ -707,10 +622,6 @@ func TestClientPushNotificationEdgeCases(t *testing.T) { t.Errorf("Expected nil error when processor is nil, got: %v", err) } - client.RegisterGlobalPushNotificationHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - return true - })) - err = client.RegisterPushNotificationHandlerFunc("TEST_FUNC", func(ctx context.Context, notification []interface{}) bool { return true }) @@ -718,10 +629,6 @@ func TestClientPushNotificationEdgeCases(t *testing.T) { t.Errorf("Expected nil error when processor is nil, got: %v", err) } - client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - return true - }) - // GetPushNotificationProcessor should return nil processor := client.GetPushNotificationProcessor() if processor != nil { @@ -867,13 +774,6 @@ func TestPushNotificationRegistryConcurrency(t *testing.T) { notification := []interface{}{command, "data"} registry.HandleNotification(context.Background(), notification) - // Register global handler occasionally - if j%10 == 0 { - registry.RegisterGlobalHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - return true - })) - } - // Check registry state registry.HasHandlers() registry.GetRegisteredCommands() @@ -972,13 +872,6 @@ func TestPushNotificationClientConcurrency(t *testing.T) { return true }) - // Register global handlers occasionally - if j%5 == 0 { - client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool { - return true - }) - } - // Access processor processor := client.GetPushNotificationProcessor() if processor != nil { diff --git a/redis.go b/redis.go index c7a6701e..0f6f8051 100644 --- a/redis.go +++ b/redis.go @@ -755,6 +755,12 @@ func NewClient(opt *Options) *Client { } opt.init() + // Enable push notifications by default for RESP3 + // Only override if no custom processor is provided + if opt.Protocol == 3 && opt.PushNotificationProcessor == nil { + opt.PushNotifications = true + } + c := Client{ baseClient: &baseClient{ opt: opt, @@ -803,13 +809,12 @@ func (c *Client) Options() *Options { // initializePushProcessor initializes the push notification processor. func (c *Client) initializePushProcessor() { - // Initialize push processor if enabled - if c.opt.PushNotifications { - if c.opt.PushNotificationProcessor != nil { - c.pushProcessor = c.opt.PushNotificationProcessor - } else { - c.pushProcessor = NewPushNotificationProcessor(true) - } + // Always use custom processor if provided + if c.opt.PushNotificationProcessor != nil { + c.pushProcessor = c.opt.PushNotificationProcessor + } else if c.opt.PushNotifications { + // Create default processor only if push notifications are enabled + c.pushProcessor = NewPushNotificationProcessor(true) } } @@ -822,13 +827,6 @@ func (c *Client) RegisterPushNotificationHandler(command string, handler PushNot return nil } -// RegisterGlobalPushNotificationHandler registers a handler that will receive all push notifications. -func (c *Client) RegisterGlobalPushNotificationHandler(handler PushNotificationHandler) { - if c.pushProcessor != nil { - c.pushProcessor.RegisterGlobalHandler(handler) - } -} - // RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command. // Returns an error if a handler is already registered for this command. func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error { @@ -838,13 +836,6 @@ func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc return nil } -// RegisterGlobalPushNotificationHandlerFunc registers a function as a global handler for all push notifications. -func (c *Client) RegisterGlobalPushNotificationHandlerFunc(handlerFunc func(ctx context.Context, notification []interface{}) bool) { - if c.pushProcessor != nil { - c.pushProcessor.RegisterGlobalHandlerFunc(handlerFunc) - } -} - // GetPushNotificationProcessor returns the push notification processor. func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor { return c.pushProcessor