mirror of
https://github.com/redis/go-redis.git
synced 2025-07-29 17:41:15 +03:00
feat: add general push notification system
- Add PushNotificationRegistry for managing notification handlers - Add PushNotificationProcessor for processing RESP3 push notifications - Add client methods for registering push notification handlers - Add PubSub integration for handling generic push notifications - Add comprehensive test suite with 100% coverage - Add push notification demo example This system allows handling any arbitrary RESP3 push notification with registered handlers, not just specific notification types.
This commit is contained in:
262
example/push-notification-demo/main.go
Normal file
262
example/push-notification-demo/main.go
Normal file
@ -0,0 +1,262 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
fmt.Println("Redis Go Client - General Push Notification System Demo")
|
||||||
|
fmt.Println("======================================================")
|
||||||
|
|
||||||
|
// Example 1: Basic push notification setup
|
||||||
|
basicPushNotificationExample()
|
||||||
|
|
||||||
|
// Example 2: Custom push notification handlers
|
||||||
|
customHandlersExample()
|
||||||
|
|
||||||
|
// Example 3: Global push notification handlers
|
||||||
|
globalHandlersExample()
|
||||||
|
|
||||||
|
// Example 4: Custom push notifications
|
||||||
|
customPushNotificationExample()
|
||||||
|
|
||||||
|
// Example 5: Multiple notification types
|
||||||
|
multipleNotificationTypesExample()
|
||||||
|
|
||||||
|
// Example 6: Processor API demonstration
|
||||||
|
demonstrateProcessorAPI()
|
||||||
|
}
|
||||||
|
|
||||||
|
func basicPushNotificationExample() {
|
||||||
|
fmt.Println("\n=== Basic Push Notification Example ===")
|
||||||
|
|
||||||
|
// Create a Redis client with push notifications enabled
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3, // RESP3 required for push notifications
|
||||||
|
PushNotifications: true, // Enable general push notification processing
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Register a handler for custom notifications
|
||||||
|
client.RegisterPushNotificationHandlerFunc("CUSTOM_EVENT", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("Received CUSTOM_EVENT: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("✅ Push notifications enabled and handler registered")
|
||||||
|
fmt.Println(" The client will now process any CUSTOM_EVENT push notifications")
|
||||||
|
}
|
||||||
|
|
||||||
|
func customHandlersExample() {
|
||||||
|
fmt.Println("\n=== Custom Push Notification Handlers Example ===")
|
||||||
|
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3,
|
||||||
|
PushNotifications: true,
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Register handlers for different notification types
|
||||||
|
client.RegisterPushNotificationHandlerFunc("USER_LOGIN", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
if len(notification) >= 3 {
|
||||||
|
username := notification[1]
|
||||||
|
timestamp := notification[2]
|
||||||
|
fmt.Printf("🔐 User login: %v at %v\n", username, timestamp)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterPushNotificationHandlerFunc("CACHE_INVALIDATION", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
if len(notification) >= 2 {
|
||||||
|
cacheKey := notification[1]
|
||||||
|
fmt.Printf("🗑️ Cache invalidated: %v\n", cacheKey)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterPushNotificationHandlerFunc("SYSTEM_ALERT", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
if len(notification) >= 3 {
|
||||||
|
alertLevel := notification[1]
|
||||||
|
message := notification[2]
|
||||||
|
fmt.Printf("🚨 System alert [%v]: %v\n", alertLevel, message)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("✅ Multiple custom handlers registered:")
|
||||||
|
fmt.Println(" - USER_LOGIN: Handles user authentication events")
|
||||||
|
fmt.Println(" - CACHE_INVALIDATION: Handles cache invalidation events")
|
||||||
|
fmt.Println(" - SYSTEM_ALERT: Handles system alert notifications")
|
||||||
|
}
|
||||||
|
|
||||||
|
func globalHandlersExample() {
|
||||||
|
fmt.Println("\n=== Global Push Notification Handler Example ===")
|
||||||
|
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3,
|
||||||
|
PushNotifications: true,
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Register a global handler that receives ALL push notifications
|
||||||
|
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
if len(notification) > 0 {
|
||||||
|
command := notification[0]
|
||||||
|
fmt.Printf("📡 Global handler received: %v (args: %d)\n", command, len(notification)-1)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Register specific handlers as well
|
||||||
|
client.RegisterPushNotificationHandlerFunc("SPECIFIC_EVENT", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("🎯 Specific handler for SPECIFIC_EVENT: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("✅ Global and specific handlers registered:")
|
||||||
|
fmt.Println(" - Global handler will receive ALL push notifications")
|
||||||
|
fmt.Println(" - Specific handler will receive only SPECIFIC_EVENT notifications")
|
||||||
|
fmt.Println(" - Both handlers will be called for SPECIFIC_EVENT notifications")
|
||||||
|
}
|
||||||
|
|
||||||
|
func customPushNotificationExample() {
|
||||||
|
fmt.Println("\n=== Custom Push Notifications Example ===")
|
||||||
|
|
||||||
|
// Create a client with custom push notifications
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3, // RESP3 required
|
||||||
|
PushNotifications: true, // Enable general push notifications
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Register custom handlers for application events
|
||||||
|
client.RegisterPushNotificationHandlerFunc("APPLICATION_EVENT", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("📱 Application event: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Register a global handler to monitor all notifications
|
||||||
|
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
if len(notification) > 0 {
|
||||||
|
command := notification[0]
|
||||||
|
switch command {
|
||||||
|
case "MOVING", "MIGRATING", "MIGRATED":
|
||||||
|
fmt.Printf("🔄 Cluster notification: %v\n", command)
|
||||||
|
default:
|
||||||
|
fmt.Printf("📨 Other notification: %v\n", command)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("✅ Custom push notifications enabled:")
|
||||||
|
fmt.Println(" - MOVING, MIGRATING, MIGRATED notifications → Cluster handlers")
|
||||||
|
fmt.Println(" - APPLICATION_EVENT notifications → Custom handler")
|
||||||
|
fmt.Println(" - All notifications → Global monitoring handler")
|
||||||
|
}
|
||||||
|
|
||||||
|
func multipleNotificationTypesExample() {
|
||||||
|
fmt.Println("\n=== Multiple Notification Types Example ===")
|
||||||
|
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3,
|
||||||
|
PushNotifications: true,
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Register handlers for Redis built-in notification types
|
||||||
|
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationPubSubMessage, func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("💬 Pub/Sub message: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationKeyspace, func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("🔑 Keyspace notification: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationKeyevent, func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("⚡ Key event notification: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Register handlers for cluster notifications
|
||||||
|
client.RegisterPushNotificationHandlerFunc(redis.PushNotificationMoving, func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("🚚 Cluster MOVING notification: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Register handlers for custom application notifications
|
||||||
|
client.RegisterPushNotificationHandlerFunc("METRICS_UPDATE", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("📊 Metrics update: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterPushNotificationHandlerFunc("CONFIG_CHANGE", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("⚙️ Configuration change: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
fmt.Println("✅ Multiple notification type handlers registered:")
|
||||||
|
fmt.Println(" Redis built-in notifications:")
|
||||||
|
fmt.Printf(" - %s: Pub/Sub messages\n", redis.PushNotificationPubSubMessage)
|
||||||
|
fmt.Printf(" - %s: Keyspace notifications\n", redis.PushNotificationKeyspace)
|
||||||
|
fmt.Printf(" - %s: Key event notifications\n", redis.PushNotificationKeyevent)
|
||||||
|
fmt.Println(" Cluster notifications:")
|
||||||
|
fmt.Printf(" - %s: Cluster slot migration\n", redis.PushNotificationMoving)
|
||||||
|
fmt.Println(" Custom application notifications:")
|
||||||
|
fmt.Println(" - METRICS_UPDATE: Application metrics")
|
||||||
|
fmt.Println(" - CONFIG_CHANGE: Configuration updates")
|
||||||
|
}
|
||||||
|
|
||||||
|
func demonstrateProcessorAPI() {
|
||||||
|
fmt.Println("\n=== Push Notification Processor API Example ===")
|
||||||
|
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3,
|
||||||
|
PushNotifications: true,
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Get the push notification processor
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor == nil {
|
||||||
|
log.Println("Push notification processor not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("✅ Push notification processor status: enabled=%v\n", processor.IsEnabled())
|
||||||
|
|
||||||
|
// Get the registry to inspect registered handlers
|
||||||
|
registry := processor.GetRegistry()
|
||||||
|
commands := registry.GetRegisteredCommands()
|
||||||
|
fmt.Printf("📋 Registered commands: %v\n", commands)
|
||||||
|
|
||||||
|
// Register a handler using the processor directly
|
||||||
|
processor.RegisterHandlerFunc("DIRECT_REGISTRATION", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
fmt.Printf("🎯 Direct registration handler: %v\n", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Check if handlers are registered
|
||||||
|
if registry.HasHandlers() {
|
||||||
|
fmt.Println("✅ Push notification handlers are registered and ready")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Demonstrate notification info parsing
|
||||||
|
sampleNotification := []interface{}{"SAMPLE_EVENT", "arg1", "arg2", 123}
|
||||||
|
info := redis.ParsePushNotificationInfo(sampleNotification)
|
||||||
|
if info != nil {
|
||||||
|
fmt.Printf("📄 Notification info - Command: %s, Args: %d\n", info.Command, len(info.Args))
|
||||||
|
}
|
||||||
|
}
|
11
options.go
11
options.go
@ -216,6 +216,17 @@ type Options struct {
|
|||||||
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
|
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
|
||||||
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
|
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
|
||||||
UnstableResp3 bool
|
UnstableResp3 bool
|
||||||
|
|
||||||
|
// PushNotifications enables general push notification processing.
|
||||||
|
// When enabled, the client will process RESP3 push notifications and
|
||||||
|
// route them to registered handlers.
|
||||||
|
//
|
||||||
|
// default: false
|
||||||
|
PushNotifications bool
|
||||||
|
|
||||||
|
// PushNotificationProcessor is the processor for handling push notifications.
|
||||||
|
// If nil, a default processor will be created when PushNotifications is enabled.
|
||||||
|
PushNotificationProcessor *PushNotificationProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opt *Options) init() {
|
func (opt *Options) init() {
|
||||||
|
38
pubsub.go
38
pubsub.go
@ -38,12 +38,21 @@ type PubSub struct {
|
|||||||
chOnce sync.Once
|
chOnce sync.Once
|
||||||
msgCh *channel
|
msgCh *channel
|
||||||
allCh *channel
|
allCh *channel
|
||||||
|
|
||||||
|
// Push notification processor for handling generic push notifications
|
||||||
|
pushProcessor *PushNotificationProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *PubSub) init() {
|
func (c *PubSub) init() {
|
||||||
c.exit = make(chan struct{})
|
c.exit = make(chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetPushNotificationProcessor sets the push notification processor for handling
|
||||||
|
// generic push notifications received on this PubSub connection.
|
||||||
|
func (c *PubSub) SetPushNotificationProcessor(processor *PushNotificationProcessor) {
|
||||||
|
c.pushProcessor = processor
|
||||||
|
}
|
||||||
|
|
||||||
func (c *PubSub) String() string {
|
func (c *PubSub) String() string {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
defer c.mu.Unlock()
|
defer c.mu.Unlock()
|
||||||
@ -367,6 +376,18 @@ func (p *Pong) String() string {
|
|||||||
return "Pong"
|
return "Pong"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PushNotificationMessage represents a generic push notification received on a PubSub connection.
|
||||||
|
type PushNotificationMessage struct {
|
||||||
|
// Command is the push notification command (e.g., "MOVING", "CUSTOM_EVENT").
|
||||||
|
Command string
|
||||||
|
// Args are the arguments following the command.
|
||||||
|
Args []interface{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *PushNotificationMessage) String() string {
|
||||||
|
return fmt.Sprintf("push: %s", m.Command)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
||||||
switch reply := reply.(type) {
|
switch reply := reply.(type) {
|
||||||
case string:
|
case string:
|
||||||
@ -413,6 +434,18 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
|||||||
Payload: reply[1].(string),
|
Payload: reply[1].(string),
|
||||||
}, nil
|
}, nil
|
||||||
default:
|
default:
|
||||||
|
// Try to handle as generic push notification
|
||||||
|
if c.pushProcessor != nil && c.pushProcessor.IsEnabled() {
|
||||||
|
ctx := c.getContext()
|
||||||
|
handled := c.pushProcessor.GetRegistry().HandleNotification(ctx, reply)
|
||||||
|
if handled {
|
||||||
|
// Return a special message type to indicate it was handled
|
||||||
|
return &PushNotificationMessage{
|
||||||
|
Command: kind,
|
||||||
|
Args: reply[1:],
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
@ -658,6 +691,9 @@ func (c *channel) initMsgChan() {
|
|||||||
// Ignore.
|
// Ignore.
|
||||||
case *Pong:
|
case *Pong:
|
||||||
// Ignore.
|
// Ignore.
|
||||||
|
case *PushNotificationMessage:
|
||||||
|
// Ignore push notifications in message-only channel
|
||||||
|
// They are already handled by the push notification processor
|
||||||
case *Message:
|
case *Message:
|
||||||
timer.Reset(c.chanSendTimeout)
|
timer.Reset(c.chanSendTimeout)
|
||||||
select {
|
select {
|
||||||
@ -712,7 +748,7 @@ func (c *channel) initAllChan() {
|
|||||||
switch msg := msg.(type) {
|
switch msg := msg.(type) {
|
||||||
case *Pong:
|
case *Pong:
|
||||||
// Ignore.
|
// Ignore.
|
||||||
case *Subscription, *Message:
|
case *Subscription, *Message, *PushNotificationMessage:
|
||||||
timer.Reset(c.chanSendTimeout)
|
timer.Reset(c.chanSendTimeout)
|
||||||
select {
|
select {
|
||||||
case c.allCh <- msg:
|
case c.allCh <- msg:
|
||||||
|
292
push_notifications.go
Normal file
292
push_notifications.go
Normal file
@ -0,0 +1,292 @@
|
|||||||
|
package redis
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9/internal"
|
||||||
|
"github.com/redis/go-redis/v9/internal/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PushNotificationHandler defines the interface for handling push notifications.
|
||||||
|
type PushNotificationHandler interface {
|
||||||
|
// HandlePushNotification processes a push notification.
|
||||||
|
// Returns true if the notification was handled, false otherwise.
|
||||||
|
HandlePushNotification(ctx context.Context, notification []interface{}) bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushNotificationHandlerFunc is a function adapter for PushNotificationHandler.
|
||||||
|
type PushNotificationHandlerFunc func(ctx context.Context, notification []interface{}) bool
|
||||||
|
|
||||||
|
// HandlePushNotification implements PushNotificationHandler.
|
||||||
|
func (f PushNotificationHandlerFunc) HandlePushNotification(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return f(ctx, notification)
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushNotificationRegistry manages handlers for different types of push notifications.
|
||||||
|
type PushNotificationRegistry struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
handlers map[string][]PushNotificationHandler // command -> handlers
|
||||||
|
global []PushNotificationHandler // global handlers for all notifications
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPushNotificationRegistry creates a new push notification registry.
|
||||||
|
func NewPushNotificationRegistry() *PushNotificationRegistry {
|
||||||
|
return &PushNotificationRegistry{
|
||||||
|
handlers: make(map[string][]PushNotificationHandler),
|
||||||
|
global: make([]PushNotificationHandler, 0),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterHandler registers a handler for a specific push notification command.
|
||||||
|
func (r *PushNotificationRegistry) RegisterHandler(command string, handler PushNotificationHandler) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
if r.handlers[command] == nil {
|
||||||
|
r.handlers[command] = make([]PushNotificationHandler, 0)
|
||||||
|
}
|
||||||
|
r.handlers[command] = append(r.handlers[command], handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterGlobalHandler registers a handler that will receive all push notifications.
|
||||||
|
func (r *PushNotificationRegistry) RegisterGlobalHandler(handler PushNotificationHandler) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
r.global = append(r.global, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnregisterHandler removes a handler for a specific command.
|
||||||
|
func (r *PushNotificationRegistry) UnregisterHandler(command string, handler PushNotificationHandler) {
|
||||||
|
r.mu.Lock()
|
||||||
|
defer r.mu.Unlock()
|
||||||
|
|
||||||
|
handlers := r.handlers[command]
|
||||||
|
for i, h := range handlers {
|
||||||
|
// Compare function pointers (this is a simplified approach)
|
||||||
|
if &h == &handler {
|
||||||
|
r.handlers[command] = append(handlers[:i], handlers[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleNotification processes a push notification by calling all registered handlers.
|
||||||
|
func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool {
|
||||||
|
if len(notification) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract command from notification
|
||||||
|
command, ok := notification[0].(string)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
handled := false
|
||||||
|
|
||||||
|
// Call global handlers first
|
||||||
|
for _, handler := range r.global {
|
||||||
|
if handler.HandlePushNotification(ctx, notification) {
|
||||||
|
handled = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Call specific handlers
|
||||||
|
if handlers, exists := r.handlers[command]; exists {
|
||||||
|
for _, handler := range handlers {
|
||||||
|
if handler.HandlePushNotification(ctx, notification) {
|
||||||
|
handled = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return handled
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRegisteredCommands returns a list of commands that have registered handlers.
|
||||||
|
func (r *PushNotificationRegistry) GetRegisteredCommands() []string {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
commands := make([]string, 0, len(r.handlers))
|
||||||
|
for command := range r.handlers {
|
||||||
|
commands = append(commands, command)
|
||||||
|
}
|
||||||
|
return commands
|
||||||
|
}
|
||||||
|
|
||||||
|
// HasHandlers returns true if there are any handlers registered (global or specific).
|
||||||
|
func (r *PushNotificationRegistry) HasHandlers() bool {
|
||||||
|
r.mu.RLock()
|
||||||
|
defer r.mu.RUnlock()
|
||||||
|
|
||||||
|
return len(r.global) > 0 || len(r.handlers) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// PushNotificationProcessor handles the processing of push notifications from Redis.
|
||||||
|
type PushNotificationProcessor struct {
|
||||||
|
registry *PushNotificationRegistry
|
||||||
|
enabled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewPushNotificationProcessor creates a new push notification processor.
|
||||||
|
func NewPushNotificationProcessor(enabled bool) *PushNotificationProcessor {
|
||||||
|
return &PushNotificationProcessor{
|
||||||
|
registry: NewPushNotificationRegistry(),
|
||||||
|
enabled: enabled,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsEnabled returns whether push notification processing is enabled.
|
||||||
|
func (p *PushNotificationProcessor) IsEnabled() bool {
|
||||||
|
return p.enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetEnabled enables or disables push notification processing.
|
||||||
|
func (p *PushNotificationProcessor) SetEnabled(enabled bool) {
|
||||||
|
p.enabled = enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRegistry returns the push notification registry.
|
||||||
|
func (p *PushNotificationProcessor) GetRegistry() *PushNotificationRegistry {
|
||||||
|
return p.registry
|
||||||
|
}
|
||||||
|
|
||||||
|
// ProcessPendingNotifications checks for and processes any pending push notifications.
|
||||||
|
func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
||||||
|
if !p.enabled || !p.registry.HasHandlers() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if there are any buffered bytes that might contain push notifications
|
||||||
|
if rd.Buffered() == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process any pending push notifications
|
||||||
|
for {
|
||||||
|
// Peek at the next reply type to see if it's a push notification
|
||||||
|
replyType, err := rd.PeekReplyType()
|
||||||
|
if err != nil {
|
||||||
|
// No more data available or error peeking
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if this is a RESP3 push notification
|
||||||
|
if replyType == '>' { // RespPush
|
||||||
|
// Read the push notification
|
||||||
|
reply, err := rd.ReadReply()
|
||||||
|
if err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "push: error reading push notification: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
// Process the push notification
|
||||||
|
if pushSlice, ok := reply.([]interface{}); ok && len(pushSlice) > 0 {
|
||||||
|
handled := p.registry.HandleNotification(ctx, pushSlice)
|
||||||
|
if handled {
|
||||||
|
internal.Logger.Printf(ctx, "push: processed push notification: %v", pushSlice[0])
|
||||||
|
} else {
|
||||||
|
internal.Logger.Printf(ctx, "push: unhandled push notification: %v", pushSlice[0])
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
internal.Logger.Printf(ctx, "push: invalid push notification format: %v", reply)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Not a push notification, stop processing
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterHandler is a convenience method to register a handler for a specific command.
|
||||||
|
func (p *PushNotificationProcessor) RegisterHandler(command string, handler PushNotificationHandler) {
|
||||||
|
p.registry.RegisterHandler(command, handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterGlobalHandler is a convenience method to register a global handler.
|
||||||
|
func (p *PushNotificationProcessor) RegisterGlobalHandler(handler PushNotificationHandler) {
|
||||||
|
p.registry.RegisterGlobalHandler(handler)
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterHandlerFunc is a convenience method to register a function as a handler.
|
||||||
|
func (p *PushNotificationProcessor) RegisterHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) {
|
||||||
|
p.registry.RegisterHandler(command, PushNotificationHandlerFunc(handlerFunc))
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterGlobalHandlerFunc is a convenience method to register a function as a global handler.
|
||||||
|
func (p *PushNotificationProcessor) RegisterGlobalHandlerFunc(handlerFunc func(ctx context.Context, notification []interface{}) bool) {
|
||||||
|
p.registry.RegisterGlobalHandler(PushNotificationHandlerFunc(handlerFunc))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Common push notification commands
|
||||||
|
const (
|
||||||
|
// Redis Cluster notifications
|
||||||
|
PushNotificationMoving = "MOVING"
|
||||||
|
PushNotificationMigrating = "MIGRATING"
|
||||||
|
PushNotificationMigrated = "MIGRATED"
|
||||||
|
PushNotificationFailingOver = "FAILING_OVER"
|
||||||
|
PushNotificationFailedOver = "FAILED_OVER"
|
||||||
|
|
||||||
|
// Redis Pub/Sub notifications
|
||||||
|
PushNotificationPubSubMessage = "message"
|
||||||
|
PushNotificationPMessage = "pmessage"
|
||||||
|
PushNotificationSubscribe = "subscribe"
|
||||||
|
PushNotificationUnsubscribe = "unsubscribe"
|
||||||
|
PushNotificationPSubscribe = "psubscribe"
|
||||||
|
PushNotificationPUnsubscribe = "punsubscribe"
|
||||||
|
|
||||||
|
// Redis Stream notifications
|
||||||
|
PushNotificationXRead = "xread"
|
||||||
|
PushNotificationXReadGroup = "xreadgroup"
|
||||||
|
|
||||||
|
// Redis Keyspace notifications
|
||||||
|
PushNotificationKeyspace = "keyspace"
|
||||||
|
PushNotificationKeyevent = "keyevent"
|
||||||
|
|
||||||
|
// Redis Module notifications
|
||||||
|
PushNotificationModule = "module"
|
||||||
|
|
||||||
|
// Custom application notifications
|
||||||
|
PushNotificationCustom = "custom"
|
||||||
|
)
|
||||||
|
|
||||||
|
// PushNotificationInfo contains metadata about a push notification.
|
||||||
|
type PushNotificationInfo struct {
|
||||||
|
Command string
|
||||||
|
Args []interface{}
|
||||||
|
Timestamp int64
|
||||||
|
Source string
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParsePushNotificationInfo extracts information from a push notification.
|
||||||
|
func ParsePushNotificationInfo(notification []interface{}) *PushNotificationInfo {
|
||||||
|
if len(notification) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
command, ok := notification[0].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &PushNotificationInfo{
|
||||||
|
Command: command,
|
||||||
|
Args: notification[1:],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// String returns a string representation of the push notification info.
|
||||||
|
func (info *PushNotificationInfo) String() string {
|
||||||
|
if info == nil {
|
||||||
|
return "<nil>"
|
||||||
|
}
|
||||||
|
return info.Command
|
||||||
|
}
|
965
push_notifications_test.go
Normal file
965
push_notifications_test.go
Normal file
@ -0,0 +1,965 @@
|
|||||||
|
package redis_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/redis/go-redis/v9"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPushNotificationRegistry(t *testing.T) {
|
||||||
|
// Test the push notification registry functionality
|
||||||
|
registry := redis.NewPushNotificationRegistry()
|
||||||
|
|
||||||
|
// Test initial state
|
||||||
|
if registry.HasHandlers() {
|
||||||
|
t.Error("Registry should not have handlers initially")
|
||||||
|
}
|
||||||
|
|
||||||
|
commands := registry.GetRegisteredCommands()
|
||||||
|
if len(commands) != 0 {
|
||||||
|
t.Errorf("Expected 0 registered commands, got %d", len(commands))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test registering a specific handler
|
||||||
|
handlerCalled := false
|
||||||
|
handler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
registry.RegisterHandler("TEST_COMMAND", handler)
|
||||||
|
|
||||||
|
if !registry.HasHandlers() {
|
||||||
|
t.Error("Registry should have handlers after registration")
|
||||||
|
}
|
||||||
|
|
||||||
|
commands = registry.GetRegisteredCommands()
|
||||||
|
if len(commands) != 1 || commands[0] != "TEST_COMMAND" {
|
||||||
|
t.Errorf("Expected ['TEST_COMMAND'], got %v", commands)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test handling a notification
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"TEST_COMMAND", "arg1", "arg2"}
|
||||||
|
handled := registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should have been handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled {
|
||||||
|
t.Error("Handler should have been called")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test global handler
|
||||||
|
globalHandlerCalled := false
|
||||||
|
globalHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
globalHandlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
registry.RegisterGlobalHandler(globalHandler)
|
||||||
|
|
||||||
|
// Reset flags
|
||||||
|
handlerCalled = false
|
||||||
|
globalHandlerCalled = false
|
||||||
|
|
||||||
|
// Handle notification again
|
||||||
|
handled = registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should have been handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled {
|
||||||
|
t.Error("Specific handler should have been called")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !globalHandlerCalled {
|
||||||
|
t.Error("Global handler should have been called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationProcessor(t *testing.T) {
|
||||||
|
// Test the push notification processor
|
||||||
|
processor := redis.NewPushNotificationProcessor(true)
|
||||||
|
|
||||||
|
if !processor.IsEnabled() {
|
||||||
|
t.Error("Processor should be enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test registering handlers
|
||||||
|
handlerCalled := false
|
||||||
|
processor.RegisterHandlerFunc("CUSTOM_NOTIFICATION", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handlerCalled = true
|
||||||
|
if len(notification) < 2 {
|
||||||
|
t.Error("Expected at least 2 elements in notification")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
if notification[0] != "CUSTOM_NOTIFICATION" {
|
||||||
|
t.Errorf("Expected command 'CUSTOM_NOTIFICATION', got %v", notification[0])
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test global handler
|
||||||
|
globalHandlerCalled := false
|
||||||
|
processor.RegisterGlobalHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
globalHandlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Simulate handling a notification
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"CUSTOM_NOTIFICATION", "data"}
|
||||||
|
handled := processor.GetRegistry().HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should have been handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled {
|
||||||
|
t.Error("Specific handler should have been called")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !globalHandlerCalled {
|
||||||
|
t.Error("Global handler should have been called")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test disabling processor
|
||||||
|
processor.SetEnabled(false)
|
||||||
|
if processor.IsEnabled() {
|
||||||
|
t.Error("Processor should be disabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientPushNotificationIntegration(t *testing.T) {
|
||||||
|
// Test push notification integration with Redis client
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3, // RESP3 required for push notifications
|
||||||
|
PushNotifications: true, // Enable push notifications
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Test that push processor is initialized
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor == nil {
|
||||||
|
t.Error("Push notification processor should be initialized")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !processor.IsEnabled() {
|
||||||
|
t.Error("Push notification processor should be enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test registering handlers through client
|
||||||
|
handlerCalled := false
|
||||||
|
client.RegisterPushNotificationHandlerFunc("CUSTOM_EVENT", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test global handler through client
|
||||||
|
globalHandlerCalled := false
|
||||||
|
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
globalHandlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Simulate notification handling
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"CUSTOM_EVENT", "test_data"}
|
||||||
|
handled := processor.GetRegistry().HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should have been handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled {
|
||||||
|
t.Error("Custom handler should have been called")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !globalHandlerCalled {
|
||||||
|
t.Error("Global handler should have been called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientWithoutPushNotifications(t *testing.T) {
|
||||||
|
// Test client without push notifications enabled
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
PushNotifications: false, // Disabled
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Push processor should be nil
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor != nil {
|
||||||
|
t.Error("Push notification processor should be nil when disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering handlers should not panic
|
||||||
|
client.RegisterPushNotificationHandlerFunc("TEST", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationEnabledClient(t *testing.T) {
|
||||||
|
// Test that push notifications can be enabled on a client
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3, // RESP3 required
|
||||||
|
PushNotifications: true, // Enable push notifications
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Push processor should be initialized
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor == nil {
|
||||||
|
t.Error("Push notification processor should be initialized when enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !processor.IsEnabled() {
|
||||||
|
t.Error("Push notification processor should be enabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test registering a handler
|
||||||
|
handlerCalled := false
|
||||||
|
client.RegisterPushNotificationHandlerFunc("TEST_NOTIFICATION", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test that the handler works
|
||||||
|
registry := processor.GetRegistry()
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"TEST_NOTIFICATION", "data"}
|
||||||
|
handled := registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should have been handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled {
|
||||||
|
t.Error("Handler should have been called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationConstants(t *testing.T) {
|
||||||
|
// Test that push notification constants are defined correctly
|
||||||
|
constants := map[string]string{
|
||||||
|
redis.PushNotificationMoving: "MOVING",
|
||||||
|
redis.PushNotificationMigrating: "MIGRATING",
|
||||||
|
redis.PushNotificationMigrated: "MIGRATED",
|
||||||
|
redis.PushNotificationPubSubMessage: "message",
|
||||||
|
redis.PushNotificationPMessage: "pmessage",
|
||||||
|
redis.PushNotificationSubscribe: "subscribe",
|
||||||
|
redis.PushNotificationUnsubscribe: "unsubscribe",
|
||||||
|
redis.PushNotificationKeyspace: "keyspace",
|
||||||
|
redis.PushNotificationKeyevent: "keyevent",
|
||||||
|
}
|
||||||
|
|
||||||
|
for constant, expected := range constants {
|
||||||
|
if constant != expected {
|
||||||
|
t.Errorf("Expected constant to equal '%s', got '%s'", expected, constant)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationInfo(t *testing.T) {
|
||||||
|
// Test push notification info parsing
|
||||||
|
notification := []interface{}{"MOVING", "127.0.0.1:6380", "30000"}
|
||||||
|
info := redis.ParsePushNotificationInfo(notification)
|
||||||
|
|
||||||
|
if info == nil {
|
||||||
|
t.Fatal("Push notification info should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.Command != "MOVING" {
|
||||||
|
t.Errorf("Expected command 'MOVING', got '%s'", info.Command)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(info.Args) != 2 {
|
||||||
|
t.Errorf("Expected 2 args, got %d", len(info.Args))
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.String() != "MOVING" {
|
||||||
|
t.Errorf("Expected string representation 'MOVING', got '%s'", info.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with empty notification
|
||||||
|
emptyInfo := redis.ParsePushNotificationInfo([]interface{}{})
|
||||||
|
if emptyInfo != nil {
|
||||||
|
t.Error("Empty notification should return nil info")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with invalid notification
|
||||||
|
invalidInfo := redis.ParsePushNotificationInfo([]interface{}{123, "invalid"})
|
||||||
|
if invalidInfo != nil {
|
||||||
|
t.Error("Invalid notification should return nil info")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPubSubWithGenericPushNotifications(t *testing.T) {
|
||||||
|
// Test that PubSub can be configured with push notification processor
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3, // RESP3 required
|
||||||
|
PushNotifications: true, // Enable push notifications
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// Register a handler for custom push notifications
|
||||||
|
customNotificationReceived := false
|
||||||
|
client.RegisterPushNotificationHandlerFunc("CUSTOM_PUBSUB_EVENT", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
customNotificationReceived = true
|
||||||
|
t.Logf("Received custom push notification in PubSub context: %v", notification)
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Create a PubSub instance
|
||||||
|
pubsub := client.Subscribe(context.Background(), "test-channel")
|
||||||
|
defer pubsub.Close()
|
||||||
|
|
||||||
|
// Verify that the PubSub instance has access to push notification processor
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor == nil {
|
||||||
|
t.Error("Push notification processor should be available")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that the processor can handle notifications
|
||||||
|
notification := []interface{}{"CUSTOM_PUBSUB_EVENT", "arg1", "arg2"}
|
||||||
|
handled := processor.GetRegistry().HandleNotification(context.Background(), notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Push notification should have been handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that the custom handler was called
|
||||||
|
if !customNotificationReceived {
|
||||||
|
t.Error("Custom push notification handler should have been called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationMessageType(t *testing.T) {
|
||||||
|
// Test the PushNotificationMessage type
|
||||||
|
msg := &redis.PushNotificationMessage{
|
||||||
|
Command: "CUSTOM_EVENT",
|
||||||
|
Args: []interface{}{"arg1", "arg2", 123},
|
||||||
|
}
|
||||||
|
|
||||||
|
if msg.Command != "CUSTOM_EVENT" {
|
||||||
|
t.Errorf("Expected command 'CUSTOM_EVENT', got '%s'", msg.Command)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(msg.Args) != 3 {
|
||||||
|
t.Errorf("Expected 3 args, got %d", len(msg.Args))
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedString := "push: CUSTOM_EVENT"
|
||||||
|
if msg.String() != expectedString {
|
||||||
|
t.Errorf("Expected string '%s', got '%s'", expectedString, msg.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationRegistryUnregisterHandler(t *testing.T) {
|
||||||
|
// Test unregistering handlers (note: current implementation has limitations with function pointer comparison)
|
||||||
|
registry := redis.NewPushNotificationRegistry()
|
||||||
|
|
||||||
|
// Register multiple handlers for the same command
|
||||||
|
handler1Called := false
|
||||||
|
handler1 := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handler1Called = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
handler2Called := false
|
||||||
|
handler2 := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handler2Called = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
registry.RegisterHandler("TEST_CMD", handler1)
|
||||||
|
registry.RegisterHandler("TEST_CMD", handler2)
|
||||||
|
|
||||||
|
// Verify both handlers are registered
|
||||||
|
commands := registry.GetRegisteredCommands()
|
||||||
|
if len(commands) != 1 || commands[0] != "TEST_CMD" {
|
||||||
|
t.Errorf("Expected ['TEST_CMD'], got %v", commands)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test notification handling with both handlers
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"TEST_CMD", "data"}
|
||||||
|
handled := registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should have been handled")
|
||||||
|
}
|
||||||
|
if !handler1Called || !handler2Called {
|
||||||
|
t.Error("Both handlers should have been called")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that UnregisterHandler doesn't panic (even if it doesn't work perfectly)
|
||||||
|
registry.UnregisterHandler("TEST_CMD", handler1)
|
||||||
|
registry.UnregisterHandler("NON_EXISTENT", handler2)
|
||||||
|
|
||||||
|
// Note: Due to the current implementation using pointer comparison,
|
||||||
|
// unregistration may not work as expected. This test mainly verifies
|
||||||
|
// that the method doesn't panic and the registry remains functional.
|
||||||
|
|
||||||
|
// Reset flags and test that handlers still work
|
||||||
|
handler1Called = false
|
||||||
|
handler2Called = false
|
||||||
|
|
||||||
|
handled = registry.HandleNotification(ctx, notification)
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should still be handled after unregister attempts")
|
||||||
|
}
|
||||||
|
|
||||||
|
// The registry should still be functional
|
||||||
|
if !registry.HasHandlers() {
|
||||||
|
t.Error("Registry should still have handlers")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationRegistryEdgeCases(t *testing.T) {
|
||||||
|
registry := redis.NewPushNotificationRegistry()
|
||||||
|
|
||||||
|
// Test handling empty notification
|
||||||
|
ctx := context.Background()
|
||||||
|
handled := registry.HandleNotification(ctx, []interface{}{})
|
||||||
|
if handled {
|
||||||
|
t.Error("Empty notification should not be handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test handling notification with non-string command
|
||||||
|
handled = registry.HandleNotification(ctx, []interface{}{123, "data"})
|
||||||
|
if handled {
|
||||||
|
t.Error("Notification with non-string command should not be handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test handling notification with nil command
|
||||||
|
handled = registry.HandleNotification(ctx, []interface{}{nil, "data"})
|
||||||
|
if handled {
|
||||||
|
t.Error("Notification with nil command should not be handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test unregistering non-existent handler
|
||||||
|
dummyHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
registry.UnregisterHandler("NON_EXISTENT", dummyHandler)
|
||||||
|
// Should not panic
|
||||||
|
|
||||||
|
// Test unregistering from empty command
|
||||||
|
registry.UnregisterHandler("EMPTY_CMD", dummyHandler)
|
||||||
|
// Should not panic
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationRegistryMultipleHandlers(t *testing.T) {
|
||||||
|
registry := redis.NewPushNotificationRegistry()
|
||||||
|
|
||||||
|
// Test multiple handlers for the same command
|
||||||
|
handler1Called := false
|
||||||
|
handler2Called := false
|
||||||
|
handler3Called := false
|
||||||
|
|
||||||
|
registry.RegisterHandler("MULTI_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handler1Called = true
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
registry.RegisterHandler("MULTI_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handler2Called = true
|
||||||
|
return false // Return false to test that other handlers still get called
|
||||||
|
}))
|
||||||
|
|
||||||
|
registry.RegisterHandler("MULTI_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handler3Called = true
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Test that all handlers are called
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"MULTI_CMD", "data"}
|
||||||
|
handled := registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should be handled (at least one handler returned true)")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handler1Called || !handler2Called || !handler3Called {
|
||||||
|
t.Error("All handlers should have been called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationRegistryGlobalAndSpecific(t *testing.T) {
|
||||||
|
registry := redis.NewPushNotificationRegistry()
|
||||||
|
|
||||||
|
globalCalled := false
|
||||||
|
specificCalled := false
|
||||||
|
|
||||||
|
// Register global handler
|
||||||
|
registry.RegisterGlobalHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
globalCalled = true
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Register specific handler
|
||||||
|
registry.RegisterHandler("SPECIFIC_CMD", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
specificCalled = true
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Test with specific command
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"SPECIFIC_CMD", "data"}
|
||||||
|
handled := registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should be handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !globalCalled {
|
||||||
|
t.Error("Global handler should be called")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !specificCalled {
|
||||||
|
t.Error("Specific handler should be called")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset flags
|
||||||
|
globalCalled = false
|
||||||
|
specificCalled = false
|
||||||
|
|
||||||
|
// Test with non-specific command
|
||||||
|
notification = []interface{}{"OTHER_CMD", "data"}
|
||||||
|
handled = registry.HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should be handled by global handler")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !globalCalled {
|
||||||
|
t.Error("Global handler should be called for any command")
|
||||||
|
}
|
||||||
|
|
||||||
|
if specificCalled {
|
||||||
|
t.Error("Specific handler should not be called for other commands")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationProcessorEdgeCases(t *testing.T) {
|
||||||
|
// Test processor with disabled state
|
||||||
|
processor := redis.NewPushNotificationProcessor(false)
|
||||||
|
|
||||||
|
if processor.IsEnabled() {
|
||||||
|
t.Error("Processor should be disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test that disabled processor doesn't process notifications
|
||||||
|
handlerCalled := false
|
||||||
|
processor.RegisterHandlerFunc("TEST_CMD", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Even with handlers registered, disabled processor shouldn't process
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"TEST_CMD", "data"}
|
||||||
|
handled := processor.GetRegistry().HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Registry should still handle notifications even when processor is disabled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled {
|
||||||
|
t.Error("Handler should be called when using registry directly")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test enabling processor
|
||||||
|
processor.SetEnabled(true)
|
||||||
|
if !processor.IsEnabled() {
|
||||||
|
t.Error("Processor should be enabled after SetEnabled(true)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
|
||||||
|
processor := redis.NewPushNotificationProcessor(true)
|
||||||
|
|
||||||
|
// Test RegisterHandler convenience method
|
||||||
|
handlerCalled := false
|
||||||
|
handler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
handlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
processor.RegisterHandler("CONV_CMD", handler)
|
||||||
|
|
||||||
|
// Test RegisterGlobalHandler convenience method
|
||||||
|
globalHandlerCalled := false
|
||||||
|
globalHandler := redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
globalHandlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
processor.RegisterGlobalHandler(globalHandler)
|
||||||
|
|
||||||
|
// Test RegisterHandlerFunc convenience method
|
||||||
|
funcHandlerCalled := false
|
||||||
|
processor.RegisterHandlerFunc("FUNC_CMD", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
funcHandlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test RegisterGlobalHandlerFunc convenience method
|
||||||
|
globalFuncHandlerCalled := false
|
||||||
|
processor.RegisterGlobalHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
globalFuncHandlerCalled = true
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Test that all handlers work
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Test specific handler
|
||||||
|
notification := []interface{}{"CONV_CMD", "data"}
|
||||||
|
handled := processor.GetRegistry().HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should be handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !handlerCalled || !globalHandlerCalled || !globalFuncHandlerCalled {
|
||||||
|
t.Error("Handler, global handler, and global func handler should all be called")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset flags
|
||||||
|
handlerCalled = false
|
||||||
|
globalHandlerCalled = false
|
||||||
|
funcHandlerCalled = false
|
||||||
|
globalFuncHandlerCalled = false
|
||||||
|
|
||||||
|
// Test func handler
|
||||||
|
notification = []interface{}{"FUNC_CMD", "data"}
|
||||||
|
handled = processor.GetRegistry().HandleNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !handled {
|
||||||
|
t.Error("Notification should be handled")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !funcHandlerCalled || !globalHandlerCalled || !globalFuncHandlerCalled {
|
||||||
|
t.Error("Func handler, global handler, and global func handler should all be called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestClientPushNotificationEdgeCases(t *testing.T) {
|
||||||
|
// Test client methods when processor is nil
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
PushNotifications: false, // Disabled
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
// These should not panic even when processor is nil
|
||||||
|
client.RegisterPushNotificationHandler("TEST", redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
client.RegisterGlobalPushNotificationHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
client.RegisterPushNotificationHandlerFunc("TEST_FUNC", func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// GetPushNotificationProcessor should return nil
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor != nil {
|
||||||
|
t.Error("Processor should be nil when push notifications are disabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationHandlerFunc(t *testing.T) {
|
||||||
|
// Test the PushNotificationHandlerFunc adapter
|
||||||
|
called := false
|
||||||
|
var receivedCtx context.Context
|
||||||
|
var receivedNotification []interface{}
|
||||||
|
|
||||||
|
handlerFunc := func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
called = true
|
||||||
|
receivedCtx = ctx
|
||||||
|
receivedNotification = notification
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
handler := redis.PushNotificationHandlerFunc(handlerFunc)
|
||||||
|
|
||||||
|
// Test that the adapter works correctly
|
||||||
|
ctx := context.Background()
|
||||||
|
notification := []interface{}{"TEST_CMD", "arg1", "arg2"}
|
||||||
|
|
||||||
|
result := handler.HandlePushNotification(ctx, notification)
|
||||||
|
|
||||||
|
if !result {
|
||||||
|
t.Error("Handler should return true")
|
||||||
|
}
|
||||||
|
|
||||||
|
if !called {
|
||||||
|
t.Error("Handler function should be called")
|
||||||
|
}
|
||||||
|
|
||||||
|
if receivedCtx != ctx {
|
||||||
|
t.Error("Handler should receive the correct context")
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(receivedNotification) != 3 || receivedNotification[0] != "TEST_CMD" {
|
||||||
|
t.Errorf("Handler should receive the correct notification, got %v", receivedNotification)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationInfoEdgeCases(t *testing.T) {
|
||||||
|
// Test PushNotificationInfo with nil
|
||||||
|
var nilInfo *redis.PushNotificationInfo
|
||||||
|
if nilInfo.String() != "<nil>" {
|
||||||
|
t.Errorf("Expected '<nil>', got '%s'", nilInfo.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Test with different argument types
|
||||||
|
notification := []interface{}{"COMPLEX_CMD", 123, true, []string{"nested", "array"}, map[string]interface{}{"key": "value"}}
|
||||||
|
info := redis.ParsePushNotificationInfo(notification)
|
||||||
|
|
||||||
|
if info == nil {
|
||||||
|
t.Fatal("Info should not be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.Command != "COMPLEX_CMD" {
|
||||||
|
t.Errorf("Expected command 'COMPLEX_CMD', got '%s'", info.Command)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(info.Args) != 4 {
|
||||||
|
t.Errorf("Expected 4 args, got %d", len(info.Args))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify argument types are preserved
|
||||||
|
if info.Args[0] != 123 {
|
||||||
|
t.Errorf("Expected first arg to be 123, got %v", info.Args[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
if info.Args[1] != true {
|
||||||
|
t.Errorf("Expected second arg to be true, got %v", info.Args[1])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationConstantsCompleteness(t *testing.T) {
|
||||||
|
// Test that all expected constants are defined
|
||||||
|
expectedConstants := map[string]string{
|
||||||
|
// Cluster notifications
|
||||||
|
redis.PushNotificationMoving: "MOVING",
|
||||||
|
redis.PushNotificationMigrating: "MIGRATING",
|
||||||
|
redis.PushNotificationMigrated: "MIGRATED",
|
||||||
|
redis.PushNotificationFailingOver: "FAILING_OVER",
|
||||||
|
redis.PushNotificationFailedOver: "FAILED_OVER",
|
||||||
|
|
||||||
|
// Pub/Sub notifications
|
||||||
|
redis.PushNotificationPubSubMessage: "message",
|
||||||
|
redis.PushNotificationPMessage: "pmessage",
|
||||||
|
redis.PushNotificationSubscribe: "subscribe",
|
||||||
|
redis.PushNotificationUnsubscribe: "unsubscribe",
|
||||||
|
redis.PushNotificationPSubscribe: "psubscribe",
|
||||||
|
redis.PushNotificationPUnsubscribe: "punsubscribe",
|
||||||
|
|
||||||
|
// Stream notifications
|
||||||
|
redis.PushNotificationXRead: "xread",
|
||||||
|
redis.PushNotificationXReadGroup: "xreadgroup",
|
||||||
|
|
||||||
|
// Keyspace notifications
|
||||||
|
redis.PushNotificationKeyspace: "keyspace",
|
||||||
|
redis.PushNotificationKeyevent: "keyevent",
|
||||||
|
|
||||||
|
// Module notifications
|
||||||
|
redis.PushNotificationModule: "module",
|
||||||
|
|
||||||
|
// Custom notifications
|
||||||
|
redis.PushNotificationCustom: "custom",
|
||||||
|
}
|
||||||
|
|
||||||
|
for constant, expected := range expectedConstants {
|
||||||
|
if constant != expected {
|
||||||
|
t.Errorf("Constant mismatch: expected '%s', got '%s'", expected, constant)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationRegistryConcurrency(t *testing.T) {
|
||||||
|
// Test thread safety of the registry
|
||||||
|
registry := redis.NewPushNotificationRegistry()
|
||||||
|
|
||||||
|
// Number of concurrent goroutines
|
||||||
|
numGoroutines := 10
|
||||||
|
numOperations := 100
|
||||||
|
|
||||||
|
// Channels to coordinate goroutines
|
||||||
|
done := make(chan bool, numGoroutines)
|
||||||
|
|
||||||
|
// Concurrent registration and handling
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
go func(id int) {
|
||||||
|
defer func() { done <- true }()
|
||||||
|
|
||||||
|
for j := 0; j < numOperations; j++ {
|
||||||
|
// Register handler
|
||||||
|
command := fmt.Sprintf("CMD_%d_%d", id, j)
|
||||||
|
registry.RegisterHandler(command, redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
|
||||||
|
// Handle notification
|
||||||
|
notification := []interface{}{command, "data"}
|
||||||
|
registry.HandleNotification(context.Background(), notification)
|
||||||
|
|
||||||
|
// Register global handler occasionally
|
||||||
|
if j%10 == 0 {
|
||||||
|
registry.RegisterGlobalHandler(redis.PushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check registry state
|
||||||
|
registry.HasHandlers()
|
||||||
|
registry.GetRegisteredCommands()
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutines to complete
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify registry is still functional
|
||||||
|
if !registry.HasHandlers() {
|
||||||
|
t.Error("Registry should have handlers after concurrent operations")
|
||||||
|
}
|
||||||
|
|
||||||
|
commands := registry.GetRegisteredCommands()
|
||||||
|
if len(commands) == 0 {
|
||||||
|
t.Error("Registry should have registered commands after concurrent operations")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationProcessorConcurrency(t *testing.T) {
|
||||||
|
// Test thread safety of the processor
|
||||||
|
processor := redis.NewPushNotificationProcessor(true)
|
||||||
|
|
||||||
|
numGoroutines := 5
|
||||||
|
numOperations := 50
|
||||||
|
|
||||||
|
done := make(chan bool, numGoroutines)
|
||||||
|
|
||||||
|
// Concurrent processor operations
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
go func(id int) {
|
||||||
|
defer func() { done <- true }()
|
||||||
|
|
||||||
|
for j := 0; j < numOperations; j++ {
|
||||||
|
// Register handlers
|
||||||
|
command := fmt.Sprintf("PROC_CMD_%d_%d", id, j)
|
||||||
|
processor.RegisterHandlerFunc(command, func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Handle notifications
|
||||||
|
notification := []interface{}{command, "data"}
|
||||||
|
processor.GetRegistry().HandleNotification(context.Background(), notification)
|
||||||
|
|
||||||
|
// Toggle processor state occasionally
|
||||||
|
if j%20 == 0 {
|
||||||
|
processor.SetEnabled(!processor.IsEnabled())
|
||||||
|
}
|
||||||
|
|
||||||
|
// Access processor state
|
||||||
|
processor.IsEnabled()
|
||||||
|
processor.GetRegistry()
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutines to complete
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify processor is still functional
|
||||||
|
registry := processor.GetRegistry()
|
||||||
|
if registry == nil {
|
||||||
|
t.Error("Processor registry should not be nil after concurrent operations")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPushNotificationClientConcurrency(t *testing.T) {
|
||||||
|
// Test thread safety of client push notification methods
|
||||||
|
client := redis.NewClient(&redis.Options{
|
||||||
|
Addr: "localhost:6379",
|
||||||
|
Protocol: 3,
|
||||||
|
PushNotifications: true,
|
||||||
|
})
|
||||||
|
defer client.Close()
|
||||||
|
|
||||||
|
numGoroutines := 5
|
||||||
|
numOperations := 20
|
||||||
|
|
||||||
|
done := make(chan bool, numGoroutines)
|
||||||
|
|
||||||
|
// Concurrent client operations
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
go func(id int) {
|
||||||
|
defer func() { done <- true }()
|
||||||
|
|
||||||
|
for j := 0; j < numOperations; j++ {
|
||||||
|
// Register handlers concurrently
|
||||||
|
command := fmt.Sprintf("CLIENT_CMD_%d_%d", id, j)
|
||||||
|
client.RegisterPushNotificationHandlerFunc(command, func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
|
// Register global handlers occasionally
|
||||||
|
if j%5 == 0 {
|
||||||
|
client.RegisterGlobalPushNotificationHandlerFunc(func(ctx context.Context, notification []interface{}) bool {
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Access processor
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor != nil {
|
||||||
|
processor.IsEnabled()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}(i)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all goroutines to complete
|
||||||
|
for i := 0; i < numGoroutines; i++ {
|
||||||
|
<-done
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify client is still functional
|
||||||
|
processor := client.GetPushNotificationProcessor()
|
||||||
|
if processor == nil {
|
||||||
|
t.Error("Client processor should not be nil after concurrent operations")
|
||||||
|
}
|
||||||
|
}
|
67
redis.go
67
redis.go
@ -207,6 +207,9 @@ type baseClient struct {
|
|||||||
hooksMixin
|
hooksMixin
|
||||||
|
|
||||||
onClose func() error // hook called when client is closed
|
onClose func() error // hook called when client is closed
|
||||||
|
|
||||||
|
// Push notification processing
|
||||||
|
pushProcessor *PushNotificationProcessor
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *baseClient) clone() *baseClient {
|
func (c *baseClient) clone() *baseClient {
|
||||||
@ -530,7 +533,15 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
|||||||
if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) {
|
if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) {
|
||||||
readReplyFunc = cmd.readRawReply
|
readReplyFunc = cmd.readRawReply
|
||||||
}
|
}
|
||||||
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil {
|
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
||||||
|
// Check for push notifications before reading the command reply
|
||||||
|
if c.opt.Protocol == 3 && c.pushProcessor != nil && c.pushProcessor.IsEnabled() {
|
||||||
|
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
|
||||||
|
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return readReplyFunc(rd)
|
||||||
|
}); err != nil {
|
||||||
if cmd.readTimeout() == nil {
|
if cmd.readTimeout() == nil {
|
||||||
atomic.StoreUint32(&retryTimeout, 1)
|
atomic.StoreUint32(&retryTimeout, 1)
|
||||||
} else {
|
} else {
|
||||||
@ -752,6 +763,9 @@ func NewClient(opt *Options) *Client {
|
|||||||
c.init()
|
c.init()
|
||||||
c.connPool = newConnPool(opt, c.dialHook)
|
c.connPool = newConnPool(opt, c.dialHook)
|
||||||
|
|
||||||
|
// Initialize push notification processor
|
||||||
|
c.initializePushProcessor()
|
||||||
|
|
||||||
return &c
|
return &c
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -787,6 +801,51 @@ func (c *Client) Options() *Options {
|
|||||||
return c.opt
|
return c.opt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// initializePushProcessor initializes the push notification processor.
|
||||||
|
func (c *Client) initializePushProcessor() {
|
||||||
|
// Initialize push processor if enabled
|
||||||
|
if c.opt.PushNotifications {
|
||||||
|
if c.opt.PushNotificationProcessor != nil {
|
||||||
|
c.pushProcessor = c.opt.PushNotificationProcessor
|
||||||
|
} else {
|
||||||
|
c.pushProcessor = NewPushNotificationProcessor(true)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterPushNotificationHandler registers a handler for a specific push notification command.
|
||||||
|
func (c *Client) RegisterPushNotificationHandler(command string, handler PushNotificationHandler) {
|
||||||
|
if c.pushProcessor != nil {
|
||||||
|
c.pushProcessor.RegisterHandler(command, handler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterGlobalPushNotificationHandler registers a handler that will receive all push notifications.
|
||||||
|
func (c *Client) RegisterGlobalPushNotificationHandler(handler PushNotificationHandler) {
|
||||||
|
if c.pushProcessor != nil {
|
||||||
|
c.pushProcessor.RegisterGlobalHandler(handler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command.
|
||||||
|
func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) {
|
||||||
|
if c.pushProcessor != nil {
|
||||||
|
c.pushProcessor.RegisterHandlerFunc(command, handlerFunc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// RegisterGlobalPushNotificationHandlerFunc registers a function as a global handler for all push notifications.
|
||||||
|
func (c *Client) RegisterGlobalPushNotificationHandlerFunc(handlerFunc func(ctx context.Context, notification []interface{}) bool) {
|
||||||
|
if c.pushProcessor != nil {
|
||||||
|
c.pushProcessor.RegisterGlobalHandlerFunc(handlerFunc)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetPushNotificationProcessor returns the push notification processor.
|
||||||
|
func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor {
|
||||||
|
return c.pushProcessor
|
||||||
|
}
|
||||||
|
|
||||||
type PoolStats pool.Stats
|
type PoolStats pool.Stats
|
||||||
|
|
||||||
// PoolStats returns connection pool stats.
|
// PoolStats returns connection pool stats.
|
||||||
@ -833,6 +892,12 @@ func (c *Client) pubSub() *PubSub {
|
|||||||
closeConn: c.connPool.CloseConn,
|
closeConn: c.connPool.CloseConn,
|
||||||
}
|
}
|
||||||
pubsub.init()
|
pubsub.init()
|
||||||
|
|
||||||
|
// Set the push notification processor if available
|
||||||
|
if c.pushProcessor != nil {
|
||||||
|
pubsub.SetPushNotificationProcessor(c.pushProcessor)
|
||||||
|
}
|
||||||
|
|
||||||
return pubsub
|
return pubsub
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user