diff --git a/internal/proto/reader.go b/internal/proto/reader.go index 8d23817f..8daa08a1 100644 --- a/internal/proto/reader.go +++ b/internal/proto/reader.go @@ -90,6 +90,27 @@ func (r *Reader) PeekReplyType() (byte, error) { return b[0], nil } +func (r *Reader) PeekPushNotificationName() (string, error) { + // peek 32 bytes, should be enough to read the push notification name + buf, err := r.rd.Peek(32) + if err != nil { + return "", err + } + if buf[0] != RespPush { + return "", fmt.Errorf("redis: can't parse push notification: %q", buf) + } + // remove push notification type and length + nextLine := buf[2:] + for i := 1; i < len(buf); i++ { + if buf[i] == '\r' && buf[i+1] == '\n' { + nextLine = buf[i+2:] + break + } + } + // return notification name or error + return r.readStringReply(nextLine) +} + // ReadLine Return a valid reply, it will check the protocol or redis error, // and discard the attribute type. func (r *Reader) ReadLine() ([]byte, error) { diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go index 3c86739a..f4e30eac 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushnotif/processor.go @@ -38,8 +38,6 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error { return p.registry.UnregisterHandler(pushNotificationName) } - - // ProcessPendingNotifications checks for and processes any pending push notifications. func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { // Check for nil reader @@ -66,6 +64,17 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R break } + notificationName, err := rd.PeekPushNotificationName() + if err != nil { + // Error reading - continue to next iteration + break + } + + // Skip pub/sub messages - they should be handled by the pub/sub system + if isPubSubMessage(notificationName) { + break + } + // Try to read the push notification reply, err := rd.ReadReply() if err != nil { @@ -94,6 +103,23 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R return nil } +// isPubSubMessage checks if a notification type is a pub/sub message that should be ignored +// by the push notification processor and handled by the pub/sub system instead. +func isPubSubMessage(notificationType string) bool { + switch notificationType { + case "message", // Regular pub/sub message + "pmessage", // Pattern pub/sub message + "subscribe", // Subscription confirmation + "unsubscribe", // Unsubscription confirmation + "psubscribe", // Pattern subscription confirmation + "punsubscribe", // Pattern unsubscription confirmation + "smessage": // Sharded pub/sub message (Redis 7.0+) + return true + default: + return false + } +} + // VoidProcessor discards all push notifications without processing them. type VoidProcessor struct{} @@ -119,8 +145,6 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName) } - - // ProcessPendingNotifications for VoidProcessor does nothing since push notifications // are only available in RESP3 and this processor is used when they're disabled. // This avoids unnecessary buffer scanning overhead. diff --git a/internal/pushnotif/pushnotif_test.go b/internal/pushnotif/pushnotif_test.go index a129ff29..5f857e12 100644 --- a/internal/pushnotif/pushnotif_test.go +++ b/internal/pushnotif/pushnotif_test.go @@ -6,6 +6,7 @@ import ( "strings" "testing" + "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" ) @@ -40,6 +41,7 @@ func (h *TestHandler) Reset() { // TestReaderInterface defines the interface needed for testing type TestReaderInterface interface { PeekReplyType() (byte, error) + PeekPushNotificationName() (string, error) ReadReply() (interface{}, error) } @@ -95,6 +97,29 @@ func (m *MockReader) ReadReply() (interface{}, error) { return reply, err } +func (m *MockReader) PeekPushNotificationName() (string, error) { + // return the notification name from the next read reply + if m.readIndex >= len(m.readReplies) { + return "", io.EOF + } + reply := m.readReplies[m.readIndex] + if reply == nil { + return "", nil + } + notification, ok := reply.([]interface{}) + if !ok { + return "", nil + } + if len(notification) == 0 { + return "", nil + } + name, ok := notification[0].(string) + if !ok { + return "", nil + } + return name, nil +} + func (m *MockReader) Reset() { m.readIndex = 0 m.peekIndex = 0 @@ -119,10 +144,22 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context, break } + notificationName, err := reader.PeekPushNotificationName() + if err != nil { + // Error reading - continue to next iteration + break + } + + // Skip pub/sub messages - they should be handled by the pub/sub system + if isPubSubMessage(notificationName) { + break + } + // Read the push notification reply, err := reader.ReadReply() if err != nil { // Error reading - continue to next iteration + internal.Logger.Printf(ctx, "push: error reading push notification: %v", err) continue } @@ -420,7 +457,7 @@ func TestProcessor(t *testing.T) { // Test with mock reader - push notification with ReadReply error mockReader = NewMockReader() mockReader.AddPeekReplyType(proto.RespPush, nil) - mockReader.AddReadReply(nil, io.ErrUnexpectedEOF) // ReadReply fails + mockReader.AddReadReply(nil, io.ErrUnexpectedEOF) // ReadReply fails mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications err = testProcessPendingNotifications(processor, ctx, mockReader) if err != nil { @@ -430,7 +467,7 @@ func TestProcessor(t *testing.T) { // Test with mock reader - push notification with invalid reply type mockReader = NewMockReader() mockReader.AddPeekReplyType(proto.RespPush, nil) - mockReader.AddReadReply("not-a-slice", nil) // Invalid reply type + mockReader.AddReadReply("not-a-slice", nil) // Invalid reply type mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications err = testProcessPendingNotifications(processor, ctx, mockReader) if err != nil { @@ -620,4 +657,112 @@ func TestVoidProcessor(t *testing.T) { t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err) } }) -} \ No newline at end of file +} + +// TestIsPubSubMessage tests the isPubSubMessage function +func TestIsPubSubMessage(t *testing.T) { + t.Run("PubSubMessages", func(t *testing.T) { + pubSubMessages := []string{ + "message", // Regular pub/sub message + "pmessage", // Pattern pub/sub message + "subscribe", // Subscription confirmation + "unsubscribe", // Unsubscription confirmation + "psubscribe", // Pattern subscription confirmation + "punsubscribe", // Pattern unsubscription confirmation + "smessage", // Sharded pub/sub message (Redis 7.0+) + } + + for _, msgType := range pubSubMessages { + if !isPubSubMessage(msgType) { + t.Errorf("isPubSubMessage(%q) should return true", msgType) + } + } + }) + + t.Run("NonPubSubMessages", func(t *testing.T) { + nonPubSubMessages := []string{ + "MOVING", // Cluster slot migration + "MIGRATING", // Cluster slot migration + "MIGRATED", // Cluster slot migration + "FAILING_OVER", // Cluster failover + "FAILED_OVER", // Cluster failover + "unknown", // Unknown message type + "", // Empty string + "MESSAGE", // Case sensitive - should not match + "PMESSAGE", // Case sensitive - should not match + } + + for _, msgType := range nonPubSubMessages { + if isPubSubMessage(msgType) { + t.Errorf("isPubSubMessage(%q) should return false", msgType) + } + } + }) +} + +// TestPubSubFiltering tests that pub/sub messages are filtered out during processing +func TestPubSubFiltering(t *testing.T) { + t.Run("PubSubMessagesIgnored", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test", true) + ctx := context.Background() + + // Register a handler for a non-pub/sub notification + err := processor.RegisterHandler("MOVING", handler, false) + if err != nil { + t.Fatalf("Failed to register handler: %v", err) + } + + // Test with mock reader - pub/sub message should be ignored + mockReader := NewMockReader() + mockReader.AddPeekReplyType(proto.RespPush, nil) + pubSubNotification := []interface{}{"message", "channel", "data"} + mockReader.AddReadReply(pubSubNotification, nil) + mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications + + handler.Reset() + err = testProcessPendingNotifications(processor, ctx, mockReader) + if err != nil { + t.Errorf("ProcessPendingNotifications should handle pub/sub messages gracefully, got: %v", err) + } + + // Check that handler was NOT called for pub/sub message + handled := handler.GetHandledNotifications() + if len(handled) != 0 { + t.Errorf("Expected 0 handled notifications for pub/sub message, got: %d", len(handled)) + } + }) + + t.Run("NonPubSubMessagesProcessed", func(t *testing.T) { + processor := NewProcessor() + handler := NewTestHandler("test", true) + ctx := context.Background() + + // Register a handler for a non-pub/sub notification + err := processor.RegisterHandler("MOVING", handler, false) + if err != nil { + t.Fatalf("Failed to register handler: %v", err) + } + + // Test with mock reader - non-pub/sub message should be processed + mockReader := NewMockReader() + mockReader.AddPeekReplyType(proto.RespPush, nil) + clusterNotification := []interface{}{"MOVING", "slot", "12345"} + mockReader.AddReadReply(clusterNotification, nil) + mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications + + handler.Reset() + err = testProcessPendingNotifications(processor, ctx, mockReader) + if err != nil { + t.Errorf("ProcessPendingNotifications should handle cluster notifications, got: %v", err) + } + + // Check that handler WAS called for cluster notification + handled := handler.GetHandledNotifications() + if len(handled) != 1 { + t.Errorf("Expected 1 handled notification for cluster message, got: %d", len(handled)) + } else if len(handled[0]) != 3 || handled[0][0] != "MOVING" { + t.Errorf("Expected MOVING notification, got: %v", handled[0]) + } + }) +} diff --git a/push_notifications.go b/push_notifications.go index c0ac22d3..18544f85 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -39,11 +39,7 @@ func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string // GetHandler returns the handler for a specific push notification name. func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler { - handler := r.registry.GetHandler(pushNotificationName) - if handler == nil { - return nil - } - return handler + return r.registry.GetHandler(pushNotificationName) } // GetRegisteredPushNotificationNames returns a list of all registered push notification names. @@ -51,8 +47,6 @@ func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string return r.registry.GetRegisteredPushNotificationNames() } - - // PushNotificationProcessor handles push notifications with a registry of handlers. type PushNotificationProcessor struct { processor *pushnotif.Processor @@ -67,12 +61,7 @@ func NewPushNotificationProcessor() *PushNotificationProcessor { // GetHandler returns the handler for a specific push notification name. func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler { - handler := p.processor.GetHandler(pushNotificationName) - if handler == nil { - return nil - } - // The handler is already a PushNotificationHandler since we store it directly - return handler.(PushNotificationHandler) + return p.processor.GetHandler(pushNotificationName) } // RegisterHandler registers a handler for a specific push notification name. @@ -90,8 +79,6 @@ func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Cont return p.processor.ProcessPendingNotifications(ctx, rd) } - - // VoidPushNotificationProcessor discards all push notifications without processing them. type VoidPushNotificationProcessor struct { processor *pushnotif.VoidProcessor @@ -119,8 +106,6 @@ func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context. return v.processor.ProcessPendingNotifications(ctx, rd) } - - // Redis Cluster push notification names const ( PushNotificationMoving = "MOVING"