diff --git a/options.go b/options.go index 202345be..091ee419 100644 --- a/options.go +++ b/options.go @@ -230,7 +230,7 @@ type Options struct { // PushNotificationProcessor is the processor for handling push notifications. // If nil, a default processor will be created when PushNotifications is enabled. - PushNotificationProcessor *PushNotificationProcessor + PushNotificationProcessor PushNotificationProcessorInterface } func (opt *Options) init() { diff --git a/pubsub.go b/pubsub.go index 0a0b0d16..ae1b6d16 100644 --- a/pubsub.go +++ b/pubsub.go @@ -40,7 +40,7 @@ type PubSub struct { allCh *channel // Push notification processor for handling generic push notifications - pushProcessor *PushNotificationProcessor + pushProcessor PushNotificationProcessorInterface } func (c *PubSub) init() { @@ -49,7 +49,7 @@ func (c *PubSub) init() { // SetPushNotificationProcessor sets the push notification processor for handling // generic push notifications received on this PubSub connection. -func (c *PubSub) SetPushNotificationProcessor(processor *PushNotificationProcessor) { +func (c *PubSub) SetPushNotificationProcessor(processor PushNotificationProcessorInterface) { c.pushProcessor = processor } @@ -435,15 +435,18 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { }, nil default: // Try to handle as generic push notification - if c.pushProcessor != nil && c.pushProcessor.IsEnabled() { + if c.pushProcessor.IsEnabled() { ctx := c.getContext() - handled := c.pushProcessor.GetRegistry().HandleNotification(ctx, reply) - if handled { - // Return a special message type to indicate it was handled - return &PushNotificationMessage{ - Command: kind, - Args: reply[1:], - }, nil + registry := c.pushProcessor.GetRegistry() + if registry != nil { + handled := registry.HandleNotification(ctx, reply) + if handled { + // Return a special message type to indicate it was handled + return &PushNotificationMessage{ + Command: kind, + Args: reply[1:], + }, nil + } } } return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind) diff --git a/push_notification_coverage_test.go b/push_notification_coverage_test.go index eee48216..8438f551 100644 --- a/push_notification_coverage_test.go +++ b/push_notification_coverage_test.go @@ -213,10 +213,13 @@ func TestConnWithoutPushNotifications(t *testing.T) { conn := client.Conn() defer conn.Close() - // Test GetPushNotificationProcessor returns nil + // Test GetPushNotificationProcessor returns VoidPushNotificationProcessor processor := conn.GetPushNotificationProcessor() - if processor != nil { - t.Error("Conn should not have push notification processor for RESP2") + if processor == nil { + t.Error("Conn should always have a push notification processor") + } + if processor.IsEnabled() { + t.Error("Push notification processor should be disabled for RESP2") } // Test RegisterPushNotificationHandler returns nil (no error) diff --git a/push_notifications.go b/push_notifications.go index e6c749ab..44fa5532 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -104,6 +104,15 @@ func (r *PushNotificationRegistry) HasHandlers() bool { return len(r.handlers) > 0 } +// PushNotificationProcessorInterface defines the interface for push notification processors. +type PushNotificationProcessorInterface interface { + IsEnabled() bool + SetEnabled(enabled bool) + GetRegistry() *PushNotificationRegistry + ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error + RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error +} + // PushNotificationProcessor handles the processing of push notifications from Redis. type PushNotificationProcessor struct { registry *PushNotificationRegistry @@ -233,3 +242,62 @@ func (info *PushNotificationInfo) String() string { } return info.Name } + +// VoidPushNotificationProcessor is a no-op processor that discards all push notifications. +// Used when push notifications are disabled to avoid nil checks throughout the codebase. +type VoidPushNotificationProcessor struct{} + +// NewVoidPushNotificationProcessor creates a new void push notification processor. +func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor { + return &VoidPushNotificationProcessor{} +} + +// IsEnabled always returns false for void processor. +func (v *VoidPushNotificationProcessor) IsEnabled() bool { + return false +} + +// SetEnabled is a no-op for void processor. +func (v *VoidPushNotificationProcessor) SetEnabled(enabled bool) { + // No-op: void processor is always disabled +} + +// GetRegistry returns nil for void processor since it doesn't maintain handlers. +func (v *VoidPushNotificationProcessor) GetRegistry() *PushNotificationRegistry { + return nil +} + +// ProcessPendingNotifications reads and discards any pending push notifications. +func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { + // Read and discard any pending push notifications to clean the buffer + for { + // Peek at the next reply type to see if it's a push notification + replyType, err := rd.PeekReplyType() + if err != nil { + // No more data available or error peeking + break + } + + // Check if this is a RESP3 push notification + if replyType == '>' { // RespPush + // Read and discard the push notification + _, err := rd.ReadReply() + if err != nil { + internal.Logger.Printf(ctx, "push: error reading push notification to discard: %v", err) + break + } + // Continue to check for more push notifications + } else { + // Not a push notification, stop processing + break + } + } + + return nil +} + +// RegisterHandler is a no-op for void processor, always returns nil. +func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { + // No-op: void processor doesn't register handlers + return nil +} diff --git a/push_notifications_test.go b/push_notifications_test.go index 88d676bf..92af7352 100644 --- a/push_notifications_test.go +++ b/push_notifications_test.go @@ -182,10 +182,13 @@ func TestClientWithoutPushNotifications(t *testing.T) { }) defer client.Close() - // Push processor should be nil + // Push processor should be a VoidPushNotificationProcessor processor := client.GetPushNotificationProcessor() - if processor != nil { - t.Error("Push notification processor should be nil when disabled") + if processor == nil { + t.Error("Push notification processor should never be nil") + } + if processor.IsEnabled() { + t.Error("Push notification processor should be disabled when PushNotifications is false") } // Registering handlers should not panic diff --git a/redis.go b/redis.go index 462e7426..054c8ba0 100644 --- a/redis.go +++ b/redis.go @@ -209,7 +209,7 @@ type baseClient struct { onClose func() error // hook called when client is closed // Push notification processing - pushProcessor *PushNotificationProcessor + pushProcessor PushNotificationProcessorInterface } func (c *baseClient) clone() *baseClient { @@ -535,7 +535,7 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool } if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { // Check for push notifications before reading the command reply - if c.opt.Protocol == 3 && c.pushProcessor != nil && c.pushProcessor.IsEnabled() { + if c.opt.Protocol == 3 && c.pushProcessor.IsEnabled() { if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil { internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err) } @@ -772,9 +772,7 @@ func NewClient(opt *Options) *Client { c.initializePushProcessor() // Update options with the initialized push processor for connection pool - if c.pushProcessor != nil { - opt.PushNotificationProcessor = c.pushProcessor - } + opt.PushNotificationProcessor = c.pushProcessor c.connPool = newConnPool(opt, c.dialHook) @@ -819,8 +817,11 @@ func (c *Client) initializePushProcessor() { if c.opt.PushNotificationProcessor != nil { c.pushProcessor = c.opt.PushNotificationProcessor } else if c.opt.PushNotifications { - // Create default processor only if push notifications are enabled + // Create default processor when push notifications are enabled c.pushProcessor = NewPushNotificationProcessor(true) + } else { + // Create void processor when push notifications are disabled + c.pushProcessor = NewVoidPushNotificationProcessor() } } @@ -828,14 +829,11 @@ func (c *Client) initializePushProcessor() { // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - if c.pushProcessor != nil { - return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) - } - return nil + return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) } // GetPushNotificationProcessor returns the push notification processor. -func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor { +func (c *Client) GetPushNotificationProcessor() PushNotificationProcessorInterface { return c.pushProcessor } @@ -886,10 +884,8 @@ func (c *Client) pubSub() *PubSub { } pubsub.init() - // Set the push notification processor if available - if c.pushProcessor != nil { - pubsub.SetPushNotificationProcessor(c.pushProcessor) - } + // Set the push notification processor + pubsub.SetPushNotificationProcessor(c.pushProcessor) return pubsub } @@ -974,10 +970,8 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn c.hooksMixin = parentHooks.clone() } - // Set push notification processor if available in options - if opt.PushNotificationProcessor != nil { - c.pushProcessor = opt.PushNotificationProcessor - } + // Set push notification processor from options (always available now) + c.pushProcessor = opt.PushNotificationProcessor c.cmdable = c.Process c.statefulCmdable = c.Process @@ -1001,14 +995,11 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error { // Returns an error if a handler is already registered for this push notification name. // If protected is true, the handler cannot be unregistered. func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error { - if c.pushProcessor != nil { - return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) - } - return nil + return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected) } // GetPushNotificationProcessor returns the push notification processor. -func (c *Conn) GetPushNotificationProcessor() *PushNotificationProcessor { +func (c *Conn) GetPushNotificationProcessor() PushNotificationProcessorInterface { return c.pushProcessor }