mirror of
https://github.com/redis/go-redis.git
synced 2025-07-20 22:42:59 +03:00
feat: add VoidPushNotificationProcessor for disabled push notifications
- Add VoidPushNotificationProcessor that reads and discards push notifications - Create PushNotificationProcessorInterface for consistent behavior - Always provide a processor (real or void) instead of nil - VoidPushNotificationProcessor properly cleans RESP3 push notifications from buffer - Remove all nil checks throughout codebase for cleaner, safer code - Update tests to expect VoidPushNotificationProcessor when disabled Benefits: - Eliminates nil pointer risks throughout the codebase - Follows null object pattern for safer operation - Properly handles RESP3 push notifications even when disabled - Consistent interface regardless of push notification settings - Cleaner code without defensive nil checks everywhere
This commit is contained in:
@ -230,7 +230,7 @@ type Options struct {
|
|||||||
|
|
||||||
// PushNotificationProcessor is the processor for handling push notifications.
|
// PushNotificationProcessor is the processor for handling push notifications.
|
||||||
// If nil, a default processor will be created when PushNotifications is enabled.
|
// If nil, a default processor will be created when PushNotifications is enabled.
|
||||||
PushNotificationProcessor *PushNotificationProcessor
|
PushNotificationProcessor PushNotificationProcessorInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *Options) init() {
|
func (opt *Options) init() {
|
||||||
|
23
pubsub.go
23
pubsub.go
@ -40,7 +40,7 @@ type PubSub struct {
|
|||||||
allCh *channel
|
allCh *channel
|
||||||
|
|
||||||
// Push notification processor for handling generic push notifications
|
// Push notification processor for handling generic push notifications
|
||||||
pushProcessor *PushNotificationProcessor
|
pushProcessor PushNotificationProcessorInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PubSub) init() {
|
func (c *PubSub) init() {
|
||||||
@ -49,7 +49,7 @@ func (c *PubSub) init() {
|
|||||||
|
|
||||||
// SetPushNotificationProcessor sets the push notification processor for handling
|
// SetPushNotificationProcessor sets the push notification processor for handling
|
||||||
// generic push notifications received on this PubSub connection.
|
// generic push notifications received on this PubSub connection.
|
||||||
func (c *PubSub) SetPushNotificationProcessor(processor *PushNotificationProcessor) {
|
func (c *PubSub) SetPushNotificationProcessor(processor PushNotificationProcessorInterface) {
|
||||||
c.pushProcessor = processor
|
c.pushProcessor = processor
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -435,15 +435,18 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
|||||||
}, nil
|
}, nil
|
||||||
default:
|
default:
|
||||||
// Try to handle as generic push notification
|
// Try to handle as generic push notification
|
||||||
if c.pushProcessor != nil && c.pushProcessor.IsEnabled() {
|
if c.pushProcessor.IsEnabled() {
|
||||||
ctx := c.getContext()
|
ctx := c.getContext()
|
||||||
handled := c.pushProcessor.GetRegistry().HandleNotification(ctx, reply)
|
registry := c.pushProcessor.GetRegistry()
|
||||||
if handled {
|
if registry != nil {
|
||||||
// Return a special message type to indicate it was handled
|
handled := registry.HandleNotification(ctx, reply)
|
||||||
return &PushNotificationMessage{
|
if handled {
|
||||||
Command: kind,
|
// Return a special message type to indicate it was handled
|
||||||
Args: reply[1:],
|
return &PushNotificationMessage{
|
||||||
}, nil
|
Command: kind,
|
||||||
|
Args: reply[1:],
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
||||||
|
@ -213,10 +213,13 @@ func TestConnWithoutPushNotifications(t *testing.T) {
|
|||||||
conn := client.Conn()
|
conn := client.Conn()
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
|
|
||||||
// Test GetPushNotificationProcessor returns nil
|
// Test GetPushNotificationProcessor returns VoidPushNotificationProcessor
|
||||||
processor := conn.GetPushNotificationProcessor()
|
processor := conn.GetPushNotificationProcessor()
|
||||||
if processor != nil {
|
if processor == nil {
|
||||||
t.Error("Conn should not have push notification processor for RESP2")
|
t.Error("Conn should always have a push notification processor")
|
||||||
|
}
|
||||||
|
if processor.IsEnabled() {
|
||||||
|
t.Error("Push notification processor should be disabled for RESP2")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Test RegisterPushNotificationHandler returns nil (no error)
|
// Test RegisterPushNotificationHandler returns nil (no error)
|
||||||
|
@ -104,6 +104,15 @@ func (r *PushNotificationRegistry) HasHandlers() bool {
|
|||||||
return len(r.handlers) > 0
|
return len(r.handlers) > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PushNotificationProcessorInterface defines the interface for push notification processors.
|
||||||
|
type PushNotificationProcessorInterface interface {
|
||||||
|
IsEnabled() bool
|
||||||
|
SetEnabled(enabled bool)
|
||||||
|
GetRegistry() *PushNotificationRegistry
|
||||||
|
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
|
||||||
|
RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error
|
||||||
|
}
|
||||||
|
|
||||||
// PushNotificationProcessor handles the processing of push notifications from Redis.
|
// PushNotificationProcessor handles the processing of push notifications from Redis.
|
||||||
type PushNotificationProcessor struct {
|
type PushNotificationProcessor struct {
|
||||||
registry *PushNotificationRegistry
|
registry *PushNotificationRegistry
|
||||||
@ -233,3 +242,62 @@ func (info *PushNotificationInfo) String() string {
|
|||||||
}
|
}
|
||||||
return info.Name
|
return info.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// VoidPushNotificationProcessor is a no-op processor that discards all push notifications.
|
||||||
|
// Used when push notifications are disabled to avoid nil checks throughout the codebase.
|
||||||
|
type VoidPushNotificationProcessor struct{}
|
||||||
|
|
||||||
|
// NewVoidPushNotificationProcessor creates a new void push notification processor.
|
||||||
|
func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor {
|
||||||
|
return &VoidPushNotificationProcessor{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsEnabled always returns false for void processor.
|
||||||
|
func (v *VoidPushNotificationProcessor) IsEnabled() bool {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetEnabled is a no-op for void processor.
|
||||||
|
func (v *VoidPushNotificationProcessor) SetEnabled(enabled bool) {
|
||||||
|
// No-op: void processor is always disabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRegistry returns nil for void processor since it doesn't maintain handlers.
|
||||||
|
func (v *VoidPushNotificationProcessor) GetRegistry() *PushNotificationRegistry {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessPendingNotifications reads and discards any pending push notifications.
|
||||||
|
func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
||||||
|
// Read and discard any pending push notifications to clean the buffer
|
||||||
|
for {
|
||||||
|
// Peek at the next reply type to see if it's a push notification
|
||||||
|
replyType, err := rd.PeekReplyType()
|
||||||
|
if err != nil {
|
||||||
|
// No more data available or error peeking
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this is a RESP3 push notification
|
||||||
|
if replyType == '>' { // RespPush
|
||||||
|
// Read and discard the push notification
|
||||||
|
_, err := rd.ReadReply()
|
||||||
|
if err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "push: error reading push notification to discard: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
// Continue to check for more push notifications
|
||||||
|
} else {
|
||||||
|
// Not a push notification, stop processing
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterHandler is a no-op for void processor, always returns nil.
|
||||||
|
func (v *VoidPushNotificationProcessor) RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
|
||||||
|
// No-op: void processor doesn't register handlers
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -182,10 +182,13 @@ func TestClientWithoutPushNotifications(t *testing.T) {
|
|||||||
})
|
})
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
// Push processor should be nil
|
// Push processor should be a VoidPushNotificationProcessor
|
||||||
processor := client.GetPushNotificationProcessor()
|
processor := client.GetPushNotificationProcessor()
|
||||||
if processor != nil {
|
if processor == nil {
|
||||||
t.Error("Push notification processor should be nil when disabled")
|
t.Error("Push notification processor should never be nil")
|
||||||
|
}
|
||||||
|
if processor.IsEnabled() {
|
||||||
|
t.Error("Push notification processor should be disabled when PushNotifications is false")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Registering handlers should not panic
|
// Registering handlers should not panic
|
||||||
|
39
redis.go
39
redis.go
@ -209,7 +209,7 @@ type baseClient struct {
|
|||||||
onClose func() error // hook called when client is closed
|
onClose func() error // hook called when client is closed
|
||||||
|
|
||||||
// Push notification processing
|
// Push notification processing
|
||||||
pushProcessor *PushNotificationProcessor
|
pushProcessor PushNotificationProcessorInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) clone() *baseClient {
|
func (c *baseClient) clone() *baseClient {
|
||||||
@ -535,7 +535,7 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
|||||||
}
|
}
|
||||||
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
||||||
// Check for push notifications before reading the command reply
|
// Check for push notifications before reading the command reply
|
||||||
if c.opt.Protocol == 3 && c.pushProcessor != nil && c.pushProcessor.IsEnabled() {
|
if c.opt.Protocol == 3 && c.pushProcessor.IsEnabled() {
|
||||||
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
|
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
|
||||||
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
|
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
|
||||||
}
|
}
|
||||||
@ -772,9 +772,7 @@ func NewClient(opt *Options) *Client {
|
|||||||
c.initializePushProcessor()
|
c.initializePushProcessor()
|
||||||
|
|
||||||
// Update options with the initialized push processor for connection pool
|
// Update options with the initialized push processor for connection pool
|
||||||
if c.pushProcessor != nil {
|
opt.PushNotificationProcessor = c.pushProcessor
|
||||||
opt.PushNotificationProcessor = c.pushProcessor
|
|
||||||
}
|
|
||||||
|
|
||||||
c.connPool = newConnPool(opt, c.dialHook)
|
c.connPool = newConnPool(opt, c.dialHook)
|
||||||
|
|
||||||
@ -819,8 +817,11 @@ func (c *Client) initializePushProcessor() {
|
|||||||
if c.opt.PushNotificationProcessor != nil {
|
if c.opt.PushNotificationProcessor != nil {
|
||||||
c.pushProcessor = c.opt.PushNotificationProcessor
|
c.pushProcessor = c.opt.PushNotificationProcessor
|
||||||
} else if c.opt.PushNotifications {
|
} else if c.opt.PushNotifications {
|
||||||
// Create default processor only if push notifications are enabled
|
// Create default processor when push notifications are enabled
|
||||||
c.pushProcessor = NewPushNotificationProcessor(true)
|
c.pushProcessor = NewPushNotificationProcessor(true)
|
||||||
|
} else {
|
||||||
|
// Create void processor when push notifications are disabled
|
||||||
|
c.pushProcessor = NewVoidPushNotificationProcessor()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -828,14 +829,11 @@ func (c *Client) initializePushProcessor() {
|
|||||||
// Returns an error if a handler is already registered for this push notification name.
|
// Returns an error if a handler is already registered for this push notification name.
|
||||||
// If protected is true, the handler cannot be unregistered.
|
// If protected is true, the handler cannot be unregistered.
|
||||||
func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
|
func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
|
||||||
if c.pushProcessor != nil {
|
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
|
||||||
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPushNotificationProcessor returns the push notification processor.
|
// GetPushNotificationProcessor returns the push notification processor.
|
||||||
func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor {
|
func (c *Client) GetPushNotificationProcessor() PushNotificationProcessorInterface {
|
||||||
return c.pushProcessor
|
return c.pushProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -886,10 +884,8 @@ func (c *Client) pubSub() *PubSub {
|
|||||||
}
|
}
|
||||||
pubsub.init()
|
pubsub.init()
|
||||||
|
|
||||||
// Set the push notification processor if available
|
// Set the push notification processor
|
||||||
if c.pushProcessor != nil {
|
pubsub.SetPushNotificationProcessor(c.pushProcessor)
|
||||||
pubsub.SetPushNotificationProcessor(c.pushProcessor)
|
|
||||||
}
|
|
||||||
|
|
||||||
return pubsub
|
return pubsub
|
||||||
}
|
}
|
||||||
@ -974,10 +970,8 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn
|
|||||||
c.hooksMixin = parentHooks.clone()
|
c.hooksMixin = parentHooks.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set push notification processor if available in options
|
// Set push notification processor from options (always available now)
|
||||||
if opt.PushNotificationProcessor != nil {
|
c.pushProcessor = opt.PushNotificationProcessor
|
||||||
c.pushProcessor = opt.PushNotificationProcessor
|
|
||||||
}
|
|
||||||
|
|
||||||
c.cmdable = c.Process
|
c.cmdable = c.Process
|
||||||
c.statefulCmdable = c.Process
|
c.statefulCmdable = c.Process
|
||||||
@ -1001,14 +995,11 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
|
|||||||
// Returns an error if a handler is already registered for this push notification name.
|
// Returns an error if a handler is already registered for this push notification name.
|
||||||
// If protected is true, the handler cannot be unregistered.
|
// If protected is true, the handler cannot be unregistered.
|
||||||
func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
|
func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
|
||||||
if c.pushProcessor != nil {
|
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
|
||||||
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetPushNotificationProcessor returns the push notification processor.
|
// GetPushNotificationProcessor returns the push notification processor.
|
||||||
func (c *Conn) GetPushNotificationProcessor() *PushNotificationProcessor {
|
func (c *Conn) GetPushNotificationProcessor() PushNotificationProcessorInterface {
|
||||||
return c.pushProcessor
|
return c.pushProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user