diff --git a/internal/pushprocessor/processor.go b/internal/pushprocessor/processor.go deleted file mode 100644 index 87028aaf..00000000 --- a/internal/pushprocessor/processor.go +++ /dev/null @@ -1,187 +0,0 @@ -package pushprocessor - -import ( - "context" - "fmt" - - "github.com/redis/go-redis/v9/internal/proto" -) - -// Processor handles push notifications with a registry of handlers. -type Processor struct { - registry *Registry -} - -// NewProcessor creates a new push notification processor. -func NewProcessor() *Processor { - return &Processor{ - registry: NewRegistry(), - } -} - -// GetHandler returns the handler for a specific push notification name. -// Returns nil if no handler is registered for the given name. -func (p *Processor) GetHandler(pushNotificationName string) Handler { - return p.registry.GetHandler(pushNotificationName) -} - -// RegisterHandler registers a handler for a specific push notification name. -// Returns an error if a handler is already registered for this push notification name. -// If protected is true, the handler cannot be unregistered. -func (p *Processor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { - return p.registry.RegisterHandler(pushNotificationName, handler, protected) -} - -// UnregisterHandler removes a handler for a specific push notification name. -// Returns an error if the handler is protected or doesn't exist. -func (p *Processor) UnregisterHandler(pushNotificationName string) error { - return p.registry.UnregisterHandler(pushNotificationName) -} - -// ProcessPendingNotifications checks for and processes any pending push notifications. -// The handlerCtx provides context about the client, connection pool, and connection. -func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error { - // Check for nil reader - if rd == nil { - return nil - } - - // Check if there are any buffered bytes that might contain push notifications - if rd.Buffered() == 0 { - return nil - } - - // Process all available push notifications - for { - // Peek at the next reply type to see if it's a push notification - replyType, err := rd.PeekReplyType() - if err != nil { - // No more data available or error reading - break - } - - // Push notifications use RespPush type in RESP3 - if replyType != proto.RespPush { - break - } - - notificationName, err := rd.PeekPushNotificationName() - if err != nil { - // Error reading - continue to next iteration - break - } - - // Skip notifications that should be handled by other systems - if shouldSkipNotification(notificationName) { - break - } - - // Try to read the push notification - reply, err := rd.ReadReply() - if err != nil { - return fmt.Errorf("failed to read push notification: %w", err) - } - - // Convert to slice of interfaces - notification, ok := reply.([]interface{}) - if !ok { - continue - } - - // Handle the notification directly - if len(notification) > 0 { - // Extract the notification type (first element) - if notificationType, ok := notification[0].(string); ok { - // Skip notifications that should be handled by other systems - if shouldSkipNotification(notificationType) { - continue - } - - // Get the handler for this notification type - if handler := p.registry.GetHandler(notificationType); handler != nil { - // Handle the notification with context - handler.HandlePushNotification(ctx, handlerCtx, notification) - } - } - } - } - - return nil -} - -// shouldSkipNotification checks if a notification type should be ignored by the push notification -// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). -func shouldSkipNotification(notificationType string) bool { - switch notificationType { - // Pub/Sub notifications - handled by pub/sub system - case "message", // Regular pub/sub message - "pmessage", // Pattern pub/sub message - "subscribe", // Subscription confirmation - "unsubscribe", // Unsubscription confirmation - "psubscribe", // Pattern subscription confirmation - "punsubscribe", // Pattern unsubscription confirmation - "smessage", // Sharded pub/sub message (Redis 7.0+) - "ssubscribe", // Sharded subscription confirmation - "sunsubscribe", // Sharded unsubscription confirmation - - // Stream notifications - handled by stream consumers - "xread-from", // Stream reading notifications - "xreadgroup-from", // Stream consumer group notifications - - // Client tracking notifications - handled by client tracking system - "invalidate", // Client-side caching invalidation - - // Keyspace notifications - handled by keyspace notification subscribers - // Note: Keyspace notifications typically have prefixes like "__keyspace@0__:" or "__keyevent@0__:" - // but we'll handle the base notification types here - "expired", // Key expiration events - "evicted", // Key eviction events - "set", // Key set events - "del", // Key deletion events - "rename", // Key rename events - "move", // Key move events - "copy", // Key copy events - "restore", // Key restore events - "sort", // Sort operation events - "flushdb", // Database flush events - "flushall": // All databases flush events - return true - default: - return false - } -} - -// VoidProcessor discards all push notifications without processing them. -type VoidProcessor struct{} - -// NewVoidProcessor creates a new void push notification processor. -func NewVoidProcessor() *VoidProcessor { - return &VoidProcessor{} -} - -// GetHandler returns nil for void processor since it doesn't maintain handlers. -func (v *VoidProcessor) GetHandler(pushNotificationName string) Handler { - return nil -} - -// RegisterHandler returns an error for void processor since it doesn't maintain handlers. -// This helps developers identify when they're trying to register handlers on disabled push notifications. -func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { - return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) -} - -// UnregisterHandler returns an error for void processor since it doesn't maintain handlers. -// This helps developers identify when they're trying to unregister handlers on disabled push notifications. -func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { - return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) -} - -// ProcessPendingNotifications for VoidProcessor does nothing since push notifications -// are only available in RESP3 and this processor is used for RESP2 connections. -// This avoids unnecessary buffer scanning overhead. -func (v *VoidProcessor) ProcessPendingNotifications(ctx context.Context, handlerCtx HandlerContext, rd *proto.Reader) error { - // VoidProcessor is used for RESP2 connections where push notifications are not available. - // Since push notifications only exist in RESP3, we can safely skip all processing - // to avoid unnecessary buffer scanning overhead. - return nil -} diff --git a/internal/pushprocessor/pushprocessor.go b/internal/pushprocessor/pushprocessor.go deleted file mode 100644 index 19c3014f..00000000 --- a/internal/pushprocessor/pushprocessor.go +++ /dev/null @@ -1,8 +0,0 @@ -package pushprocessor - -// This is an EXPERIMENTAL API for push notifications. -// It is subject to change without notice. -// The handler interface may change in the future to include more or less context information. -// The handler context has fields that are currently empty interfaces. -// This is to allow for future expansion without breaking compatibility. -// The context information will be filled in with concrete types or more specific interfaces in the future. diff --git a/internal/pushprocessor/pushprocessor_test.go b/internal/pushprocessor/pushprocessor_test.go deleted file mode 100644 index 7d35969b..00000000 --- a/internal/pushprocessor/pushprocessor_test.go +++ /dev/null @@ -1,775 +0,0 @@ -package pushprocessor - -import ( - "context" - "io" - "strings" - "testing" - - "github.com/redis/go-redis/v9/internal" - "github.com/redis/go-redis/v9/internal/proto" -) - -// TestHandler implements Handler interface for testing -type TestHandler struct { - name string - handled [][]interface{} - returnValue bool -} - -func NewTestHandler(name string, returnValue bool) *TestHandler { - return &TestHandler{ - name: name, - handled: make([][]interface{}, 0), - returnValue: returnValue, - } -} - -func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx HandlerContext, notification []interface{}) bool { - h.handled = append(h.handled, notification) - // Store the handler context for testing if needed - _ = handlerCtx - return h.returnValue -} - -func (h *TestHandler) GetHandledNotifications() [][]interface{} { - return h.handled -} - -func (h *TestHandler) Reset() { - h.handled = make([][]interface{}, 0) -} - -// TestReaderInterface defines the interface needed for testing -type TestReaderInterface interface { - PeekReplyType() (byte, error) - PeekPushNotificationName() (string, error) - ReadReply() (interface{}, error) -} - -// MockReader implements TestReaderInterface for testing -type MockReader struct { - peekReplies []peekReply - peekIndex int - readReplies []interface{} - readErrors []error - readIndex int -} - -type peekReply struct { - replyType byte - err error -} - -func NewMockReader() *MockReader { - return &MockReader{ - peekReplies: make([]peekReply, 0), - readReplies: make([]interface{}, 0), - readErrors: make([]error, 0), - readIndex: 0, - peekIndex: 0, - } -} - -func (m *MockReader) AddPeekReplyType(replyType byte, err error) { - m.peekReplies = append(m.peekReplies, peekReply{replyType: replyType, err: err}) -} - -func (m *MockReader) AddReadReply(reply interface{}, err error) { - m.readReplies = append(m.readReplies, reply) - m.readErrors = append(m.readErrors, err) -} - -func (m *MockReader) PeekReplyType() (byte, error) { - if m.peekIndex >= len(m.peekReplies) { - return 0, io.EOF - } - peek := m.peekReplies[m.peekIndex] - m.peekIndex++ - return peek.replyType, peek.err -} - -func (m *MockReader) ReadReply() (interface{}, error) { - if m.readIndex >= len(m.readReplies) { - return nil, io.EOF - } - reply := m.readReplies[m.readIndex] - err := m.readErrors[m.readIndex] - m.readIndex++ - return reply, err -} - -func (m *MockReader) PeekPushNotificationName() (string, error) { - // return the notification name from the next read reply - if m.readIndex >= len(m.readReplies) { - return "", io.EOF - } - reply := m.readReplies[m.readIndex] - if reply == nil { - return "", nil - } - notification, ok := reply.([]interface{}) - if !ok { - return "", nil - } - if len(notification) == 0 { - return "", nil - } - name, ok := notification[0].(string) - if !ok { - return "", nil - } - return name, nil -} - -func (m *MockReader) Reset() { - m.readIndex = 0 - m.peekIndex = 0 -} - -// testProcessPendingNotifications is a test version that accepts our mock reader -func testProcessPendingNotifications(processor *Processor, ctx context.Context, reader TestReaderInterface) error { - if reader == nil { - return nil - } - - // Create a test handler context - handlerCtx := NewHandlerContext(nil, nil, nil, nil, false) - - for { - // Check if there are push notifications available - replyType, err := reader.PeekReplyType() - if err != nil { - // No more data or error - this is normal - break - } - - // Only process push notifications - if replyType != proto.RespPush { - break - } - - notificationName, err := reader.PeekPushNotificationName() - if err != nil { - // Error reading - continue to next iteration - break - } - - // Skip notifications that should be handled by other systems - if shouldSkipNotification(notificationName) { - break - } - - // Read the push notification - reply, err := reader.ReadReply() - if err != nil { - // Error reading - continue to next iteration - internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) - continue - } - - // Convert to slice of interfaces - notification, ok := reply.([]interface{}) - if !ok { - continue - } - - // Handle the notification directly - if len(notification) > 0 { - // Extract the notification type (first element) - if notificationType, ok := notification[0].(string); ok { - // Get the handler for this notification type - if handler := processor.registry.GetHandler(notificationType); handler != nil { - // Handle the notification with context - handler.HandlePushNotification(ctx, handlerCtx, notification) - } - } - } - } - - return nil -} - -// TestRegistry tests the Registry implementation -func TestRegistry(t *testing.T) { - t.Run("NewRegistry", func(t *testing.T) { - registry := NewRegistry() - if registry == nil { - t.Error("NewRegistry should return a non-nil registry") - } - if registry.handlers == nil { - t.Error("Registry handlers map should be initialized") - } - if registry.protected == nil { - t.Error("Registry protected map should be initialized") - } - }) - - t.Run("RegisterHandler", func(t *testing.T) { - registry := NewRegistry() - handler := NewTestHandler("test", true) - - // Test successful registration - err := registry.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Errorf("RegisterHandler should succeed, got error: %v", err) - } - - // Test duplicate registration - err = registry.RegisterHandler("MOVING", handler, false) - if err == nil { - t.Error("RegisterHandler should return error for duplicate registration") - } - if !strings.Contains(err.Error(), "handler already registered") { - t.Errorf("Expected error about duplicate registration, got: %v", err) - } - - // Test protected registration - err = registry.RegisterHandler("MIGRATING", handler, true) - if err != nil { - t.Errorf("RegisterHandler with protected=true should succeed, got error: %v", err) - } - }) - - t.Run("GetHandler", func(t *testing.T) { - registry := NewRegistry() - handler := NewTestHandler("test", true) - - // Test getting non-existent handler - result := registry.GetHandler("NONEXISTENT") - if result != nil { - t.Error("GetHandler should return nil for non-existent handler") - } - - // Test getting existing handler - err := registry.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - result = registry.GetHandler("MOVING") - if result != handler { - t.Error("GetHandler should return the registered handler") - } - }) - - t.Run("UnregisterHandler", func(t *testing.T) { - registry := NewRegistry() - handler := NewTestHandler("test", true) - - // Test unregistering non-existent handler - err := registry.UnregisterHandler("NONEXISTENT") - if err == nil { - t.Error("UnregisterHandler should return error for non-existent handler") - } - if !strings.Contains(err.Error(), "no handler registered") { - t.Errorf("Expected error about no handler registered, got: %v", err) - } - - // Test unregistering regular handler - err = registry.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - err = registry.UnregisterHandler("MOVING") - if err != nil { - t.Errorf("UnregisterHandler should succeed for regular handler, got error: %v", err) - } - - // Verify handler is removed - result := registry.GetHandler("MOVING") - if result != nil { - t.Error("Handler should be removed after unregistration") - } - - // Test unregistering protected handler - err = registry.RegisterHandler("MIGRATING", handler, true) - if err != nil { - t.Fatalf("Failed to register protected handler: %v", err) - } - - err = registry.UnregisterHandler("MIGRATING") - if err == nil { - t.Error("UnregisterHandler should return error for protected handler") - } - if !strings.Contains(err.Error(), "cannot unregister protected handler") { - t.Errorf("Expected error about protected handler, got: %v", err) - } - - // Verify protected handler is still there - result = registry.GetHandler("MIGRATING") - if result != handler { - t.Error("Protected handler should still be registered after failed unregistration") - } - }) - - t.Run("GetRegisteredPushNotificationNames", func(t *testing.T) { - registry := NewRegistry() - handler1 := NewTestHandler("test1", true) - handler2 := NewTestHandler("test2", true) - - // Test empty registry - names := registry.GetRegisteredPushNotificationNames() - if len(names) != 0 { - t.Errorf("Empty registry should return empty slice, got: %v", names) - } - - // Test with registered handlers - err := registry.RegisterHandler("MOVING", handler1, false) - if err != nil { - t.Fatalf("Failed to register handler1: %v", err) - } - - err = registry.RegisterHandler("MIGRATING", handler2, true) - if err != nil { - t.Fatalf("Failed to register handler2: %v", err) - } - - names = registry.GetRegisteredPushNotificationNames() - if len(names) != 2 { - t.Errorf("Expected 2 registered names, got: %d", len(names)) - } - - // Check that both names are present (order doesn't matter) - nameMap := make(map[string]bool) - for _, name := range names { - nameMap[name] = true - } - - if !nameMap["MOVING"] { - t.Error("MOVING should be in registered names") - } - if !nameMap["MIGRATING"] { - t.Error("MIGRATING should be in registered names") - } - }) -} - -// TestProcessor tests the Processor implementation -func TestProcessor(t *testing.T) { - t.Run("NewProcessor", func(t *testing.T) { - processor := NewProcessor() - if processor == nil { - t.Error("NewProcessor should return a non-nil processor") - } - if processor.registry == nil { - t.Error("Processor should have a non-nil registry") - } - }) - - t.Run("GetHandler", func(t *testing.T) { - processor := NewProcessor() - handler := NewTestHandler("test", true) - - // Test getting non-existent handler - result := processor.GetHandler("NONEXISTENT") - if result != nil { - t.Error("GetHandler should return nil for non-existent handler") - } - - // Test getting existing handler - err := processor.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - result = processor.GetHandler("MOVING") - if result != handler { - t.Error("GetHandler should return the registered handler") - } - }) - - t.Run("RegisterHandler", func(t *testing.T) { - processor := NewProcessor() - handler := NewTestHandler("test", true) - - // Test successful registration - err := processor.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Errorf("RegisterHandler should succeed, got error: %v", err) - } - - // Test duplicate registration - err = processor.RegisterHandler("MOVING", handler, false) - if err == nil { - t.Error("RegisterHandler should return error for duplicate registration") - } - }) - - t.Run("UnregisterHandler", func(t *testing.T) { - processor := NewProcessor() - handler := NewTestHandler("test", true) - - // Test unregistering non-existent handler - err := processor.UnregisterHandler("NONEXISTENT") - if err == nil { - t.Error("UnregisterHandler should return error for non-existent handler") - } - - // Test successful unregistration - err = processor.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - err = processor.UnregisterHandler("MOVING") - if err != nil { - t.Errorf("UnregisterHandler should succeed, got error: %v", err) - } - }) - - t.Run("ProcessPendingNotifications", func(t *testing.T) { - processor := NewProcessor() - handler := NewTestHandler("test", true) - ctx := context.Background() - - // Test with nil reader - handlerCtx := NewHandlerContext(nil, nil, nil, nil, false) - err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) - if err != nil { - t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) - } - - // Test with empty reader (no buffered data) - reader := proto.NewReader(strings.NewReader("")) - err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader) - if err != nil { - t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err) - } - - // Register a handler for testing - err = processor.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test with mock reader - peek error (no push notifications available) - mockReader := NewMockReader() - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // EOF means no more data - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle peek EOF gracefully, got: %v", err) - } - - // Test with mock reader - non-push reply type - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespString, nil) // Not RespPush - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle non-push reply types gracefully, got: %v", err) - } - - // Test with mock reader - push notification with ReadReply error - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - mockReader.AddReadReply(nil, io.ErrUnexpectedEOF) // ReadReply fails - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle ReadReply errors gracefully, got: %v", err) - } - - // Test with mock reader - push notification with invalid reply type - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - mockReader.AddReadReply("not-a-slice", nil) // Invalid reply type - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle invalid reply types gracefully, got: %v", err) - } - - // Test with mock reader - valid push notification with handler - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - notification := []interface{}{"MOVING", "slot", "12345"} - mockReader.AddReadReply(notification, nil) - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - - handler.Reset() - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle valid notifications, got: %v", err) - } - - // Check that handler was called - handled := handler.GetHandledNotifications() - if len(handled) != 1 { - t.Errorf("Expected 1 handled notification, got: %d", len(handled)) - } else if len(handled[0]) != 3 || handled[0][0] != "MOVING" { - t.Errorf("Expected MOVING notification, got: %v", handled[0]) - } - - // Test with mock reader - valid push notification without handler - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - notification = []interface{}{"UNKNOWN", "data"} - mockReader.AddReadReply(notification, nil) - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle notifications without handlers, got: %v", err) - } - - // Test with mock reader - empty notification - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - emptyNotification := []interface{}{} - mockReader.AddReadReply(emptyNotification, nil) - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle empty notifications, got: %v", err) - } - - // Test with mock reader - notification with non-string type - mockReader = NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - invalidTypeNotification := []interface{}{123, "data"} // First element is not string - mockReader.AddReadReply(invalidTypeNotification, nil) - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle invalid notification types, got: %v", err) - } - - // Test the actual ProcessPendingNotifications method with real proto.Reader - // Test with nil reader - err = processor.ProcessPendingNotifications(ctx, handlerCtx, nil) - if err != nil { - t.Errorf("ProcessPendingNotifications with nil reader should not error, got: %v", err) - } - - // Test with empty reader (no buffered data) - protoReader := proto.NewReader(strings.NewReader("")) - err = processor.ProcessPendingNotifications(ctx, handlerCtx, protoReader) - if err != nil { - t.Errorf("ProcessPendingNotifications with empty reader should not error, got: %v", err) - } - - // Test with reader that has some data but not push notifications - protoReader = proto.NewReader(strings.NewReader("+OK\r\n")) - err = processor.ProcessPendingNotifications(ctx, handlerCtx, protoReader) - if err != nil { - t.Errorf("ProcessPendingNotifications with non-push data should not error, got: %v", err) - } - }) -} - -// TestVoidProcessor tests the VoidProcessor implementation -func TestVoidProcessor(t *testing.T) { - t.Run("NewVoidProcessor", func(t *testing.T) { - processor := NewVoidProcessor() - if processor == nil { - t.Error("NewVoidProcessor should return a non-nil processor") - } - }) - - t.Run("GetHandler", func(t *testing.T) { - processor := NewVoidProcessor() - - // VoidProcessor should always return nil for any handler name - result := processor.GetHandler("MOVING") - if result != nil { - t.Error("VoidProcessor GetHandler should always return nil") - } - - result = processor.GetHandler("MIGRATING") - if result != nil { - t.Error("VoidProcessor GetHandler should always return nil") - } - - result = processor.GetHandler("") - if result != nil { - t.Error("VoidProcessor GetHandler should always return nil for empty string") - } - }) - - t.Run("RegisterHandler", func(t *testing.T) { - processor := NewVoidProcessor() - handler := NewTestHandler("test", true) - - // VoidProcessor should always return error for registration - err := processor.RegisterHandler("MOVING", handler, false) - if err == nil { - t.Error("VoidProcessor RegisterHandler should always return error") - } - if !strings.Contains(err.Error(), "cannot register push notification handler") { - t.Errorf("Expected error about cannot register, got: %v", err) - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error about disabled push notifications, got: %v", err) - } - - // Test with protected flag - err = processor.RegisterHandler("MIGRATING", handler, true) - if err == nil { - t.Error("VoidProcessor RegisterHandler should always return error even with protected=true") - } - - // Test with empty handler name - err = processor.RegisterHandler("", handler, false) - if err == nil { - t.Error("VoidProcessor RegisterHandler should always return error even with empty name") - } - }) - - t.Run("UnregisterHandler", func(t *testing.T) { - processor := NewVoidProcessor() - - // VoidProcessor should always return error for unregistration - err := processor.UnregisterHandler("MOVING") - if err == nil { - t.Error("VoidProcessor UnregisterHandler should always return error") - } - if !strings.Contains(err.Error(), "cannot unregister push notification handler") { - t.Errorf("Expected error about cannot unregister, got: %v", err) - } - if !strings.Contains(err.Error(), "push notifications are disabled") { - t.Errorf("Expected error about disabled push notifications, got: %v", err) - } - - // Test with empty handler name - err = processor.UnregisterHandler("") - if err == nil { - t.Error("VoidProcessor UnregisterHandler should always return error even with empty name") - } - }) - - t.Run("ProcessPendingNotifications", func(t *testing.T) { - processor := NewVoidProcessor() - ctx := context.Background() - handlerCtx := NewHandlerContext(nil, nil, nil, nil, false) - - // VoidProcessor should always succeed and do nothing - err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil) - if err != nil { - t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) - } - - // Test with various readers - reader := proto.NewReader(strings.NewReader("")) - err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader) - if err != nil { - t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) - } - - reader = proto.NewReader(strings.NewReader("some data")) - err = processor.ProcessPendingNotifications(ctx, handlerCtx, reader) - if err != nil { - t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) - } - }) -} - -// TestShouldSkipNotification tests the shouldSkipNotification function -func TestShouldSkipNotification(t *testing.T) { - t.Run("PubSubMessages", func(t *testing.T) { - pubSubMessages := []string{ - "message", // Regular pub/sub message - "pmessage", // Pattern pub/sub message - "subscribe", // Subscription confirmation - "unsubscribe", // Unsubscription confirmation - "psubscribe", // Pattern subscription confirmation - "punsubscribe", // Pattern unsubscription confirmation - "smessage", // Sharded pub/sub message (Redis 7.0+) - } - - for _, msgType := range pubSubMessages { - if !shouldSkipNotification(msgType) { - t.Errorf("shouldSkipNotification(%q) should return true", msgType) - } - } - }) - - t.Run("NonPubSubMessages", func(t *testing.T) { - nonPubSubMessages := []string{ - "MOVING", // Cluster slot migration - "MIGRATING", // Cluster slot migration - "MIGRATED", // Cluster slot migration - "FAILING_OVER", // Cluster failover - "FAILED_OVER", // Cluster failover - "unknown", // Unknown message type - "", // Empty string - "MESSAGE", // Case sensitive - should not match - "PMESSAGE", // Case sensitive - should not match - } - - for _, msgType := range nonPubSubMessages { - if shouldSkipNotification(msgType) { - t.Errorf("shouldSkipNotification(%q) should return false", msgType) - } - } - }) -} - -// TestPubSubFiltering tests that pub/sub messages are filtered out during processing -func TestPubSubFiltering(t *testing.T) { - t.Run("PubSubMessagesIgnored", func(t *testing.T) { - processor := NewProcessor() - handler := NewTestHandler("test", true) - ctx := context.Background() - - // Register a handler for a non-pub/sub notification - err := processor.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test with mock reader - pub/sub message should be ignored - mockReader := NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - pubSubNotification := []interface{}{"message", "channel", "data"} - mockReader.AddReadReply(pubSubNotification, nil) - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - - handler.Reset() - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle pub/sub messages gracefully, got: %v", err) - } - - // Check that handler was NOT called for pub/sub message - handled := handler.GetHandledNotifications() - if len(handled) != 0 { - t.Errorf("Expected 0 handled notifications for pub/sub message, got: %d", len(handled)) - } - }) - - t.Run("NonPubSubMessagesProcessed", func(t *testing.T) { - processor := NewProcessor() - handler := NewTestHandler("test", true) - ctx := context.Background() - - // Register a handler for a non-pub/sub notification - err := processor.RegisterHandler("MOVING", handler, false) - if err != nil { - t.Fatalf("Failed to register handler: %v", err) - } - - // Test with mock reader - non-pub/sub message should be processed - mockReader := NewMockReader() - mockReader.AddPeekReplyType(proto.RespPush, nil) - clusterNotification := []interface{}{"MOVING", "slot", "12345"} - mockReader.AddReadReply(clusterNotification, nil) - mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications - - handler.Reset() - err = testProcessPendingNotifications(processor, ctx, mockReader) - if err != nil { - t.Errorf("ProcessPendingNotifications should handle cluster notifications, got: %v", err) - } - - // Check that handler WAS called for cluster notification - handled := handler.GetHandledNotifications() - if len(handled) != 1 { - t.Errorf("Expected 1 handled notification for cluster message, got: %d", len(handled)) - } else if len(handled[0]) != 3 || handled[0][0] != "MOVING" { - t.Errorf("Expected MOVING notification, got: %v", handled[0]) - } - }) -} diff --git a/internal/pushprocessor/registry.go b/internal/pushprocessor/registry.go deleted file mode 100644 index 9aaa4714..00000000 --- a/internal/pushprocessor/registry.go +++ /dev/null @@ -1,82 +0,0 @@ -package pushprocessor - -import ( - "fmt" - "sync" -) - -// Registry manages push notification handlers. -type Registry struct { - mu sync.RWMutex - handlers map[string]Handler - protected map[string]bool -} - -// NewRegistry creates a new push notification registry. -func NewRegistry() *Registry { - return &Registry{ - handlers: make(map[string]Handler), - protected: make(map[string]bool), - } -} - -// RegisterHandler registers a handler for a specific push notification name. -// Returns an error if a handler is already registered for this push notification name. -// If protected is true, the handler cannot be unregistered. -func (r *Registry) RegisterHandler(pushNotificationName string, handler Handler, protected bool) error { - r.mu.Lock() - defer r.mu.Unlock() - - if _, exists := r.handlers[pushNotificationName]; exists { - return fmt.Errorf("handler already registered for push notification: %s", pushNotificationName) - } - - r.handlers[pushNotificationName] = handler - r.protected[pushNotificationName] = protected - return nil -} - -// UnregisterHandler removes a handler for a specific push notification name. -// Returns an error if the handler is protected or doesn't exist. -func (r *Registry) UnregisterHandler(pushNotificationName string) error { - r.mu.Lock() - defer r.mu.Unlock() - - _, exists := r.handlers[pushNotificationName] - if !exists { - return fmt.Errorf("no handler registered for push notification: %s", pushNotificationName) - } - - if r.protected[pushNotificationName] { - return fmt.Errorf("cannot unregister protected handler for push notification: %s", pushNotificationName) - } - - delete(r.handlers, pushNotificationName) - delete(r.protected, pushNotificationName) - return nil -} - -// GetHandler returns the handler for a specific push notification name. -// Returns nil if no handler is registered for the given name. -func (r *Registry) GetHandler(pushNotificationName string) Handler { - r.mu.RLock() - defer r.mu.RUnlock() - - handler, exists := r.handlers[pushNotificationName] - if !exists { - return nil - } - return handler -} - -// GetRegisteredPushNotificationNames returns a list of all registered push notification names. -func (r *Registry) GetRegisteredPushNotificationNames() []string { - r.mu.RLock() - defer r.mu.RUnlock() - - names := make([]string, 0, len(r.handlers)) - for name := range r.handlers { - names = append(names, name) - } - return names -} diff --git a/pushnotif/types.go b/pushnotif/types.go deleted file mode 100644 index ea7621f1..00000000 --- a/pushnotif/types.go +++ /dev/null @@ -1,32 +0,0 @@ -package pushnotif - -import ( - "context" - "github.com/redis/go-redis/v9/internal/proto" - "github.com/redis/go-redis/v9/internal/pushprocessor" -) - -// PushProcessorInterface defines the interface for push notification processors. -type PushProcessorInterface interface { - GetHandler(pushNotificationName string) PushNotificationHandler - ProcessPendingNotifications(ctx context.Context, handlerCtx PushNotificationHandlerContext, rd *proto.Reader) error - RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error -} - -// RegistryInterface defines the interface for push notification registries. -type RegistryInterface interface { - RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error - UnregisterHandler(pushNotificationName string) error - GetHandler(pushNotificationName string) PushNotificationHandler - GetRegisteredPushNotificationNames() []string -} - -// NewProcessor creates a new push notification processor. -func NewProcessor() PushProcessorInterface { - return pushprocessor.NewProcessor() -} - -// NewVoidProcessor creates a new void push notification processor. -func NewVoidProcessor() PushProcessorInterface { - return pushprocessor.NewVoidProcessor() -}