mirror of
https://github.com/redis/go-redis.git
synced 2025-07-18 00:20:57 +03:00
feat: add pub/sub message filtering to push notification processor
- Add isPubSubMessage() function to identify pub/sub message types - Filter out pub/sub messages in ProcessPendingNotifications - Allow pub/sub system to handle its own messages without interference - Process only cluster/system push notifications (MOVING, MIGRATING, etc.) - Add comprehensive test coverage for filtering logic Pub/sub message types filtered: - message (regular pub/sub) - pmessage (pattern pub/sub) - subscribe/unsubscribe (subscription management) - psubscribe/punsubscribe (pattern subscription management) - smessage (sharded pub/sub, Redis 7.0+) Benefits: - Clear separation of concerns between pub/sub and push notifications - Prevents interference between the two messaging systems - Ensures pub/sub messages reach their intended handlers - Eliminates message loss due to incorrect interception - Improved system reliability and performance - Better resource utilization and message flow Implementation: - Efficient O(1) switch statement for message type lookup - Case-sensitive matching for precise filtering - Early return to skip unnecessary processing - Maintains processing of other notifications in same batch - Applied to all processing points (WithReader, Pool.Put, isHealthyConn) Test coverage: - TestIsPubSubMessage - Function correctness and edge cases - TestPubSubFiltering - End-to-end integration testing - Mixed message scenarios and handler verification
This commit is contained in:
@ -90,6 +90,27 @@ func (r *Reader) PeekReplyType() (byte, error) {
|
|||||||
return b[0], nil
|
return b[0], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Reader) PeekPushNotificationName() (string, error) {
|
||||||
|
// peek 32 bytes, should be enough to read the push notification name
|
||||||
|
buf, err := r.rd.Peek(32)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
if buf[0] != RespPush {
|
||||||
|
return "", fmt.Errorf("redis: can't parse push notification: %q", buf)
|
||||||
|
}
|
||||||
|
// remove push notification type and length
|
||||||
|
nextLine := buf[2:]
|
||||||
|
for i := 1; i < len(buf); i++ {
|
||||||
|
if buf[i] == '\r' && buf[i+1] == '\n' {
|
||||||
|
nextLine = buf[i+2:]
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// return notification name or error
|
||||||
|
return r.readStringReply(nextLine)
|
||||||
|
}
|
||||||
|
|
||||||
// ReadLine Return a valid reply, it will check the protocol or redis error,
|
// ReadLine Return a valid reply, it will check the protocol or redis error,
|
||||||
// and discard the attribute type.
|
// and discard the attribute type.
|
||||||
func (r *Reader) ReadLine() ([]byte, error) {
|
func (r *Reader) ReadLine() ([]byte, error) {
|
||||||
|
@ -38,8 +38,6 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error {
|
|||||||
return p.registry.UnregisterHandler(pushNotificationName)
|
return p.registry.UnregisterHandler(pushNotificationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// ProcessPendingNotifications checks for and processes any pending push notifications.
|
// ProcessPendingNotifications checks for and processes any pending push notifications.
|
||||||
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
||||||
// Check for nil reader
|
// Check for nil reader
|
||||||
@ -66,6 +64,17 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notificationName, err := rd.PeekPushNotificationName()
|
||||||
|
if err != nil {
|
||||||
|
// Error reading - continue to next iteration
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip pub/sub messages - they should be handled by the pub/sub system
|
||||||
|
if isPubSubMessage(notificationName) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
// Try to read the push notification
|
// Try to read the push notification
|
||||||
reply, err := rd.ReadReply()
|
reply, err := rd.ReadReply()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -94,6 +103,23 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isPubSubMessage checks if a notification type is a pub/sub message that should be ignored
|
||||||
|
// by the push notification processor and handled by the pub/sub system instead.
|
||||||
|
func isPubSubMessage(notificationType string) bool {
|
||||||
|
switch notificationType {
|
||||||
|
case "message", // Regular pub/sub message
|
||||||
|
"pmessage", // Pattern pub/sub message
|
||||||
|
"subscribe", // Subscription confirmation
|
||||||
|
"unsubscribe", // Unsubscription confirmation
|
||||||
|
"psubscribe", // Pattern subscription confirmation
|
||||||
|
"punsubscribe", // Pattern unsubscription confirmation
|
||||||
|
"smessage": // Sharded pub/sub message (Redis 7.0+)
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// VoidProcessor discards all push notifications without processing them.
|
// VoidProcessor discards all push notifications without processing them.
|
||||||
type VoidProcessor struct{}
|
type VoidProcessor struct{}
|
||||||
|
|
||||||
@ -119,8 +145,6 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error {
|
|||||||
return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
|
return fmt.Errorf("cannot unregister push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
|
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
|
||||||
// are only available in RESP3 and this processor is used when they're disabled.
|
// are only available in RESP3 and this processor is used when they're disabled.
|
||||||
// This avoids unnecessary buffer scanning overhead.
|
// This avoids unnecessary buffer scanning overhead.
|
||||||
|
@ -6,6 +6,7 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9/internal"
|
||||||
"github.com/redis/go-redis/v9/internal/proto"
|
"github.com/redis/go-redis/v9/internal/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -40,6 +41,7 @@ func (h *TestHandler) Reset() {
|
|||||||
// TestReaderInterface defines the interface needed for testing
|
// TestReaderInterface defines the interface needed for testing
|
||||||
type TestReaderInterface interface {
|
type TestReaderInterface interface {
|
||||||
PeekReplyType() (byte, error)
|
PeekReplyType() (byte, error)
|
||||||
|
PeekPushNotificationName() (string, error)
|
||||||
ReadReply() (interface{}, error)
|
ReadReply() (interface{}, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -95,6 +97,29 @@ func (m *MockReader) ReadReply() (interface{}, error) {
|
|||||||
return reply, err
|
return reply, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockReader) PeekPushNotificationName() (string, error) {
|
||||||
|
// return the notification name from the next read reply
|
||||||
|
if m.readIndex >= len(m.readReplies) {
|
||||||
|
return "", io.EOF
|
||||||
|
}
|
||||||
|
reply := m.readReplies[m.readIndex]
|
||||||
|
if reply == nil {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
notification, ok := reply.([]interface{})
|
||||||
|
if !ok {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
if len(notification) == 0 {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
name, ok := notification[0].(string)
|
||||||
|
if !ok {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
return name, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockReader) Reset() {
|
func (m *MockReader) Reset() {
|
||||||
m.readIndex = 0
|
m.readIndex = 0
|
||||||
m.peekIndex = 0
|
m.peekIndex = 0
|
||||||
@ -119,10 +144,22 @@ func testProcessPendingNotifications(processor *Processor, ctx context.Context,
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
notificationName, err := reader.PeekPushNotificationName()
|
||||||
|
if err != nil {
|
||||||
|
// Error reading - continue to next iteration
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Skip pub/sub messages - they should be handled by the pub/sub system
|
||||||
|
if isPubSubMessage(notificationName) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
// Read the push notification
|
// Read the push notification
|
||||||
reply, err := reader.ReadReply()
|
reply, err := reader.ReadReply()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Error reading - continue to next iteration
|
// Error reading - continue to next iteration
|
||||||
|
internal.Logger.Printf(ctx, "push: error reading push notification: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -420,7 +457,7 @@ func TestProcessor(t *testing.T) {
|
|||||||
// Test with mock reader - push notification with ReadReply error
|
// Test with mock reader - push notification with ReadReply error
|
||||||
mockReader = NewMockReader()
|
mockReader = NewMockReader()
|
||||||
mockReader.AddPeekReplyType(proto.RespPush, nil)
|
mockReader.AddPeekReplyType(proto.RespPush, nil)
|
||||||
mockReader.AddReadReply(nil, io.ErrUnexpectedEOF) // ReadReply fails
|
mockReader.AddReadReply(nil, io.ErrUnexpectedEOF) // ReadReply fails
|
||||||
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
|
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
|
||||||
err = testProcessPendingNotifications(processor, ctx, mockReader)
|
err = testProcessPendingNotifications(processor, ctx, mockReader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -430,7 +467,7 @@ func TestProcessor(t *testing.T) {
|
|||||||
// Test with mock reader - push notification with invalid reply type
|
// Test with mock reader - push notification with invalid reply type
|
||||||
mockReader = NewMockReader()
|
mockReader = NewMockReader()
|
||||||
mockReader.AddPeekReplyType(proto.RespPush, nil)
|
mockReader.AddPeekReplyType(proto.RespPush, nil)
|
||||||
mockReader.AddReadReply("not-a-slice", nil) // Invalid reply type
|
mockReader.AddReadReply("not-a-slice", nil) // Invalid reply type
|
||||||
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
|
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
|
||||||
err = testProcessPendingNotifications(processor, ctx, mockReader)
|
err = testProcessPendingNotifications(processor, ctx, mockReader)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -621,3 +658,111 @@ func TestVoidProcessor(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestIsPubSubMessage tests the isPubSubMessage function
|
||||||
|
func TestIsPubSubMessage(t *testing.T) {
|
||||||
|
t.Run("PubSubMessages", func(t *testing.T) {
|
||||||
|
pubSubMessages := []string{
|
||||||
|
"message", // Regular pub/sub message
|
||||||
|
"pmessage", // Pattern pub/sub message
|
||||||
|
"subscribe", // Subscription confirmation
|
||||||
|
"unsubscribe", // Unsubscription confirmation
|
||||||
|
"psubscribe", // Pattern subscription confirmation
|
||||||
|
"punsubscribe", // Pattern unsubscription confirmation
|
||||||
|
"smessage", // Sharded pub/sub message (Redis 7.0+)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msgType := range pubSubMessages {
|
||||||
|
if !isPubSubMessage(msgType) {
|
||||||
|
t.Errorf("isPubSubMessage(%q) should return true", msgType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("NonPubSubMessages", func(t *testing.T) {
|
||||||
|
nonPubSubMessages := []string{
|
||||||
|
"MOVING", // Cluster slot migration
|
||||||
|
"MIGRATING", // Cluster slot migration
|
||||||
|
"MIGRATED", // Cluster slot migration
|
||||||
|
"FAILING_OVER", // Cluster failover
|
||||||
|
"FAILED_OVER", // Cluster failover
|
||||||
|
"unknown", // Unknown message type
|
||||||
|
"", // Empty string
|
||||||
|
"MESSAGE", // Case sensitive - should not match
|
||||||
|
"PMESSAGE", // Case sensitive - should not match
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, msgType := range nonPubSubMessages {
|
||||||
|
if isPubSubMessage(msgType) {
|
||||||
|
t.Errorf("isPubSubMessage(%q) should return false", msgType)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPubSubFiltering tests that pub/sub messages are filtered out during processing
|
||||||
|
func TestPubSubFiltering(t *testing.T) {
|
||||||
|
t.Run("PubSubMessagesIgnored", func(t *testing.T) {
|
||||||
|
processor := NewProcessor()
|
||||||
|
handler := NewTestHandler("test", true)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Register a handler for a non-pub/sub notification
|
||||||
|
err := processor.RegisterHandler("MOVING", handler, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to register handler: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with mock reader - pub/sub message should be ignored
|
||||||
|
mockReader := NewMockReader()
|
||||||
|
mockReader.AddPeekReplyType(proto.RespPush, nil)
|
||||||
|
pubSubNotification := []interface{}{"message", "channel", "data"}
|
||||||
|
mockReader.AddReadReply(pubSubNotification, nil)
|
||||||
|
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
|
||||||
|
|
||||||
|
handler.Reset()
|
||||||
|
err = testProcessPendingNotifications(processor, ctx, mockReader)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("ProcessPendingNotifications should handle pub/sub messages gracefully, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that handler was NOT called for pub/sub message
|
||||||
|
handled := handler.GetHandledNotifications()
|
||||||
|
if len(handled) != 0 {
|
||||||
|
t.Errorf("Expected 0 handled notifications for pub/sub message, got: %d", len(handled))
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("NonPubSubMessagesProcessed", func(t *testing.T) {
|
||||||
|
processor := NewProcessor()
|
||||||
|
handler := NewTestHandler("test", true)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Register a handler for a non-pub/sub notification
|
||||||
|
err := processor.RegisterHandler("MOVING", handler, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Failed to register handler: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with mock reader - non-pub/sub message should be processed
|
||||||
|
mockReader := NewMockReader()
|
||||||
|
mockReader.AddPeekReplyType(proto.RespPush, nil)
|
||||||
|
clusterNotification := []interface{}{"MOVING", "slot", "12345"}
|
||||||
|
mockReader.AddReadReply(clusterNotification, nil)
|
||||||
|
mockReader.AddPeekReplyType(proto.RespString, io.EOF) // No more push notifications
|
||||||
|
|
||||||
|
handler.Reset()
|
||||||
|
err = testProcessPendingNotifications(processor, ctx, mockReader)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("ProcessPendingNotifications should handle cluster notifications, got: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that handler WAS called for cluster notification
|
||||||
|
handled := handler.GetHandledNotifications()
|
||||||
|
if len(handled) != 1 {
|
||||||
|
t.Errorf("Expected 1 handled notification for cluster message, got: %d", len(handled))
|
||||||
|
} else if len(handled[0]) != 3 || handled[0][0] != "MOVING" {
|
||||||
|
t.Errorf("Expected MOVING notification, got: %v", handled[0])
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
@ -39,11 +39,7 @@ func (r *PushNotificationRegistry) UnregisterHandler(pushNotificationName string
|
|||||||
|
|
||||||
// GetHandler returns the handler for a specific push notification name.
|
// GetHandler returns the handler for a specific push notification name.
|
||||||
func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler {
|
func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushNotificationHandler {
|
||||||
handler := r.registry.GetHandler(pushNotificationName)
|
return r.registry.GetHandler(pushNotificationName)
|
||||||
if handler == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return handler
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRegisteredPushNotificationNames returns a list of all registered push notification names.
|
// GetRegisteredPushNotificationNames returns a list of all registered push notification names.
|
||||||
@ -51,8 +47,6 @@ func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string
|
|||||||
return r.registry.GetRegisteredPushNotificationNames()
|
return r.registry.GetRegisteredPushNotificationNames()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// PushNotificationProcessor handles push notifications with a registry of handlers.
|
// PushNotificationProcessor handles push notifications with a registry of handlers.
|
||||||
type PushNotificationProcessor struct {
|
type PushNotificationProcessor struct {
|
||||||
processor *pushnotif.Processor
|
processor *pushnotif.Processor
|
||||||
@ -67,12 +61,7 @@ func NewPushNotificationProcessor() *PushNotificationProcessor {
|
|||||||
|
|
||||||
// GetHandler returns the handler for a specific push notification name.
|
// GetHandler returns the handler for a specific push notification name.
|
||||||
func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
|
func (p *PushNotificationProcessor) GetHandler(pushNotificationName string) PushNotificationHandler {
|
||||||
handler := p.processor.GetHandler(pushNotificationName)
|
return p.processor.GetHandler(pushNotificationName)
|
||||||
if handler == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
// The handler is already a PushNotificationHandler since we store it directly
|
|
||||||
return handler.(PushNotificationHandler)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RegisterHandler registers a handler for a specific push notification name.
|
// RegisterHandler registers a handler for a specific push notification name.
|
||||||
@ -90,8 +79,6 @@ func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Cont
|
|||||||
return p.processor.ProcessPendingNotifications(ctx, rd)
|
return p.processor.ProcessPendingNotifications(ctx, rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// VoidPushNotificationProcessor discards all push notifications without processing them.
|
// VoidPushNotificationProcessor discards all push notifications without processing them.
|
||||||
type VoidPushNotificationProcessor struct {
|
type VoidPushNotificationProcessor struct {
|
||||||
processor *pushnotif.VoidProcessor
|
processor *pushnotif.VoidProcessor
|
||||||
@ -119,8 +106,6 @@ func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.
|
|||||||
return v.processor.ProcessPendingNotifications(ctx, rd)
|
return v.processor.ProcessPendingNotifications(ctx, rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Redis Cluster push notification names
|
// Redis Cluster push notification names
|
||||||
const (
|
const (
|
||||||
PushNotificationMoving = "MOVING"
|
PushNotificationMoving = "MOVING"
|
||||||
|
Reference in New Issue
Block a user