diff --git a/internal/pushnotif/processor.go b/internal/pushnotif/processor.go index f4e30eac..4476ecb8 100644 --- a/internal/pushnotif/processor.go +++ b/internal/pushnotif/processor.go @@ -70,8 +70,8 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R break } - // Skip pub/sub messages - they should be handled by the pub/sub system - if isPubSubMessage(notificationName) { + // Skip notifications that should be handled by other systems + if shouldSkipNotification(notificationName) { break } @@ -91,6 +91,11 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R if len(notification) > 0 { // Extract the notification type (first element) if notificationType, ok := notification[0].(string); ok { + // Skip notifications that should be handled by other systems + if shouldSkipNotification(notificationType) { + continue + } + // Get the handler for this notification type if handler := p.registry.GetHandler(notificationType); handler != nil { // Handle the notification @@ -103,17 +108,42 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R return nil } -// isPubSubMessage checks if a notification type is a pub/sub message that should be ignored -// by the push notification processor and handled by the pub/sub system instead. -func isPubSubMessage(notificationType string) bool { +// shouldSkipNotification checks if a notification type should be ignored by the push notification +// processor and handled by other specialized systems instead (pub/sub, streams, keyspace, etc.). +func shouldSkipNotification(notificationType string) bool { switch notificationType { + // Pub/Sub notifications - handled by pub/sub system case "message", // Regular pub/sub message "pmessage", // Pattern pub/sub message "subscribe", // Subscription confirmation "unsubscribe", // Unsubscription confirmation "psubscribe", // Pattern subscription confirmation "punsubscribe", // Pattern unsubscription confirmation - "smessage": // Sharded pub/sub message (Redis 7.0+) + "smessage", // Sharded pub/sub message (Redis 7.0+) + "ssubscribe", // Sharded subscription confirmation + "sunsubscribe", // Sharded unsubscription confirmation + + // Stream notifications - handled by stream consumers + "xread-from", // Stream reading notifications + "xreadgroup-from", // Stream consumer group notifications + + // Client tracking notifications - handled by client tracking system + "invalidate", // Client-side caching invalidation + + // Keyspace notifications - handled by keyspace notification subscribers + // Note: Keyspace notifications typically have prefixes like "__keyspace@0__:" or "__keyevent@0__:" + // but we'll handle the base notification types here + "expired", // Key expiration events + "evicted", // Key eviction events + "set", // Key set events + "del", // Key deletion events + "rename", // Key rename events + "move", // Key move events + "copy", // Key copy events + "restore", // Key restore events + "sort", // Sort operation events + "flushdb", // Database flush events + "flushall": // All databases flush events return true default: return false diff --git a/internal/pushnotif/pushnotif_test.go b/internal/pushnotif/pushnotif_test.go index 5f857e12..3fa84e88 100644 --- a/internal/pushnotif/pushnotif_test.go +++ b/internal/pushnotif/pushnotif_test.go @@ -150,8 +150,8 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context, break } - // Skip pub/sub messages - they should be handled by the pub/sub system - if isPubSubMessage(notificationName) { + // Skip notifications that should be handled by other systems + if shouldSkipNotification(notificationName) { break } @@ -659,8 +659,8 @@ func TestVoidProcessor(t *testing.T) { }) } -// TestIsPubSubMessage tests the isPubSubMessage function -func TestIsPubSubMessage(t *testing.T) { +// TestShouldSkipNotification tests the shouldSkipNotification function +func TestShouldSkipNotification(t *testing.T) { t.Run("PubSubMessages", func(t *testing.T) { pubSubMessages := []string{ "message", // Regular pub/sub message @@ -673,8 +673,8 @@ func TestIsPubSubMessage(t *testing.T) { } for _, msgType := range pubSubMessages { - if !isPubSubMessage(msgType) { - t.Errorf("isPubSubMessage(%q) should return true", msgType) + if !shouldSkipNotification(msgType) { + t.Errorf("shouldSkipNotification(%q) should return true", msgType) } } }) @@ -693,8 +693,8 @@ func TestIsPubSubMessage(t *testing.T) { } for _, msgType := range nonPubSubMessages { - if isPubSubMessage(msgType) { - t.Errorf("isPubSubMessage(%q) should return false", msgType) + if shouldSkipNotification(msgType) { + t.Errorf("shouldSkipNotification(%q) should return false", msgType) } } })