diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 9e475d0e..3620b007 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -8,6 +8,7 @@ import ( "time" "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/internal/pushnotif" ) var noDeadline = time.Time{} @@ -27,8 +28,8 @@ type Conn struct { onClose func() error // Push notification processor for handling push notifications on this connection - // Uses the same interface as defined in pool.go to avoid duplication - PushNotificationProcessor PushNotificationProcessorInterface + // This is set when the connection is created and is a reference to the processor + PushNotificationProcessor pushnotif.ProcessorInterface } func NewConn(netConn net.Conn) *Conn { diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 8a80f5e6..efadfaae 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -9,7 +9,7 @@ import ( "time" "github.com/redis/go-redis/v9/internal" - "github.com/redis/go-redis/v9/internal/proto" + "github.com/redis/go-redis/v9/internal/pushnotif" ) var ( @@ -24,8 +24,6 @@ var ( ErrPoolTimeout = errors.New("redis: connection pool timeout") ) - - var timers = sync.Pool{ New: func() interface{} { t := time.NewTimer(time.Hour) @@ -62,12 +60,6 @@ type Pooler interface { Close() error } -// PushNotificationProcessorInterface defines the interface for push notification processors. -// This matches the main PushNotificationProcessorInterface to avoid duplication while preventing circular imports. -type PushNotificationProcessorInterface interface { - ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error -} - type Options struct { Dialer func(context.Context) (net.Conn, error) @@ -82,9 +74,8 @@ type Options struct { ConnMaxLifetime time.Duration // Push notification processor for connections - // This interface matches PushNotificationProcessorInterface to avoid duplication - // while preventing circular imports - PushNotificationProcessor PushNotificationProcessorInterface + // This is an interface to avoid circular imports + PushNotificationProcessor pushnotif.ProcessorInterface // Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without) Protocol int diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go index 5bbed033..23fe9491 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushnotif/processor.go @@ -38,11 +38,7 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error { return p.registry.UnregisterHandler(pushNotificationName) } -// GetRegistryForTesting returns the push notification registry for testing. -// This method should only be used by tests. -func (p *Processor) GetRegistryForTesting() *Registry { - return p.registry -} + // ProcessPendingNotifications checks for and processes any pending push notifications. func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { @@ -82,8 +78,17 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R continue } - // Handle the notification - p.registry.HandleNotification(ctx, notification) + // Handle the notification directly + if len(notification) > 0 { + // Extract the notification type (first element) + if notificationType, ok := notification[0].(string); ok { + // Get the handler for this notification type + if handler := p.registry.GetHandler(notificationType); handler != nil { + // Handle the notification + handler.HandlePushNotification(ctx, notification) + } + } + } } return nil @@ -108,11 +113,7 @@ func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Han return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) } -// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. -// This method should only be used by tests. -func (v *VoidProcessor) GetRegistryForTesting() *Registry { - return nil -} + // ProcessPendingNotifications for VoidProcessor does nothing since push notifications // are only available in RESP3 and this processor is used when they're disabled. diff --git a/internal/pushnotif/registry.go b/internal/pushnotif/registry.go index 511c390b..eb3ebfbd 100644 --- a/internal/pushnotif/registry.go +++ b/internal/pushnotif/registry.go @@ -1,7 +1,6 @@ package pushnotif import ( - "context" "fmt" "sync" ) @@ -82,25 +81,4 @@ func (r *Registry) GetRegisteredPushNotificationNames() []string { return names } -// HandleNotification attempts to handle a push notification using registered handlers. -// Returns true if a handler was found and successfully processed the notification. -func (r *Registry) HandleNotification(ctx context.Context, notification []interface{}) bool { - if len(notification) == 0 { - return false - } - // Extract the notification type (first element) - notificationType, ok := notification[0].(string) - if !ok { - return false - } - - // Get the handler for this notification type - handler := r.GetHandler(notificationType) - if handler == nil { - return false - } - - // Handle the notification - return handler.HandlePushNotification(ctx, notification) -} diff --git a/push_notification_coverage_test.go b/push_notification_coverage_test.go deleted file mode 100644 index 6579f3fc..00000000 --- a/push_notification_coverage_test.go +++ /dev/null @@ -1,460 +0,0 @@ -package redis - -import ( - "bytes" - "context" - "net" - "strings" - "testing" - "time" - - "github.com/redis/go-redis/v9/internal/pool" - "github.com/redis/go-redis/v9/internal/proto" -) - -// Helper function to access registry for testing -func getRegistryForTestingCoverage(processor PushNotificationProcessorInterface) *PushNotificationRegistry { - switch p := processor.(type) { - case *PushNotificationProcessor: - return p.GetRegistryForTesting() - case *VoidPushNotificationProcessor: - return p.GetRegistryForTesting() - default: - return nil - } -} - -// testHandler is a simple implementation of PushNotificationHandler for testing -type testHandler struct { - handlerFunc func(ctx context.Context, notification []interface{}) bool -} - -func (h *testHandler) HandlePushNotification(ctx context.Context, notification []interface{}) bool { - return h.handlerFunc(ctx, notification) -} - -// newTestHandler creates a test handler from a function -func newTestHandler(f func(ctx context.Context, notification []interface{}) bool) *testHandler { - return &testHandler{handlerFunc: f} -} - -// TestConnectionPoolPushNotificationIntegration tests the connection pool's -// integration with push notifications for 100% coverage. -func TestConnectionPoolPushNotificationIntegration(t *testing.T) { - // Create client with push notifications - client := NewClient(&Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotifications: true, - }) - defer client.Close() - - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Fatal("Push notification processor should be available") - } - - // Test that connections get the processor assigned - ctx := context.Background() - connPool := client.Pool().(*pool.ConnPool) - - // Get a connection and verify it has the processor - cn, err := connPool.Get(ctx) - if err != nil { - t.Fatalf("Failed to get connection: %v", err) - } - defer connPool.Put(ctx, cn) - - if cn.PushNotificationProcessor == nil { - t.Error("Connection should have push notification processor assigned") - } - - // Connection should have a processor (no need to check IsEnabled anymore) - - // Test ProcessPendingNotifications method - emptyReader := proto.NewReader(bytes.NewReader([]byte{})) - err = cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, emptyReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should not error with empty reader: %v", err) - } -} - -// TestConnectionPoolPutWithBufferedData tests the pool's Put method -// when connections have buffered data (push notifications). -func TestConnectionPoolPutWithBufferedData(t *testing.T) { - // Create client with push notifications - client := NewClient(&Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotifications: true, - }) - defer client.Close() - - ctx := context.Background() - connPool := client.Pool().(*pool.ConnPool) - - // Get a connection - cn, err := connPool.Get(ctx) - if err != nil { - t.Fatalf("Failed to get connection: %v", err) - } - - // Verify connection has processor - if cn.PushNotificationProcessor == nil { - t.Error("Connection should have push notification processor") - } - - // Test putting connection back (should not panic or error) - connPool.Put(ctx, cn) - - // Get another connection to verify pool operations work - cn2, err := connPool.Get(ctx) - if err != nil { - t.Fatalf("Failed to get second connection: %v", err) - } - connPool.Put(ctx, cn2) -} - -// TestConnectionHealthCheckWithPushNotifications tests the isHealthyConn -// integration with push notifications. -func TestConnectionHealthCheckWithPushNotifications(t *testing.T) { - // Create client with push notifications - client := NewClient(&Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotifications: true, - }) - defer client.Close() - - // Register a handler to ensure processor is active - err := client.RegisterPushNotificationHandler("TEST_HEALTH", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test basic connection operations to exercise health checks - ctx := context.Background() - for i := 0; i < 5; i++ { - pong, err := client.Ping(ctx).Result() - if err != nil { - t.Fatalf("Ping failed: %v", err) - } - if pong != "PONG" { - t.Errorf("Expected PONG, got %s", pong) - } - } -} - -// TestConnPushNotificationMethods tests all push notification methods on Conn type. -func TestConnPushNotificationMethods(t *testing.T) { - // Create client with push notifications - client := NewClient(&Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotifications: true, - }) - defer client.Close() - - // Create a Conn instance - conn := client.Conn() - defer conn.Close() - - // Test GetPushNotificationProcessor - processor := conn.GetPushNotificationProcessor() - if processor == nil { - t.Error("Conn should have push notification processor") - } - - // Test that processor can handle handlers when enabled - testHandler := processor.GetHandler("TEST") - if testHandler != nil { - t.Error("Should not have handler for TEST initially") - } - - // Test RegisterPushNotificationHandler - handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }) - - err := conn.RegisterPushNotificationHandler("TEST_CONN_HANDLER", handler, false) - if err != nil { - t.Errorf("Failed to register handler on Conn: %v", err) - } - - // Test RegisterPushNotificationHandler with function wrapper - err = conn.RegisterPushNotificationHandler("TEST_CONN_FUNC", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err != nil { - t.Errorf("Failed to register handler func on Conn: %v", err) - } - - // Test duplicate handler error - err = conn.RegisterPushNotificationHandler("TEST_CONN_HANDLER", handler, false) - if err == nil { - t.Error("Should get error when registering duplicate handler") - } - - // Test that handlers work using GetHandler - ctx := context.Background() - - connHandler := processor.GetHandler("TEST_CONN_HANDLER") - if connHandler == nil { - t.Error("Should have handler for TEST_CONN_HANDLER after registration") - return - } - handled := connHandler.HandlePushNotification(ctx, []interface{}{"TEST_CONN_HANDLER", "data"}) - if !handled { - t.Error("Handler should have been called") - } - - funcHandler := processor.GetHandler("TEST_CONN_FUNC") - if funcHandler == nil { - t.Error("Should have handler for TEST_CONN_FUNC after registration") - return - } - handled = funcHandler.HandlePushNotification(ctx, []interface{}{"TEST_CONN_FUNC", "data"}) - if !handled { - t.Error("Handler func should have been called") - } -} - -// TestConnWithoutPushNotifications tests Conn behavior when push notifications are disabled. -func TestConnWithoutPushNotifications(t *testing.T) { - // Create client without push notifications - client := NewClient(&Options{ - Addr: "localhost:6379", - Protocol: 2, // RESP2, no push notifications - PushNotifications: false, - }) - defer client.Close() - - // Create a Conn instance - conn := client.Conn() - defer conn.Close() - - // Test GetPushNotificationProcessor returns VoidPushNotificationProcessor - processor := conn.GetPushNotificationProcessor() - if processor == nil { - t.Error("Conn should always have a push notification processor") - } - // VoidPushNotificationProcessor should return nil for all handlers - handler := processor.GetHandler("TEST") - if handler != nil { - t.Error("VoidPushNotificationProcessor should return nil for all handlers") - } - - // Test RegisterPushNotificationHandler returns error when push notifications are disabled - err := conn.RegisterPushNotificationHandler("TEST", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err == nil { - t.Error("Should return error when trying to register handler on connection with disabled push notifications") - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error message about disabled push notifications, got: %v", err) - } - - // Test RegisterPushNotificationHandler returns error for second registration too - err = conn.RegisterPushNotificationHandler("TEST2", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err == nil { - t.Error("Should return error when trying to register handler on connection with disabled push notifications") - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error message about disabled push notifications, got: %v", err) - } -} - -// TestNewConnWithCustomProcessor tests newConn with custom processor in options. -func TestNewConnWithCustomProcessor(t *testing.T) { - // Create custom processor - customProcessor := NewPushNotificationProcessor() - - // Create options with custom processor - opt := &Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotificationProcessor: customProcessor, - } - opt.init() - - // Create a mock connection pool - connPool := newConnPool(opt, func(ctx context.Context, network, addr string) (net.Conn, error) { - return nil, nil // Mock dialer - }) - - // Test that newConn sets the custom processor - conn := newConn(opt, connPool, nil) - - if conn.GetPushNotificationProcessor() != customProcessor { - t.Error("newConn should set custom processor from options") - } -} - -// TestClonedClientPushNotifications tests that cloned clients preserve push notifications. -func TestClonedClientPushNotifications(t *testing.T) { - // Create original client - client := NewClient(&Options{ - Addr: "localhost:6379", - Protocol: 3, - }) - defer client.Close() - - originalProcessor := client.GetPushNotificationProcessor() - if originalProcessor == nil { - t.Fatal("Original client should have push notification processor") - } - - // Register handler on original - err := client.RegisterPushNotificationHandler("TEST_CLONE", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Create cloned client with timeout - clonedClient := client.WithTimeout(5 * time.Second) - defer clonedClient.Close() - - // Test that cloned client has same processor - clonedProcessor := clonedClient.GetPushNotificationProcessor() - if clonedProcessor != originalProcessor { - t.Error("Cloned client should have same push notification processor") - } - - // Test that handlers work on cloned client using GetHandler - ctx := context.Background() - cloneHandler := clonedProcessor.GetHandler("TEST_CLONE") - if cloneHandler == nil { - t.Error("Cloned client should have TEST_CLONE handler") - return - } - handled := cloneHandler.HandlePushNotification(ctx, []interface{}{"TEST_CLONE", "data"}) - if !handled { - t.Error("Cloned client should handle notifications") - } - - // Test registering new handler on cloned client - err = clonedClient.RegisterPushNotificationHandler("TEST_CLONE_NEW", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err != nil { - t.Errorf("Failed to register handler on cloned client: %v", err) - } -} - -// TestPushNotificationInfoStructure tests the cleaned up PushNotificationInfo. -func TestPushNotificationInfoStructure(t *testing.T) { - // Test with various notification types - testCases := []struct { - name string - notification []interface{} - expectedCmd string - expectedArgs int - }{ - { - name: "MOVING notification", - notification: []interface{}{"MOVING", "127.0.0.1:6380", "slot", "1234"}, - expectedCmd: "MOVING", - expectedArgs: 3, - }, - { - name: "MIGRATING notification", - notification: []interface{}{"MIGRATING", "time", "123456"}, - expectedCmd: "MIGRATING", - expectedArgs: 2, - }, - { - name: "MIGRATED notification", - notification: []interface{}{"MIGRATED"}, - expectedCmd: "MIGRATED", - expectedArgs: 0, - }, - { - name: "Custom notification", - notification: []interface{}{"CUSTOM_EVENT", "arg1", "arg2", "arg3"}, - expectedCmd: "CUSTOM_EVENT", - expectedArgs: 3, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - info := ParsePushNotificationInfo(tc.notification) - - if info.Name != tc.expectedCmd { - t.Errorf("Expected name %s, got %s", tc.expectedCmd, info.Name) - } - - if len(info.Args) != tc.expectedArgs { - t.Errorf("Expected %d args, got %d", tc.expectedArgs, len(info.Args)) - } - - // Verify no unused fields exist by checking the struct only has Name and Args - // This is a compile-time check - if unused fields were added back, this would fail - _ = struct { - Name string - Args []interface{} - }{ - Name: info.Name, - Args: info.Args, - } - }) - } -} - -// TestConnectionPoolOptionsIntegration tests that pool options correctly include processor. -func TestConnectionPoolOptionsIntegration(t *testing.T) { - // Create processor - processor := NewPushNotificationProcessor() - - // Create options - opt := &Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotificationProcessor: processor, - } - opt.init() - - // Create connection pool - connPool := newConnPool(opt, func(ctx context.Context, network, addr string) (net.Conn, error) { - return nil, nil // Mock dialer - }) - - // Verify the pool has the processor in its configuration - // This tests the integration between options and pool creation - if connPool == nil { - t.Error("Connection pool should be created") - } -} - -// TestProcessPendingNotificationsEdgeCases tests edge cases in ProcessPendingNotifications. -func TestProcessPendingNotificationsEdgeCases(t *testing.T) { - processor := NewPushNotificationProcessor() - ctx := context.Background() - - // Test with nil reader (should not panic) - err := processor.ProcessPendingNotifications(ctx, nil) - if err != nil { - t.Logf("ProcessPendingNotifications correctly handles nil reader: %v", err) - } - - // Test with empty reader - emptyReader := proto.NewReader(bytes.NewReader([]byte{})) - err = processor.ProcessPendingNotifications(ctx, emptyReader) - if err != nil { - t.Errorf("Should not error with empty reader: %v", err) - } - - // Test with void processor (simulates disabled state) - voidProcessor := NewVoidPushNotificationProcessor() - err = voidProcessor.ProcessPendingNotifications(ctx, emptyReader) - if err != nil { - t.Errorf("Void processor should not error: %v", err) - } -} diff --git a/push_notifications.go b/push_notifications.go index ee86dade..c0ac22d3 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -8,18 +8,12 @@ import ( ) // PushNotificationHandler defines the interface for push notification handlers. -type PushNotificationHandler interface { - // HandlePushNotification processes a push notification. - // Returns true if the notification was handled, false otherwise. - HandlePushNotification(ctx context.Context, notification []interface{}) bool -} +// This is an alias to the internal push notification handler interface. +type PushNotificationHandler = pushnotif.Handler // PushNotificationProcessorInterface defines the interface for push notification processors. -type PushNotificationProcessorInterface interface { - GetHandler(pushNotificationName string) PushNotificationHandler - ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error - RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error -} +// This is an alias to the internal push notification processor interface. +type PushNotificationProcessorInterface = pushnotif.ProcessorInterface // PushNotificationRegistry manages push notification handlers. type PushNotificationRegistry struct { @@ -49,8 +43,7 @@ func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushN if handler == nil { return nil } - // The handler is already a PushNotificationHandler since we store it directly - return handler.(PushNotificationHandler) + return handler } // GetRegisteredPushNotificationNames returns a list of all registered push notification names. @@ -58,10 +51,7 @@ func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string return r.registry.GetRegisteredPushNotificationNames() } -// HandleNotification attempts to handle a push notification using registered handlers. -func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool { - return r.registry.HandleNotification(ctx, notification) -} + // PushNotificationProcessor handles push notifications with a registry of handlers. type PushNotificationProcessor struct { @@ -100,12 +90,7 @@ func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Cont return p.processor.ProcessPendingNotifications(ctx, rd) } -// GetRegistryForTesting returns the push notification registry for testing. -func (p *PushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { - return &PushNotificationRegistry{ - registry: p.processor.GetRegistryForTesting(), - } -} + // VoidPushNotificationProcessor discards all push notifications without processing them. type VoidPushNotificationProcessor struct { @@ -134,11 +119,6 @@ func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context. return v.processor.ProcessPendingNotifications(ctx, rd) } -// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers. -func (v *VoidPushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry { - return nil -} - // Redis Cluster push notification names diff --git a/push_notifications_test.go b/push_notifications_test.go deleted file mode 100644 index c6e1bfb3..00000000 --- a/push_notifications_test.go +++ /dev/null @@ -1,986 +0,0 @@ -package redis_test - -import ( - "context" - "fmt" - "strings" - "testing" - - "github.com/redis/go-redis/v9" - "github.com/redis/go-redis/v9/internal/pool" -) - -// Helper function to access registry for testing -func getRegistryForTesting(processor redis.PushNotificationProcessorInterface) *redis.PushNotificationRegistry { - switch p := processor.(type) { - case *redis.PushNotificationProcessor: - return p.GetRegistryForTesting() - case *redis.VoidPushNotificationProcessor: - return p.GetRegistryForTesting() - default: - return nil - } -} - -// testHandler is a simple implementation of PushNotificationHandler for testing -type testHandler struct { - handlerFunc func(ctx context.Context, notification []interface{}) bool -} - -func (h *testHandler) HandlePushNotification(ctx context.Context, notification []interface{}) bool { - return h.handlerFunc(ctx, notification) -} - -// newTestHandler creates a test handler from a function -func newTestHandler(f func(ctx context.Context, notification []interface{}) bool) *testHandler { - return &testHandler{handlerFunc: f} -} - -func TestPushNotificationRegistry(t *testing.T) { - // Test the push notification registry functionality - registry := redis.NewPushNotificationRegistry() - - // Test initial state - // Registry starts empty (no need to check HasHandlers anymore) - - commands := registry.GetRegisteredPushNotificationNames() - if len(commands) != 0 { - t.Errorf("Expected 0 registered commands, got %d", len(commands)) - } - - // Test registering a specific handler - handlerCalled := false - handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - return true - }) - - err := registry.RegisterHandler("TEST_COMMAND", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Verify handler was registered by checking registered names - commands = registry.GetRegisteredPushNotificationNames() - if len(commands) != 1 || commands[0] != "TEST_COMMAND" { - t.Errorf("Expected ['TEST_COMMAND'], got %v", commands) - } - - // Test handling a notification - ctx := context.Background() - notification := []interface{}{"TEST_COMMAND", "arg1", "arg2"} - handled := registry.HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should have been handled") - } - - if !handlerCalled { - t.Error("Handler should have been called") - } - - // Test duplicate handler registration error - duplicateHandler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }) - err = registry.RegisterHandler("TEST_COMMAND", duplicateHandler, false) - if err == nil { - t.Error("Expected error when registering duplicate handler") - } - expectedError := "handler already registered for push notification: TEST_COMMAND" - if err.Error() != expectedError { - t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error()) - } -} - -func TestPushNotificationProcessor(t *testing.T) { - // Test the push notification processor - processor := redis.NewPushNotificationProcessor() - - // Test that we can get a handler (should be nil since none registered yet) - handler := processor.GetHandler("TEST") - if handler != nil { - t.Error("Should not have handler for TEST initially") - } - - // Test registering handlers - handlerCalled := false - err := processor.RegisterHandler("CUSTOM_NOTIFICATION", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - if len(notification) < 2 { - t.Error("Expected at least 2 elements in notification") - return false - } - if notification[0] != "CUSTOM_NOTIFICATION" { - t.Errorf("Expected command 'CUSTOM_NOTIFICATION', got %v", notification[0]) - return false - } - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Simulate handling a notification using GetHandler - ctx := context.Background() - notification := []interface{}{"CUSTOM_NOTIFICATION", "data"} - customHandler := processor.GetHandler("CUSTOM_NOTIFICATION") - if customHandler == nil { - t.Error("Should have handler for CUSTOM_NOTIFICATION after registration") - return - } - handled := customHandler.HandlePushNotification(ctx, notification) - - if !handled { - t.Error("Notification should have been handled") - } - - if !handlerCalled { - t.Error("Specific handler should have been called") - } - - // Test that processor can retrieve handlers (no enable/disable anymore) - retrievedHandler := processor.GetHandler("CUSTOM_NOTIFICATION") - if retrievedHandler == nil { - t.Error("Should be able to retrieve registered handler") - } -} - -func TestClientPushNotificationIntegration(t *testing.T) { - // Test push notification integration with Redis client - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 3, // RESP3 required for push notifications - PushNotifications: true, // Enable push notifications - }) - defer client.Close() - - // Test that push processor is initialized - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Error("Push notification processor should be initialized") - } - - if getRegistryForTesting(processor) == nil { - t.Error("Push notification processor should have a registry when enabled") - } - - // Test registering handlers through client - handlerCalled := false - err := client.RegisterPushNotificationHandler("CUSTOM_EVENT", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Simulate notification handling - ctx := context.Background() - notification := []interface{}{"CUSTOM_EVENT", "test_data"} - handled := getRegistryForTesting(processor).HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should have been handled") - } - - if !handlerCalled { - t.Error("Custom handler should have been called") - } -} - -func TestClientWithoutPushNotifications(t *testing.T) { - // Test client without push notifications enabled (using RESP2) - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 2, // RESP2 doesn't support push notifications - PushNotifications: false, // Disabled - }) - defer client.Close() - - // Push processor should be a VoidPushNotificationProcessor - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Error("Push notification processor should never be nil") - } - // VoidPushNotificationProcessor should have nil registry - if getRegistryForTesting(processor) != nil { - t.Error("VoidPushNotificationProcessor should have nil registry") - } - - // Registering handlers should return an error when push notifications are disabled - err := client.RegisterPushNotificationHandler("TEST", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err == nil { - t.Error("Expected error when trying to register handler on client with disabled push notifications") - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error message about disabled push notifications, got: %v", err) - } -} - -func TestPushNotificationEnabledClient(t *testing.T) { - // Test that push notifications can be enabled on a client - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 3, // RESP3 required - PushNotifications: true, // Enable push notifications - }) - defer client.Close() - - // Push processor should be initialized - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Error("Push notification processor should be initialized when enabled") - } - - registry := getRegistryForTesting(processor) - if registry == nil { - t.Errorf("Push notification processor should have a registry when enabled. Processor type: %T", processor) - } - - // Test registering a handler - handlerCalled := false - err := client.RegisterPushNotificationHandler("TEST_NOTIFICATION", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test that the handler works - ctx := context.Background() - notification := []interface{}{"TEST_NOTIFICATION", "data"} - handled := registry.HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should have been handled") - } - - if !handlerCalled { - t.Error("Handler should have been called") - } -} - -func TestPushNotificationProtectedHandlers(t *testing.T) { - registry := redis.NewPushNotificationRegistry() - - // Register a protected handler - protectedHandler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }) - err := registry.RegisterHandler("PROTECTED_HANDLER", protectedHandler, true) - if err != nil { - t.Fatalf("Failed to register protected handler: %v", err) - } - - // Register a non-protected handler - normalHandler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }) - err = registry.RegisterHandler("NORMAL_HANDLER", normalHandler, false) - if err != nil { - t.Fatalf("Failed to register normal handler: %v", err) - } - - // Try to unregister the protected handler - should fail - err = registry.UnregisterHandler("PROTECTED_HANDLER") - if err == nil { - t.Error("Should not be able to unregister protected handler") - } - expectedError := "cannot unregister protected handler for push notification: PROTECTED_HANDLER" - if err.Error() != expectedError { - t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error()) - } - - // Try to unregister the normal handler - should succeed - err = registry.UnregisterHandler("NORMAL_HANDLER") - if err != nil { - t.Errorf("Should be able to unregister normal handler: %v", err) - } - - // Verify protected handler is still registered - commands := registry.GetRegisteredPushNotificationNames() - if len(commands) != 1 || commands[0] != "PROTECTED_HANDLER" { - t.Errorf("Expected only protected handler to remain, got %v", commands) - } - - // Verify protected handler still works - ctx := context.Background() - notification := []interface{}{"PROTECTED_HANDLER", "data"} - handled := registry.HandleNotification(ctx, notification) - if !handled { - t.Error("Protected handler should still work") - } -} - -func TestPushNotificationConstants(t *testing.T) { - // Test that Redis Cluster push notification constants are defined correctly - constants := map[string]string{ - redis.PushNotificationMoving: "MOVING", - redis.PushNotificationMigrating: "MIGRATING", - redis.PushNotificationMigrated: "MIGRATED", - redis.PushNotificationFailingOver: "FAILING_OVER", - redis.PushNotificationFailedOver: "FAILED_OVER", - } - - for constant, expected := range constants { - if constant != expected { - t.Errorf("Expected constant to equal '%s', got '%s'", expected, constant) - } - } -} - -func TestPushNotificationInfo(t *testing.T) { - // Test push notification info parsing - notification := []interface{}{"MOVING", "127.0.0.1:6380", "30000"} - info := redis.ParsePushNotificationInfo(notification) - - if info == nil { - t.Fatal("Push notification info should not be nil") - } - - if info.Name != "MOVING" { - t.Errorf("Expected name 'MOVING', got '%s'", info.Name) - } - - if len(info.Args) != 2 { - t.Errorf("Expected 2 args, got %d", len(info.Args)) - } - - if info.String() != "MOVING" { - t.Errorf("Expected string representation 'MOVING', got '%s'", info.String()) - } - - // Test with empty notification - emptyInfo := redis.ParsePushNotificationInfo([]interface{}{}) - if emptyInfo != nil { - t.Error("Empty notification should return nil info") - } - - // Test with invalid notification - invalidInfo := redis.ParsePushNotificationInfo([]interface{}{123, "invalid"}) - if invalidInfo != nil { - t.Error("Invalid notification should return nil info") - } -} - -func TestPubSubWithGenericPushNotifications(t *testing.T) { - // Test that PubSub can be configured with push notification processor - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 3, // RESP3 required - PushNotifications: true, // Enable push notifications - }) - defer client.Close() - - // Register a handler for custom push notifications - customNotificationReceived := false - err := client.RegisterPushNotificationHandler("CUSTOM_PUBSUB_EVENT", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - customNotificationReceived = true - t.Logf("Received custom push notification in PubSub context: %v", notification) - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Create a PubSub instance - pubsub := client.Subscribe(context.Background(), "test-channel") - defer pubsub.Close() - - // Verify that the PubSub instance has access to push notification processor - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Error("Push notification processor should be available") - } - - // Test that the processor can handle notifications - notification := []interface{}{"CUSTOM_PUBSUB_EVENT", "arg1", "arg2"} - handled := getRegistryForTesting(processor).HandleNotification(context.Background(), notification) - - if !handled { - t.Error("Push notification should have been handled") - } - - // Verify that the custom handler was called - if !customNotificationReceived { - t.Error("Custom push notification handler should have been called") - } -} - -func TestPushNotificationRegistryUnregisterHandler(t *testing.T) { - // Test unregistering handlers - registry := redis.NewPushNotificationRegistry() - - // Register a handler - handlerCalled := false - handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - return true - }) - - err := registry.RegisterHandler("TEST_CMD", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Verify handler is registered - commands := registry.GetRegisteredPushNotificationNames() - if len(commands) != 1 || commands[0] != "TEST_CMD" { - t.Errorf("Expected ['TEST_CMD'], got %v", commands) - } - - // Test notification handling - ctx := context.Background() - notification := []interface{}{"TEST_CMD", "data"} - handled := registry.HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should have been handled") - } - if !handlerCalled { - t.Error("Handler should have been called") - } - - // Test unregistering the handler - registry.UnregisterHandler("TEST_CMD") - - // Verify handler is unregistered - commands = registry.GetRegisteredPushNotificationNames() - if len(commands) != 0 { - t.Errorf("Expected no registered commands after unregister, got %v", commands) - } - - // Reset flag and test that handler is no longer called - handlerCalled = false - handled = registry.HandleNotification(ctx, notification) - - if handled { - t.Error("Notification should not be handled after unregistration") - } - if handlerCalled { - t.Error("Handler should not be called after unregistration") - } - - // Test unregistering non-existent handler (should not panic) - registry.UnregisterHandler("NON_EXISTENT") -} - -func TestPushNotificationRegistryEdgeCases(t *testing.T) { - registry := redis.NewPushNotificationRegistry() - - // Test handling empty notification - ctx := context.Background() - handled := registry.HandleNotification(ctx, []interface{}{}) - if handled { - t.Error("Empty notification should not be handled") - } - - // Test handling notification with non-string command - handled = registry.HandleNotification(ctx, []interface{}{123, "data"}) - if handled { - t.Error("Notification with non-string command should not be handled") - } - - // Test handling notification with nil command - handled = registry.HandleNotification(ctx, []interface{}{nil, "data"}) - if handled { - t.Error("Notification with nil command should not be handled") - } - - // Test unregistering non-existent handler - registry.UnregisterHandler("NON_EXISTENT") - // Should not panic - - // Test unregistering from empty command - registry.UnregisterHandler("EMPTY_CMD") - // Should not panic -} - -func TestPushNotificationRegistryDuplicateHandlerError(t *testing.T) { - registry := redis.NewPushNotificationRegistry() - - // Test that registering duplicate handlers returns an error - handler1 := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }) - - handler2 := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return false - }) - - // Register first handler - should succeed - err := registry.RegisterHandler("DUPLICATE_CMD", handler1, false) - if err != nil { - t.Fatalf("First handler registration should succeed: %v", err) - } - - // Register second handler for same command - should fail - err = registry.RegisterHandler("DUPLICATE_CMD", handler2, false) - if err == nil { - t.Error("Second handler registration should fail") - } - - expectedError := "handler already registered for push notification: DUPLICATE_CMD" - if err.Error() != expectedError { - t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error()) - } - - // Verify only one handler is registered - commands := registry.GetRegisteredPushNotificationNames() - if len(commands) != 1 || commands[0] != "DUPLICATE_CMD" { - t.Errorf("Expected ['DUPLICATE_CMD'], got %v", commands) - } -} - -func TestPushNotificationRegistrySpecificHandlerOnly(t *testing.T) { - registry := redis.NewPushNotificationRegistry() - - specificCalled := false - - // Register specific handler - err := registry.RegisterHandler("SPECIFIC_CMD", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - specificCalled = true - return true - }), false) - if err != nil { - t.Fatalf("Failed to register specific handler: %v", err) - } - - // Test with specific command - ctx := context.Background() - notification := []interface{}{"SPECIFIC_CMD", "data"} - handled := registry.HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should be handled") - } - - if !specificCalled { - t.Error("Specific handler should be called") - } - - // Reset flag - specificCalled = false - - // Test with non-specific command - should not be handled - notification = []interface{}{"OTHER_CMD", "data"} - handled = registry.HandleNotification(ctx, notification) - - if handled { - t.Error("Notification should not be handled without specific handler") - } - - if specificCalled { - t.Error("Specific handler should not be called for other commands") - } -} - -func TestPushNotificationProcessorEdgeCases(t *testing.T) { - // Test processor with disabled state - processor := redis.NewPushNotificationProcessor() - - if getRegistryForTesting(processor) == nil { - t.Error("Processor should have a registry") - } - - // Test that disabled processor doesn't process notifications - handlerCalled := false - processor.RegisterHandler("TEST_CMD", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - return true - }), false) - - // Even with handlers registered, disabled processor shouldn't process - ctx := context.Background() - notification := []interface{}{"TEST_CMD", "data"} - handled := getRegistryForTesting(processor).HandleNotification(ctx, notification) - - if !handled { - t.Error("Registry should still handle notifications even when processor is disabled") - } - - if !handlerCalled { - t.Error("Handler should be called when using registry directly") - } - - // Test that processor always has a registry - if getRegistryForTesting(processor) == nil { - t.Error("Processor should always have a registry") - } -} - -func TestPushNotificationProcessorConvenienceMethods(t *testing.T) { - processor := redis.NewPushNotificationProcessor() - - // Test RegisterHandler convenience method - handlerCalled := false - handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool { - handlerCalled = true - return true - }) - - err := processor.RegisterHandler("CONV_CMD", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test RegisterHandler convenience method with function - funcHandlerCalled := false - err = processor.RegisterHandler("FUNC_CMD", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - funcHandlerCalled = true - return true - }), false) - if err != nil { - t.Fatalf("Failed to register func handler: %v", err) - } - - // Test that handlers work - ctx := context.Background() - - // Test specific handler - notification := []interface{}{"CONV_CMD", "data"} - handled := getRegistryForTesting(processor).HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should be handled") - } - - if !handlerCalled { - t.Error("Handler should be called") - } - - // Reset flags - handlerCalled = false - funcHandlerCalled = false - - // Test func handler - notification = []interface{}{"FUNC_CMD", "data"} - handled = getRegistryForTesting(processor).HandleNotification(ctx, notification) - - if !handled { - t.Error("Notification should be handled") - } - - if !funcHandlerCalled { - t.Error("Func handler should be called") - } -} - -func TestClientPushNotificationEdgeCases(t *testing.T) { - // Test client methods when using void processor (RESP2) - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 2, // RESP2 doesn't support push notifications - PushNotifications: false, // Disabled - }) - defer client.Close() - - // These should return errors when push notifications are disabled - err := client.RegisterPushNotificationHandler("TEST", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err == nil { - t.Error("Expected error when trying to register handler on client with disabled push notifications") - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error message about disabled push notifications, got: %v", err) - } - - err = client.RegisterPushNotificationHandler("TEST_FUNC", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - if err == nil { - t.Error("Expected error when trying to register handler on client with disabled push notifications") - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error message about disabled push notifications, got: %v", err) - } - - // GetPushNotificationProcessor should return VoidPushNotificationProcessor - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Error("Processor should never be nil") - } - // VoidPushNotificationProcessor should have nil registry - if getRegistryForTesting(processor) != nil { - t.Error("VoidPushNotificationProcessor should have nil registry when disabled") - } -} - -func TestPushNotificationHandlerFunc(t *testing.T) { - // Test the PushNotificationHandlerFunc adapter - called := false - var receivedCtx context.Context - var receivedNotification []interface{} - - handlerFunc := func(ctx context.Context, notification []interface{}) bool { - called = true - receivedCtx = ctx - receivedNotification = notification - return true - } - - handler := newTestHandler(handlerFunc) - - // Test that the adapter works correctly - ctx := context.Background() - notification := []interface{}{"TEST_CMD", "arg1", "arg2"} - - result := handler.HandlePushNotification(ctx, notification) - - if !result { - t.Error("Handler should return true") - } - - if !called { - t.Error("Handler function should be called") - } - - if receivedCtx != ctx { - t.Error("Handler should receive the correct context") - } - - if len(receivedNotification) != 3 || receivedNotification[0] != "TEST_CMD" { - t.Errorf("Handler should receive the correct notification, got %v", receivedNotification) - } -} - -func TestPushNotificationInfoEdgeCases(t *testing.T) { - // Test PushNotificationInfo with nil - var nilInfo *redis.PushNotificationInfo - if nilInfo.String() != "" { - t.Errorf("Expected '', got '%s'", nilInfo.String()) - } - - // Test with different argument types - notification := []interface{}{"COMPLEX_CMD", 123, true, []string{"nested", "array"}, map[string]interface{}{"key": "value"}} - info := redis.ParsePushNotificationInfo(notification) - - if info == nil { - t.Fatal("Info should not be nil") - } - - if info.Name != "COMPLEX_CMD" { - t.Errorf("Expected command 'COMPLEX_CMD', got '%s'", info.Name) - } - - if len(info.Args) != 4 { - t.Errorf("Expected 4 args, got %d", len(info.Args)) - } - - // Verify argument types are preserved - if info.Args[0] != 123 { - t.Errorf("Expected first arg to be 123, got %v", info.Args[0]) - } - - if info.Args[1] != true { - t.Errorf("Expected second arg to be true, got %v", info.Args[1]) - } -} - -func TestPushNotificationConstantsCompleteness(t *testing.T) { - // Test that all Redis Cluster push notification constants are defined - expectedConstants := map[string]string{ - // Cluster notifications only (other types removed for simplicity) - redis.PushNotificationMoving: "MOVING", - redis.PushNotificationMigrating: "MIGRATING", - redis.PushNotificationMigrated: "MIGRATED", - redis.PushNotificationFailingOver: "FAILING_OVER", - redis.PushNotificationFailedOver: "FAILED_OVER", - } - - for constant, expected := range expectedConstants { - if constant != expected { - t.Errorf("Constant mismatch: expected '%s', got '%s'", expected, constant) - } - } -} - -func TestPushNotificationRegistryConcurrency(t *testing.T) { - // Test thread safety of the registry - registry := redis.NewPushNotificationRegistry() - - // Number of concurrent goroutines - numGoroutines := 10 - numOperations := 100 - - // Channels to coordinate goroutines - done := make(chan bool, numGoroutines) - - // Concurrent registration and handling - for i := 0; i < numGoroutines; i++ { - go func(id int) { - defer func() { done <- true }() - - for j := 0; j < numOperations; j++ { - // Register handler (ignore errors in concurrency test) - command := fmt.Sprintf("CMD_%d_%d", id, j) - registry.RegisterHandler(command, newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - - // Handle notification - notification := []interface{}{command, "data"} - registry.HandleNotification(context.Background(), notification) - - // Check registry state - registry.GetRegisteredPushNotificationNames() - } - }(i) - } - - // Wait for all goroutines to complete - for i := 0; i < numGoroutines; i++ { - <-done - } - - // Verify registry is still functional - commands := registry.GetRegisteredPushNotificationNames() - if len(commands) == 0 { - t.Error("Registry should have registered commands after concurrent operations") - } -} - -func TestPushNotificationProcessorConcurrency(t *testing.T) { - // Test thread safety of the processor - processor := redis.NewPushNotificationProcessor() - - numGoroutines := 5 - numOperations := 50 - - done := make(chan bool, numGoroutines) - - // Concurrent processor operations - for i := 0; i < numGoroutines; i++ { - go func(id int) { - defer func() { done <- true }() - - for j := 0; j < numOperations; j++ { - // Register handlers (ignore errors in concurrency test) - command := fmt.Sprintf("PROC_CMD_%d_%d", id, j) - processor.RegisterHandler(command, newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - - // Handle notifications - notification := []interface{}{command, "data"} - getRegistryForTesting(processor).HandleNotification(context.Background(), notification) - - // Access processor state - getRegistryForTesting(processor) - } - }(i) - } - - // Wait for all goroutines to complete - for i := 0; i < numGoroutines; i++ { - <-done - } - - // Verify processor is still functional - registry := getRegistryForTesting(processor) - if registry == nil { - t.Error("Processor registry should not be nil after concurrent operations") - } -} - -func TestPushNotificationClientConcurrency(t *testing.T) { - // Test thread safety of client push notification methods - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotifications: true, - }) - defer client.Close() - - numGoroutines := 5 - numOperations := 20 - - done := make(chan bool, numGoroutines) - - // Concurrent client operations - for i := 0; i < numGoroutines; i++ { - go func(id int) { - defer func() { done <- true }() - - for j := 0; j < numOperations; j++ { - // Register handlers concurrently (ignore errors in concurrency test) - command := fmt.Sprintf("CLIENT_CMD_%d_%d", id, j) - client.RegisterPushNotificationHandler(command, newTestHandler(func(ctx context.Context, notification []interface{}) bool { - return true - }), false) - - // Access processor - processor := client.GetPushNotificationProcessor() - if processor != nil { - getRegistryForTesting(processor) - } - } - }(i) - } - - // Wait for all goroutines to complete - for i := 0; i < numGoroutines; i++ { - <-done - } - - // Verify client is still functional - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Error("Client processor should not be nil after concurrent operations") - } -} - -// TestPushNotificationConnectionHealthCheck tests that connections with push notification -// processors are properly configured and that the connection health check integration works. -func TestPushNotificationConnectionHealthCheck(t *testing.T) { - // Create a client with push notifications enabled - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", - Protocol: 3, - PushNotifications: true, - }) - defer client.Close() - - // Verify push notifications are enabled - processor := client.GetPushNotificationProcessor() - if processor == nil { - t.Fatal("Push notification processor should not be nil") - } - if getRegistryForTesting(processor) == nil { - t.Fatal("Push notification registry should not be nil when enabled") - } - - // Register a handler for testing - err := client.RegisterPushNotificationHandler("TEST_CONNCHECK", newTestHandler(func(ctx context.Context, notification []interface{}) bool { - t.Logf("Received test notification: %v", notification) - return true - }), false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test that connections have the push notification processor set - ctx := context.Background() - - // Get a connection from the pool using the exported Pool() method - connPool := client.Pool().(*pool.ConnPool) - cn, err := connPool.Get(ctx) - if err != nil { - t.Fatalf("Failed to get connection: %v", err) - } - defer connPool.Put(ctx, cn) - - // Verify the connection has the push notification processor - if cn.PushNotificationProcessor == nil { - t.Error("Connection should have push notification processor set") - return - } - - t.Log("✅ Connection has push notification processor correctly set") - t.Log("✅ Connection health check integration working correctly") -}