diff --git a/pubsub.go b/pubsub.go index aba8d323..da16d319 100644 --- a/pubsub.go +++ b/pubsub.go @@ -436,9 +436,9 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) { default: // Try to handle as generic push notification ctx := c.getContext() - registry := c.pushProcessor.GetRegistry() - if registry != nil { - handled := registry.HandleNotification(ctx, reply) + handler := c.pushProcessor.GetHandler(kind) + if handler != nil { + handled := handler.HandlePushNotification(ctx, reply) if handled { // Return a special message type to indicate it was handled return &PushNotificationMessage{ diff --git a/push_notifications.go b/push_notifications.go index 6d75a5c9..a0eba283 100644 --- a/push_notifications.go +++ b/push_notifications.go @@ -142,6 +142,10 @@ func (p *PushNotificationProcessor) GetRegistryForTesting() *PushNotificationReg // ProcessPendingNotifications checks for and processes any pending push notifications. func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { + // Check for nil reader + if rd == nil { + return nil + } // Check if there are any buffered bytes that might contain push notifications if rd.Buffered() == 0 { @@ -255,6 +259,11 @@ func (v *VoidPushNotificationProcessor) GetRegistryForTesting() *PushNotificatio // ProcessPendingNotifications reads and discards any pending push notifications. func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error { + // Check for nil reader + if rd == nil { + return nil + } + // Read and discard any pending push notifications to clean the buffer for { // Peek at the next reply type to see if it's a push notification