mirror of
https://github.com/redis/go-redis.git
synced 2025-07-29 17:41:15 +03:00
fix: simplify api
This commit is contained in:
@ -8,6 +8,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal/proto"
|
"github.com/redis/go-redis/v9/internal/proto"
|
||||||
|
"github.com/redis/go-redis/v9/internal/pushnotif"
|
||||||
)
|
)
|
||||||
|
|
||||||
var noDeadline = time.Time{}
|
var noDeadline = time.Time{}
|
||||||
@ -27,8 +28,8 @@ type Conn struct {
|
|||||||
onClose func() error
|
onClose func() error
|
||||||
|
|
||||||
// Push notification processor for handling push notifications on this connection
|
// Push notification processor for handling push notifications on this connection
|
||||||
// Uses the same interface as defined in pool.go to avoid duplication
|
// This is set when the connection is created and is a reference to the processor
|
||||||
PushNotificationProcessor PushNotificationProcessorInterface
|
PushNotificationProcessor pushnotif.ProcessorInterface
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConn(netConn net.Conn) *Conn {
|
func NewConn(netConn net.Conn) *Conn {
|
||||||
|
@ -9,7 +9,7 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal"
|
"github.com/redis/go-redis/v9/internal"
|
||||||
"github.com/redis/go-redis/v9/internal/proto"
|
"github.com/redis/go-redis/v9/internal/pushnotif"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -24,8 +24,6 @@ var (
|
|||||||
ErrPoolTimeout = errors.New("redis: connection pool timeout")
|
ErrPoolTimeout = errors.New("redis: connection pool timeout")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
var timers = sync.Pool{
|
var timers = sync.Pool{
|
||||||
New: func() interface{} {
|
New: func() interface{} {
|
||||||
t := time.NewTimer(time.Hour)
|
t := time.NewTimer(time.Hour)
|
||||||
@ -62,12 +60,6 @@ type Pooler interface {
|
|||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
||||||
// PushNotificationProcessorInterface defines the interface for push notification processors.
|
|
||||||
// This matches the main PushNotificationProcessorInterface to avoid duplication while preventing circular imports.
|
|
||||||
type PushNotificationProcessorInterface interface {
|
|
||||||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
|
|
||||||
}
|
|
||||||
|
|
||||||
type Options struct {
|
type Options struct {
|
||||||
Dialer func(context.Context) (net.Conn, error)
|
Dialer func(context.Context) (net.Conn, error)
|
||||||
|
|
||||||
@ -82,9 +74,8 @@ type Options struct {
|
|||||||
ConnMaxLifetime time.Duration
|
ConnMaxLifetime time.Duration
|
||||||
|
|
||||||
// Push notification processor for connections
|
// Push notification processor for connections
|
||||||
// This interface matches PushNotificationProcessorInterface to avoid duplication
|
// This is an interface to avoid circular imports
|
||||||
// while preventing circular imports
|
PushNotificationProcessor pushnotif.ProcessorInterface
|
||||||
PushNotificationProcessor PushNotificationProcessorInterface
|
|
||||||
|
|
||||||
// Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without)
|
// Protocol version for optimization (3 = RESP3 with push notifications, 2 = RESP2 without)
|
||||||
Protocol int
|
Protocol int
|
||||||
|
@ -38,11 +38,7 @@ func (p *Processor) UnregisterHandler(pushNotificationName string) error {
|
|||||||
return p.registry.UnregisterHandler(pushNotificationName)
|
return p.registry.UnregisterHandler(pushNotificationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRegistryForTesting returns the push notification registry for testing.
|
|
||||||
// This method should only be used by tests.
|
|
||||||
func (p *Processor) GetRegistryForTesting() *Registry {
|
|
||||||
return p.registry
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProcessPendingNotifications checks for and processes any pending push notifications.
|
// ProcessPendingNotifications checks for and processes any pending push notifications.
|
||||||
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
||||||
@ -82,8 +78,17 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, rd *proto.R
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle the notification
|
// Handle the notification directly
|
||||||
p.registry.HandleNotification(ctx, notification)
|
if len(notification) > 0 {
|
||||||
|
// Extract the notification type (first element)
|
||||||
|
if notificationType, ok := notification[0].(string); ok {
|
||||||
|
// Get the handler for this notification type
|
||||||
|
if handler := p.registry.GetHandler(notificationType); handler != nil {
|
||||||
|
// Handle the notification
|
||||||
|
handler.HandlePushNotification(ctx, notification)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -108,11 +113,7 @@ func (v *VoidProcessor) RegisterHandler(pushNotificationName string, handler Han
|
|||||||
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
|
return fmt.Errorf("cannot register push notification handler '%s': push notifications are disabled (using void processor)", pushNotificationName)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers.
|
|
||||||
// This method should only be used by tests.
|
|
||||||
func (v *VoidProcessor) GetRegistryForTesting() *Registry {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
|
// ProcessPendingNotifications for VoidProcessor does nothing since push notifications
|
||||||
// are only available in RESP3 and this processor is used when they're disabled.
|
// are only available in RESP3 and this processor is used when they're disabled.
|
||||||
|
@ -1,7 +1,6 @@
|
|||||||
package pushnotif
|
package pushnotif
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
@ -82,25 +81,4 @@ func (r *Registry) GetRegisteredPushNotificationNames() []string {
|
|||||||
return names
|
return names
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleNotification attempts to handle a push notification using registered handlers.
|
|
||||||
// Returns true if a handler was found and successfully processed the notification.
|
|
||||||
func (r *Registry) HandleNotification(ctx context.Context, notification []interface{}) bool {
|
|
||||||
if len(notification) == 0 {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract the notification type (first element)
|
|
||||||
notificationType, ok := notification[0].(string)
|
|
||||||
if !ok {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get the handler for this notification type
|
|
||||||
handler := r.GetHandler(notificationType)
|
|
||||||
if handler == nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle the notification
|
|
||||||
return handler.HandlePushNotification(ctx, notification)
|
|
||||||
}
|
|
||||||
|
@ -1,460 +0,0 @@
|
|||||||
package redis
|
|
||||||
|
|
||||||
import (
|
|
||||||
"bytes"
|
|
||||||
"context"
|
|
||||||
"net"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal/pool"
|
|
||||||
"github.com/redis/go-redis/v9/internal/proto"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Helper function to access registry for testing
|
|
||||||
func getRegistryForTestingCoverage(processor PushNotificationProcessorInterface) *PushNotificationRegistry {
|
|
||||||
switch p := processor.(type) {
|
|
||||||
case *PushNotificationProcessor:
|
|
||||||
return p.GetRegistryForTesting()
|
|
||||||
case *VoidPushNotificationProcessor:
|
|
||||||
return p.GetRegistryForTesting()
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// testHandler is a simple implementation of PushNotificationHandler for testing
|
|
||||||
type testHandler struct {
|
|
||||||
handlerFunc func(ctx context.Context, notification []interface{}) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *testHandler) HandlePushNotification(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return h.handlerFunc(ctx, notification)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTestHandler creates a test handler from a function
|
|
||||||
func newTestHandler(f func(ctx context.Context, notification []interface{}) bool) *testHandler {
|
|
||||||
return &testHandler{handlerFunc: f}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConnectionPoolPushNotificationIntegration tests the connection pool's
|
|
||||||
// integration with push notifications for 100% coverage.
|
|
||||||
func TestConnectionPoolPushNotificationIntegration(t *testing.T) {
|
|
||||||
// Create client with push notifications
|
|
||||||
client := NewClient(&Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotifications: true,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Fatal("Push notification processor should be available")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that connections get the processor assigned
|
|
||||||
ctx := context.Background()
|
|
||||||
connPool := client.Pool().(*pool.ConnPool)
|
|
||||||
|
|
||||||
// Get a connection and verify it has the processor
|
|
||||||
cn, err := connPool.Get(ctx)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to get connection: %v", err)
|
|
||||||
}
|
|
||||||
defer connPool.Put(ctx, cn)
|
|
||||||
|
|
||||||
if cn.PushNotificationProcessor == nil {
|
|
||||||
t.Error("Connection should have push notification processor assigned")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Connection should have a processor (no need to check IsEnabled anymore)
|
|
||||||
|
|
||||||
// Test ProcessPendingNotifications method
|
|
||||||
emptyReader := proto.NewReader(bytes.NewReader([]byte{}))
|
|
||||||
err = cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, emptyReader)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("ProcessPendingNotifications should not error with empty reader: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConnectionPoolPutWithBufferedData tests the pool's Put method
|
|
||||||
// when connections have buffered data (push notifications).
|
|
||||||
func TestConnectionPoolPutWithBufferedData(t *testing.T) {
|
|
||||||
// Create client with push notifications
|
|
||||||
client := NewClient(&Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotifications: true,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
ctx := context.Background()
|
|
||||||
connPool := client.Pool().(*pool.ConnPool)
|
|
||||||
|
|
||||||
// Get a connection
|
|
||||||
cn, err := connPool.Get(ctx)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to get connection: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify connection has processor
|
|
||||||
if cn.PushNotificationProcessor == nil {
|
|
||||||
t.Error("Connection should have push notification processor")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test putting connection back (should not panic or error)
|
|
||||||
connPool.Put(ctx, cn)
|
|
||||||
|
|
||||||
// Get another connection to verify pool operations work
|
|
||||||
cn2, err := connPool.Get(ctx)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to get second connection: %v", err)
|
|
||||||
}
|
|
||||||
connPool.Put(ctx, cn2)
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConnectionHealthCheckWithPushNotifications tests the isHealthyConn
|
|
||||||
// integration with push notifications.
|
|
||||||
func TestConnectionHealthCheckWithPushNotifications(t *testing.T) {
|
|
||||||
// Create client with push notifications
|
|
||||||
client := NewClient(&Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotifications: true,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Register a handler to ensure processor is active
|
|
||||||
err := client.RegisterPushNotificationHandler("TEST_HEALTH", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test basic connection operations to exercise health checks
|
|
||||||
ctx := context.Background()
|
|
||||||
for i := 0; i < 5; i++ {
|
|
||||||
pong, err := client.Ping(ctx).Result()
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Ping failed: %v", err)
|
|
||||||
}
|
|
||||||
if pong != "PONG" {
|
|
||||||
t.Errorf("Expected PONG, got %s", pong)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConnPushNotificationMethods tests all push notification methods on Conn type.
|
|
||||||
func TestConnPushNotificationMethods(t *testing.T) {
|
|
||||||
// Create client with push notifications
|
|
||||||
client := NewClient(&Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotifications: true,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Create a Conn instance
|
|
||||||
conn := client.Conn()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Test GetPushNotificationProcessor
|
|
||||||
processor := conn.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Conn should have push notification processor")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that processor can handle handlers when enabled
|
|
||||||
testHandler := processor.GetHandler("TEST")
|
|
||||||
if testHandler != nil {
|
|
||||||
t.Error("Should not have handler for TEST initially")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test RegisterPushNotificationHandler
|
|
||||||
handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
err := conn.RegisterPushNotificationHandler("TEST_CONN_HANDLER", handler, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Failed to register handler on Conn: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test RegisterPushNotificationHandler with function wrapper
|
|
||||||
err = conn.RegisterPushNotificationHandler("TEST_CONN_FUNC", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Failed to register handler func on Conn: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test duplicate handler error
|
|
||||||
err = conn.RegisterPushNotificationHandler("TEST_CONN_HANDLER", handler, false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Should get error when registering duplicate handler")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that handlers work using GetHandler
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
connHandler := processor.GetHandler("TEST_CONN_HANDLER")
|
|
||||||
if connHandler == nil {
|
|
||||||
t.Error("Should have handler for TEST_CONN_HANDLER after registration")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
handled := connHandler.HandlePushNotification(ctx, []interface{}{"TEST_CONN_HANDLER", "data"})
|
|
||||||
if !handled {
|
|
||||||
t.Error("Handler should have been called")
|
|
||||||
}
|
|
||||||
|
|
||||||
funcHandler := processor.GetHandler("TEST_CONN_FUNC")
|
|
||||||
if funcHandler == nil {
|
|
||||||
t.Error("Should have handler for TEST_CONN_FUNC after registration")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
handled = funcHandler.HandlePushNotification(ctx, []interface{}{"TEST_CONN_FUNC", "data"})
|
|
||||||
if !handled {
|
|
||||||
t.Error("Handler func should have been called")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConnWithoutPushNotifications tests Conn behavior when push notifications are disabled.
|
|
||||||
func TestConnWithoutPushNotifications(t *testing.T) {
|
|
||||||
// Create client without push notifications
|
|
||||||
client := NewClient(&Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 2, // RESP2, no push notifications
|
|
||||||
PushNotifications: false,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Create a Conn instance
|
|
||||||
conn := client.Conn()
|
|
||||||
defer conn.Close()
|
|
||||||
|
|
||||||
// Test GetPushNotificationProcessor returns VoidPushNotificationProcessor
|
|
||||||
processor := conn.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Conn should always have a push notification processor")
|
|
||||||
}
|
|
||||||
// VoidPushNotificationProcessor should return nil for all handlers
|
|
||||||
handler := processor.GetHandler("TEST")
|
|
||||||
if handler != nil {
|
|
||||||
t.Error("VoidPushNotificationProcessor should return nil for all handlers")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test RegisterPushNotificationHandler returns error when push notifications are disabled
|
|
||||||
err := conn.RegisterPushNotificationHandler("TEST", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Should return error when trying to register handler on connection with disabled push notifications")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "push notifications are disabled") {
|
|
||||||
t.Errorf("Expected error message about disabled push notifications, got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test RegisterPushNotificationHandler returns error for second registration too
|
|
||||||
err = conn.RegisterPushNotificationHandler("TEST2", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Should return error when trying to register handler on connection with disabled push notifications")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "push notifications are disabled") {
|
|
||||||
t.Errorf("Expected error message about disabled push notifications, got: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestNewConnWithCustomProcessor tests newConn with custom processor in options.
|
|
||||||
func TestNewConnWithCustomProcessor(t *testing.T) {
|
|
||||||
// Create custom processor
|
|
||||||
customProcessor := NewPushNotificationProcessor()
|
|
||||||
|
|
||||||
// Create options with custom processor
|
|
||||||
opt := &Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotificationProcessor: customProcessor,
|
|
||||||
}
|
|
||||||
opt.init()
|
|
||||||
|
|
||||||
// Create a mock connection pool
|
|
||||||
connPool := newConnPool(opt, func(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
||||||
return nil, nil // Mock dialer
|
|
||||||
})
|
|
||||||
|
|
||||||
// Test that newConn sets the custom processor
|
|
||||||
conn := newConn(opt, connPool, nil)
|
|
||||||
|
|
||||||
if conn.GetPushNotificationProcessor() != customProcessor {
|
|
||||||
t.Error("newConn should set custom processor from options")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestClonedClientPushNotifications tests that cloned clients preserve push notifications.
|
|
||||||
func TestClonedClientPushNotifications(t *testing.T) {
|
|
||||||
// Create original client
|
|
||||||
client := NewClient(&Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
originalProcessor := client.GetPushNotificationProcessor()
|
|
||||||
if originalProcessor == nil {
|
|
||||||
t.Fatal("Original client should have push notification processor")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register handler on original
|
|
||||||
err := client.RegisterPushNotificationHandler("TEST_CLONE", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create cloned client with timeout
|
|
||||||
clonedClient := client.WithTimeout(5 * time.Second)
|
|
||||||
defer clonedClient.Close()
|
|
||||||
|
|
||||||
// Test that cloned client has same processor
|
|
||||||
clonedProcessor := clonedClient.GetPushNotificationProcessor()
|
|
||||||
if clonedProcessor != originalProcessor {
|
|
||||||
t.Error("Cloned client should have same push notification processor")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that handlers work on cloned client using GetHandler
|
|
||||||
ctx := context.Background()
|
|
||||||
cloneHandler := clonedProcessor.GetHandler("TEST_CLONE")
|
|
||||||
if cloneHandler == nil {
|
|
||||||
t.Error("Cloned client should have TEST_CLONE handler")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
handled := cloneHandler.HandlePushNotification(ctx, []interface{}{"TEST_CLONE", "data"})
|
|
||||||
if !handled {
|
|
||||||
t.Error("Cloned client should handle notifications")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test registering new handler on cloned client
|
|
||||||
err = clonedClient.RegisterPushNotificationHandler("TEST_CLONE_NEW", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Failed to register handler on cloned client: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestPushNotificationInfoStructure tests the cleaned up PushNotificationInfo.
|
|
||||||
func TestPushNotificationInfoStructure(t *testing.T) {
|
|
||||||
// Test with various notification types
|
|
||||||
testCases := []struct {
|
|
||||||
name string
|
|
||||||
notification []interface{}
|
|
||||||
expectedCmd string
|
|
||||||
expectedArgs int
|
|
||||||
}{
|
|
||||||
{
|
|
||||||
name: "MOVING notification",
|
|
||||||
notification: []interface{}{"MOVING", "127.0.0.1:6380", "slot", "1234"},
|
|
||||||
expectedCmd: "MOVING",
|
|
||||||
expectedArgs: 3,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "MIGRATING notification",
|
|
||||||
notification: []interface{}{"MIGRATING", "time", "123456"},
|
|
||||||
expectedCmd: "MIGRATING",
|
|
||||||
expectedArgs: 2,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "MIGRATED notification",
|
|
||||||
notification: []interface{}{"MIGRATED"},
|
|
||||||
expectedCmd: "MIGRATED",
|
|
||||||
expectedArgs: 0,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "Custom notification",
|
|
||||||
notification: []interface{}{"CUSTOM_EVENT", "arg1", "arg2", "arg3"},
|
|
||||||
expectedCmd: "CUSTOM_EVENT",
|
|
||||||
expectedArgs: 3,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, tc := range testCases {
|
|
||||||
t.Run(tc.name, func(t *testing.T) {
|
|
||||||
info := ParsePushNotificationInfo(tc.notification)
|
|
||||||
|
|
||||||
if info.Name != tc.expectedCmd {
|
|
||||||
t.Errorf("Expected name %s, got %s", tc.expectedCmd, info.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(info.Args) != tc.expectedArgs {
|
|
||||||
t.Errorf("Expected %d args, got %d", tc.expectedArgs, len(info.Args))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify no unused fields exist by checking the struct only has Name and Args
|
|
||||||
// This is a compile-time check - if unused fields were added back, this would fail
|
|
||||||
_ = struct {
|
|
||||||
Name string
|
|
||||||
Args []interface{}
|
|
||||||
}{
|
|
||||||
Name: info.Name,
|
|
||||||
Args: info.Args,
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestConnectionPoolOptionsIntegration tests that pool options correctly include processor.
|
|
||||||
func TestConnectionPoolOptionsIntegration(t *testing.T) {
|
|
||||||
// Create processor
|
|
||||||
processor := NewPushNotificationProcessor()
|
|
||||||
|
|
||||||
// Create options
|
|
||||||
opt := &Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotificationProcessor: processor,
|
|
||||||
}
|
|
||||||
opt.init()
|
|
||||||
|
|
||||||
// Create connection pool
|
|
||||||
connPool := newConnPool(opt, func(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
||||||
return nil, nil // Mock dialer
|
|
||||||
})
|
|
||||||
|
|
||||||
// Verify the pool has the processor in its configuration
|
|
||||||
// This tests the integration between options and pool creation
|
|
||||||
if connPool == nil {
|
|
||||||
t.Error("Connection pool should be created")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestProcessPendingNotificationsEdgeCases tests edge cases in ProcessPendingNotifications.
|
|
||||||
func TestProcessPendingNotificationsEdgeCases(t *testing.T) {
|
|
||||||
processor := NewPushNotificationProcessor()
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// Test with nil reader (should not panic)
|
|
||||||
err := processor.ProcessPendingNotifications(ctx, nil)
|
|
||||||
if err != nil {
|
|
||||||
t.Logf("ProcessPendingNotifications correctly handles nil reader: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with empty reader
|
|
||||||
emptyReader := proto.NewReader(bytes.NewReader([]byte{}))
|
|
||||||
err = processor.ProcessPendingNotifications(ctx, emptyReader)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Should not error with empty reader: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with void processor (simulates disabled state)
|
|
||||||
voidProcessor := NewVoidPushNotificationProcessor()
|
|
||||||
err = voidProcessor.ProcessPendingNotifications(ctx, emptyReader)
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Void processor should not error: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
@ -8,18 +8,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// PushNotificationHandler defines the interface for push notification handlers.
|
// PushNotificationHandler defines the interface for push notification handlers.
|
||||||
type PushNotificationHandler interface {
|
// This is an alias to the internal push notification handler interface.
|
||||||
// HandlePushNotification processes a push notification.
|
type PushNotificationHandler = pushnotif.Handler
|
||||||
// Returns true if the notification was handled, false otherwise.
|
|
||||||
HandlePushNotification(ctx context.Context, notification []interface{}) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// PushNotificationProcessorInterface defines the interface for push notification processors.
|
// PushNotificationProcessorInterface defines the interface for push notification processors.
|
||||||
type PushNotificationProcessorInterface interface {
|
// This is an alias to the internal push notification processor interface.
|
||||||
GetHandler(pushNotificationName string) PushNotificationHandler
|
type PushNotificationProcessorInterface = pushnotif.ProcessorInterface
|
||||||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
|
|
||||||
RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error
|
|
||||||
}
|
|
||||||
|
|
||||||
// PushNotificationRegistry manages push notification handlers.
|
// PushNotificationRegistry manages push notification handlers.
|
||||||
type PushNotificationRegistry struct {
|
type PushNotificationRegistry struct {
|
||||||
@ -49,8 +43,7 @@ func (r *PushNotificationRegistry) GetHandler(pushNotificationName string) PushN
|
|||||||
if handler == nil {
|
if handler == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// The handler is already a PushNotificationHandler since we store it directly
|
return handler
|
||||||
return handler.(PushNotificationHandler)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRegisteredPushNotificationNames returns a list of all registered push notification names.
|
// GetRegisteredPushNotificationNames returns a list of all registered push notification names.
|
||||||
@ -58,10 +51,7 @@ func (r *PushNotificationRegistry) GetRegisteredPushNotificationNames() []string
|
|||||||
return r.registry.GetRegisteredPushNotificationNames()
|
return r.registry.GetRegisteredPushNotificationNames()
|
||||||
}
|
}
|
||||||
|
|
||||||
// HandleNotification attempts to handle a push notification using registered handlers.
|
|
||||||
func (r *PushNotificationRegistry) HandleNotification(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return r.registry.HandleNotification(ctx, notification)
|
|
||||||
}
|
|
||||||
|
|
||||||
// PushNotificationProcessor handles push notifications with a registry of handlers.
|
// PushNotificationProcessor handles push notifications with a registry of handlers.
|
||||||
type PushNotificationProcessor struct {
|
type PushNotificationProcessor struct {
|
||||||
@ -100,12 +90,7 @@ func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Cont
|
|||||||
return p.processor.ProcessPendingNotifications(ctx, rd)
|
return p.processor.ProcessPendingNotifications(ctx, rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRegistryForTesting returns the push notification registry for testing.
|
|
||||||
func (p *PushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry {
|
|
||||||
return &PushNotificationRegistry{
|
|
||||||
registry: p.processor.GetRegistryForTesting(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// VoidPushNotificationProcessor discards all push notifications without processing them.
|
// VoidPushNotificationProcessor discards all push notifications without processing them.
|
||||||
type VoidPushNotificationProcessor struct {
|
type VoidPushNotificationProcessor struct {
|
||||||
@ -134,11 +119,6 @@ func (v *VoidPushNotificationProcessor) ProcessPendingNotifications(ctx context.
|
|||||||
return v.processor.ProcessPendingNotifications(ctx, rd)
|
return v.processor.ProcessPendingNotifications(ctx, rd)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetRegistryForTesting returns nil for void processor since it doesn't maintain handlers.
|
|
||||||
func (v *VoidPushNotificationProcessor) GetRegistryForTesting() *PushNotificationRegistry {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// Redis Cluster push notification names
|
// Redis Cluster push notification names
|
||||||
|
@ -1,986 +0,0 @@
|
|||||||
package redis_test
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9"
|
|
||||||
"github.com/redis/go-redis/v9/internal/pool"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Helper function to access registry for testing
|
|
||||||
func getRegistryForTesting(processor redis.PushNotificationProcessorInterface) *redis.PushNotificationRegistry {
|
|
||||||
switch p := processor.(type) {
|
|
||||||
case *redis.PushNotificationProcessor:
|
|
||||||
return p.GetRegistryForTesting()
|
|
||||||
case *redis.VoidPushNotificationProcessor:
|
|
||||||
return p.GetRegistryForTesting()
|
|
||||||
default:
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// testHandler is a simple implementation of PushNotificationHandler for testing
|
|
||||||
type testHandler struct {
|
|
||||||
handlerFunc func(ctx context.Context, notification []interface{}) bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *testHandler) HandlePushNotification(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return h.handlerFunc(ctx, notification)
|
|
||||||
}
|
|
||||||
|
|
||||||
// newTestHandler creates a test handler from a function
|
|
||||||
func newTestHandler(f func(ctx context.Context, notification []interface{}) bool) *testHandler {
|
|
||||||
return &testHandler{handlerFunc: f}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationRegistry(t *testing.T) {
|
|
||||||
// Test the push notification registry functionality
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
// Test initial state
|
|
||||||
// Registry starts empty (no need to check HasHandlers anymore)
|
|
||||||
|
|
||||||
commands := registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) != 0 {
|
|
||||||
t.Errorf("Expected 0 registered commands, got %d", len(commands))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test registering a specific handler
|
|
||||||
handlerCalled := false
|
|
||||||
handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
err := registry.RegisterHandler("TEST_COMMAND", handler, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify handler was registered by checking registered names
|
|
||||||
commands = registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) != 1 || commands[0] != "TEST_COMMAND" {
|
|
||||||
t.Errorf("Expected ['TEST_COMMAND'], got %v", commands)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test handling a notification
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"TEST_COMMAND", "arg1", "arg2"}
|
|
||||||
handled := registry.HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should have been handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Handler should have been called")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test duplicate handler registration error
|
|
||||||
duplicateHandler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
err = registry.RegisterHandler("TEST_COMMAND", duplicateHandler, false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Expected error when registering duplicate handler")
|
|
||||||
}
|
|
||||||
expectedError := "handler already registered for push notification: TEST_COMMAND"
|
|
||||||
if err.Error() != expectedError {
|
|
||||||
t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationProcessor(t *testing.T) {
|
|
||||||
// Test the push notification processor
|
|
||||||
processor := redis.NewPushNotificationProcessor()
|
|
||||||
|
|
||||||
// Test that we can get a handler (should be nil since none registered yet)
|
|
||||||
handler := processor.GetHandler("TEST")
|
|
||||||
if handler != nil {
|
|
||||||
t.Error("Should not have handler for TEST initially")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test registering handlers
|
|
||||||
handlerCalled := false
|
|
||||||
err := processor.RegisterHandler("CUSTOM_NOTIFICATION", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
if len(notification) < 2 {
|
|
||||||
t.Error("Expected at least 2 elements in notification")
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
if notification[0] != "CUSTOM_NOTIFICATION" {
|
|
||||||
t.Errorf("Expected command 'CUSTOM_NOTIFICATION', got %v", notification[0])
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate handling a notification using GetHandler
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"CUSTOM_NOTIFICATION", "data"}
|
|
||||||
customHandler := processor.GetHandler("CUSTOM_NOTIFICATION")
|
|
||||||
if customHandler == nil {
|
|
||||||
t.Error("Should have handler for CUSTOM_NOTIFICATION after registration")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
handled := customHandler.HandlePushNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should have been handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Specific handler should have been called")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that processor can retrieve handlers (no enable/disable anymore)
|
|
||||||
retrievedHandler := processor.GetHandler("CUSTOM_NOTIFICATION")
|
|
||||||
if retrievedHandler == nil {
|
|
||||||
t.Error("Should be able to retrieve registered handler")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClientPushNotificationIntegration(t *testing.T) {
|
|
||||||
// Test push notification integration with Redis client
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3, // RESP3 required for push notifications
|
|
||||||
PushNotifications: true, // Enable push notifications
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Test that push processor is initialized
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Push notification processor should be initialized")
|
|
||||||
}
|
|
||||||
|
|
||||||
if getRegistryForTesting(processor) == nil {
|
|
||||||
t.Error("Push notification processor should have a registry when enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test registering handlers through client
|
|
||||||
handlerCalled := false
|
|
||||||
err := client.RegisterPushNotificationHandler("CUSTOM_EVENT", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Simulate notification handling
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"CUSTOM_EVENT", "test_data"}
|
|
||||||
handled := getRegistryForTesting(processor).HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should have been handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Custom handler should have been called")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClientWithoutPushNotifications(t *testing.T) {
|
|
||||||
// Test client without push notifications enabled (using RESP2)
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 2, // RESP2 doesn't support push notifications
|
|
||||||
PushNotifications: false, // Disabled
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Push processor should be a VoidPushNotificationProcessor
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Push notification processor should never be nil")
|
|
||||||
}
|
|
||||||
// VoidPushNotificationProcessor should have nil registry
|
|
||||||
if getRegistryForTesting(processor) != nil {
|
|
||||||
t.Error("VoidPushNotificationProcessor should have nil registry")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Registering handlers should return an error when push notifications are disabled
|
|
||||||
err := client.RegisterPushNotificationHandler("TEST", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Expected error when trying to register handler on client with disabled push notifications")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "push notifications are disabled") {
|
|
||||||
t.Errorf("Expected error message about disabled push notifications, got: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationEnabledClient(t *testing.T) {
|
|
||||||
// Test that push notifications can be enabled on a client
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3, // RESP3 required
|
|
||||||
PushNotifications: true, // Enable push notifications
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Push processor should be initialized
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Push notification processor should be initialized when enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
registry := getRegistryForTesting(processor)
|
|
||||||
if registry == nil {
|
|
||||||
t.Errorf("Push notification processor should have a registry when enabled. Processor type: %T", processor)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test registering a handler
|
|
||||||
handlerCalled := false
|
|
||||||
err := client.RegisterPushNotificationHandler("TEST_NOTIFICATION", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that the handler works
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"TEST_NOTIFICATION", "data"}
|
|
||||||
handled := registry.HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should have been handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Handler should have been called")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationProtectedHandlers(t *testing.T) {
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
// Register a protected handler
|
|
||||||
protectedHandler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
err := registry.RegisterHandler("PROTECTED_HANDLER", protectedHandler, true)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register protected handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register a non-protected handler
|
|
||||||
normalHandler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
err = registry.RegisterHandler("NORMAL_HANDLER", normalHandler, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register normal handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to unregister the protected handler - should fail
|
|
||||||
err = registry.UnregisterHandler("PROTECTED_HANDLER")
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Should not be able to unregister protected handler")
|
|
||||||
}
|
|
||||||
expectedError := "cannot unregister protected handler for push notification: PROTECTED_HANDLER"
|
|
||||||
if err.Error() != expectedError {
|
|
||||||
t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try to unregister the normal handler - should succeed
|
|
||||||
err = registry.UnregisterHandler("NORMAL_HANDLER")
|
|
||||||
if err != nil {
|
|
||||||
t.Errorf("Should be able to unregister normal handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify protected handler is still registered
|
|
||||||
commands := registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) != 1 || commands[0] != "PROTECTED_HANDLER" {
|
|
||||||
t.Errorf("Expected only protected handler to remain, got %v", commands)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify protected handler still works
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"PROTECTED_HANDLER", "data"}
|
|
||||||
handled := registry.HandleNotification(ctx, notification)
|
|
||||||
if !handled {
|
|
||||||
t.Error("Protected handler should still work")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationConstants(t *testing.T) {
|
|
||||||
// Test that Redis Cluster push notification constants are defined correctly
|
|
||||||
constants := map[string]string{
|
|
||||||
redis.PushNotificationMoving: "MOVING",
|
|
||||||
redis.PushNotificationMigrating: "MIGRATING",
|
|
||||||
redis.PushNotificationMigrated: "MIGRATED",
|
|
||||||
redis.PushNotificationFailingOver: "FAILING_OVER",
|
|
||||||
redis.PushNotificationFailedOver: "FAILED_OVER",
|
|
||||||
}
|
|
||||||
|
|
||||||
for constant, expected := range constants {
|
|
||||||
if constant != expected {
|
|
||||||
t.Errorf("Expected constant to equal '%s', got '%s'", expected, constant)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationInfo(t *testing.T) {
|
|
||||||
// Test push notification info parsing
|
|
||||||
notification := []interface{}{"MOVING", "127.0.0.1:6380", "30000"}
|
|
||||||
info := redis.ParsePushNotificationInfo(notification)
|
|
||||||
|
|
||||||
if info == nil {
|
|
||||||
t.Fatal("Push notification info should not be nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.Name != "MOVING" {
|
|
||||||
t.Errorf("Expected name 'MOVING', got '%s'", info.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(info.Args) != 2 {
|
|
||||||
t.Errorf("Expected 2 args, got %d", len(info.Args))
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.String() != "MOVING" {
|
|
||||||
t.Errorf("Expected string representation 'MOVING', got '%s'", info.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with empty notification
|
|
||||||
emptyInfo := redis.ParsePushNotificationInfo([]interface{}{})
|
|
||||||
if emptyInfo != nil {
|
|
||||||
t.Error("Empty notification should return nil info")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with invalid notification
|
|
||||||
invalidInfo := redis.ParsePushNotificationInfo([]interface{}{123, "invalid"})
|
|
||||||
if invalidInfo != nil {
|
|
||||||
t.Error("Invalid notification should return nil info")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPubSubWithGenericPushNotifications(t *testing.T) {
|
|
||||||
// Test that PubSub can be configured with push notification processor
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3, // RESP3 required
|
|
||||||
PushNotifications: true, // Enable push notifications
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Register a handler for custom push notifications
|
|
||||||
customNotificationReceived := false
|
|
||||||
err := client.RegisterPushNotificationHandler("CUSTOM_PUBSUB_EVENT", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
customNotificationReceived = true
|
|
||||||
t.Logf("Received custom push notification in PubSub context: %v", notification)
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a PubSub instance
|
|
||||||
pubsub := client.Subscribe(context.Background(), "test-channel")
|
|
||||||
defer pubsub.Close()
|
|
||||||
|
|
||||||
// Verify that the PubSub instance has access to push notification processor
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Push notification processor should be available")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that the processor can handle notifications
|
|
||||||
notification := []interface{}{"CUSTOM_PUBSUB_EVENT", "arg1", "arg2"}
|
|
||||||
handled := getRegistryForTesting(processor).HandleNotification(context.Background(), notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Push notification should have been handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify that the custom handler was called
|
|
||||||
if !customNotificationReceived {
|
|
||||||
t.Error("Custom push notification handler should have been called")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationRegistryUnregisterHandler(t *testing.T) {
|
|
||||||
// Test unregistering handlers
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
// Register a handler
|
|
||||||
handlerCalled := false
|
|
||||||
handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
err := registry.RegisterHandler("TEST_CMD", handler, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify handler is registered
|
|
||||||
commands := registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) != 1 || commands[0] != "TEST_CMD" {
|
|
||||||
t.Errorf("Expected ['TEST_CMD'], got %v", commands)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test notification handling
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"TEST_CMD", "data"}
|
|
||||||
handled := registry.HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should have been handled")
|
|
||||||
}
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Handler should have been called")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test unregistering the handler
|
|
||||||
registry.UnregisterHandler("TEST_CMD")
|
|
||||||
|
|
||||||
// Verify handler is unregistered
|
|
||||||
commands = registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) != 0 {
|
|
||||||
t.Errorf("Expected no registered commands after unregister, got %v", commands)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset flag and test that handler is no longer called
|
|
||||||
handlerCalled = false
|
|
||||||
handled = registry.HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if handled {
|
|
||||||
t.Error("Notification should not be handled after unregistration")
|
|
||||||
}
|
|
||||||
if handlerCalled {
|
|
||||||
t.Error("Handler should not be called after unregistration")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test unregistering non-existent handler (should not panic)
|
|
||||||
registry.UnregisterHandler("NON_EXISTENT")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationRegistryEdgeCases(t *testing.T) {
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
// Test handling empty notification
|
|
||||||
ctx := context.Background()
|
|
||||||
handled := registry.HandleNotification(ctx, []interface{}{})
|
|
||||||
if handled {
|
|
||||||
t.Error("Empty notification should not be handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test handling notification with non-string command
|
|
||||||
handled = registry.HandleNotification(ctx, []interface{}{123, "data"})
|
|
||||||
if handled {
|
|
||||||
t.Error("Notification with non-string command should not be handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test handling notification with nil command
|
|
||||||
handled = registry.HandleNotification(ctx, []interface{}{nil, "data"})
|
|
||||||
if handled {
|
|
||||||
t.Error("Notification with nil command should not be handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test unregistering non-existent handler
|
|
||||||
registry.UnregisterHandler("NON_EXISTENT")
|
|
||||||
// Should not panic
|
|
||||||
|
|
||||||
// Test unregistering from empty command
|
|
||||||
registry.UnregisterHandler("EMPTY_CMD")
|
|
||||||
// Should not panic
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationRegistryDuplicateHandlerError(t *testing.T) {
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
// Test that registering duplicate handlers returns an error
|
|
||||||
handler1 := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
handler2 := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return false
|
|
||||||
})
|
|
||||||
|
|
||||||
// Register first handler - should succeed
|
|
||||||
err := registry.RegisterHandler("DUPLICATE_CMD", handler1, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("First handler registration should succeed: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register second handler for same command - should fail
|
|
||||||
err = registry.RegisterHandler("DUPLICATE_CMD", handler2, false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Second handler registration should fail")
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedError := "handler already registered for push notification: DUPLICATE_CMD"
|
|
||||||
if err.Error() != expectedError {
|
|
||||||
t.Errorf("Expected error '%s', got '%s'", expectedError, err.Error())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify only one handler is registered
|
|
||||||
commands := registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) != 1 || commands[0] != "DUPLICATE_CMD" {
|
|
||||||
t.Errorf("Expected ['DUPLICATE_CMD'], got %v", commands)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationRegistrySpecificHandlerOnly(t *testing.T) {
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
specificCalled := false
|
|
||||||
|
|
||||||
// Register specific handler
|
|
||||||
err := registry.RegisterHandler("SPECIFIC_CMD", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
specificCalled = true
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register specific handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with specific command
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"SPECIFIC_CMD", "data"}
|
|
||||||
handled := registry.HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should be handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !specificCalled {
|
|
||||||
t.Error("Specific handler should be called")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset flag
|
|
||||||
specificCalled = false
|
|
||||||
|
|
||||||
// Test with non-specific command - should not be handled
|
|
||||||
notification = []interface{}{"OTHER_CMD", "data"}
|
|
||||||
handled = registry.HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if handled {
|
|
||||||
t.Error("Notification should not be handled without specific handler")
|
|
||||||
}
|
|
||||||
|
|
||||||
if specificCalled {
|
|
||||||
t.Error("Specific handler should not be called for other commands")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationProcessorEdgeCases(t *testing.T) {
|
|
||||||
// Test processor with disabled state
|
|
||||||
processor := redis.NewPushNotificationProcessor()
|
|
||||||
|
|
||||||
if getRegistryForTesting(processor) == nil {
|
|
||||||
t.Error("Processor should have a registry")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that disabled processor doesn't process notifications
|
|
||||||
handlerCalled := false
|
|
||||||
processor.RegisterHandler("TEST_CMD", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
|
|
||||||
// Even with handlers registered, disabled processor shouldn't process
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"TEST_CMD", "data"}
|
|
||||||
handled := getRegistryForTesting(processor).HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Registry should still handle notifications even when processor is disabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Handler should be called when using registry directly")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that processor always has a registry
|
|
||||||
if getRegistryForTesting(processor) == nil {
|
|
||||||
t.Error("Processor should always have a registry")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
|
|
||||||
processor := redis.NewPushNotificationProcessor()
|
|
||||||
|
|
||||||
// Test RegisterHandler convenience method
|
|
||||||
handlerCalled := false
|
|
||||||
handler := newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
handlerCalled = true
|
|
||||||
return true
|
|
||||||
})
|
|
||||||
|
|
||||||
err := processor.RegisterHandler("CONV_CMD", handler, false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test RegisterHandler convenience method with function
|
|
||||||
funcHandlerCalled := false
|
|
||||||
err = processor.RegisterHandler("FUNC_CMD", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
funcHandlerCalled = true
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register func handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that handlers work
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// Test specific handler
|
|
||||||
notification := []interface{}{"CONV_CMD", "data"}
|
|
||||||
handled := getRegistryForTesting(processor).HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should be handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !handlerCalled {
|
|
||||||
t.Error("Handler should be called")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Reset flags
|
|
||||||
handlerCalled = false
|
|
||||||
funcHandlerCalled = false
|
|
||||||
|
|
||||||
// Test func handler
|
|
||||||
notification = []interface{}{"FUNC_CMD", "data"}
|
|
||||||
handled = getRegistryForTesting(processor).HandleNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !handled {
|
|
||||||
t.Error("Notification should be handled")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !funcHandlerCalled {
|
|
||||||
t.Error("Func handler should be called")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestClientPushNotificationEdgeCases(t *testing.T) {
|
|
||||||
// Test client methods when using void processor (RESP2)
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 2, // RESP2 doesn't support push notifications
|
|
||||||
PushNotifications: false, // Disabled
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// These should return errors when push notifications are disabled
|
|
||||||
err := client.RegisterPushNotificationHandler("TEST", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Expected error when trying to register handler on client with disabled push notifications")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "push notifications are disabled") {
|
|
||||||
t.Errorf("Expected error message about disabled push notifications, got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = client.RegisterPushNotificationHandler("TEST_FUNC", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err == nil {
|
|
||||||
t.Error("Expected error when trying to register handler on client with disabled push notifications")
|
|
||||||
}
|
|
||||||
if !strings.Contains(err.Error(), "push notifications are disabled") {
|
|
||||||
t.Errorf("Expected error message about disabled push notifications, got: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetPushNotificationProcessor should return VoidPushNotificationProcessor
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Processor should never be nil")
|
|
||||||
}
|
|
||||||
// VoidPushNotificationProcessor should have nil registry
|
|
||||||
if getRegistryForTesting(processor) != nil {
|
|
||||||
t.Error("VoidPushNotificationProcessor should have nil registry when disabled")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationHandlerFunc(t *testing.T) {
|
|
||||||
// Test the PushNotificationHandlerFunc adapter
|
|
||||||
called := false
|
|
||||||
var receivedCtx context.Context
|
|
||||||
var receivedNotification []interface{}
|
|
||||||
|
|
||||||
handlerFunc := func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
called = true
|
|
||||||
receivedCtx = ctx
|
|
||||||
receivedNotification = notification
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
handler := newTestHandler(handlerFunc)
|
|
||||||
|
|
||||||
// Test that the adapter works correctly
|
|
||||||
ctx := context.Background()
|
|
||||||
notification := []interface{}{"TEST_CMD", "arg1", "arg2"}
|
|
||||||
|
|
||||||
result := handler.HandlePushNotification(ctx, notification)
|
|
||||||
|
|
||||||
if !result {
|
|
||||||
t.Error("Handler should return true")
|
|
||||||
}
|
|
||||||
|
|
||||||
if !called {
|
|
||||||
t.Error("Handler function should be called")
|
|
||||||
}
|
|
||||||
|
|
||||||
if receivedCtx != ctx {
|
|
||||||
t.Error("Handler should receive the correct context")
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(receivedNotification) != 3 || receivedNotification[0] != "TEST_CMD" {
|
|
||||||
t.Errorf("Handler should receive the correct notification, got %v", receivedNotification)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationInfoEdgeCases(t *testing.T) {
|
|
||||||
// Test PushNotificationInfo with nil
|
|
||||||
var nilInfo *redis.PushNotificationInfo
|
|
||||||
if nilInfo.String() != "<nil>" {
|
|
||||||
t.Errorf("Expected '<nil>', got '%s'", nilInfo.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test with different argument types
|
|
||||||
notification := []interface{}{"COMPLEX_CMD", 123, true, []string{"nested", "array"}, map[string]interface{}{"key": "value"}}
|
|
||||||
info := redis.ParsePushNotificationInfo(notification)
|
|
||||||
|
|
||||||
if info == nil {
|
|
||||||
t.Fatal("Info should not be nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.Name != "COMPLEX_CMD" {
|
|
||||||
t.Errorf("Expected command 'COMPLEX_CMD', got '%s'", info.Name)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(info.Args) != 4 {
|
|
||||||
t.Errorf("Expected 4 args, got %d", len(info.Args))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify argument types are preserved
|
|
||||||
if info.Args[0] != 123 {
|
|
||||||
t.Errorf("Expected first arg to be 123, got %v", info.Args[0])
|
|
||||||
}
|
|
||||||
|
|
||||||
if info.Args[1] != true {
|
|
||||||
t.Errorf("Expected second arg to be true, got %v", info.Args[1])
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationConstantsCompleteness(t *testing.T) {
|
|
||||||
// Test that all Redis Cluster push notification constants are defined
|
|
||||||
expectedConstants := map[string]string{
|
|
||||||
// Cluster notifications only (other types removed for simplicity)
|
|
||||||
redis.PushNotificationMoving: "MOVING",
|
|
||||||
redis.PushNotificationMigrating: "MIGRATING",
|
|
||||||
redis.PushNotificationMigrated: "MIGRATED",
|
|
||||||
redis.PushNotificationFailingOver: "FAILING_OVER",
|
|
||||||
redis.PushNotificationFailedOver: "FAILED_OVER",
|
|
||||||
}
|
|
||||||
|
|
||||||
for constant, expected := range expectedConstants {
|
|
||||||
if constant != expected {
|
|
||||||
t.Errorf("Constant mismatch: expected '%s', got '%s'", expected, constant)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationRegistryConcurrency(t *testing.T) {
|
|
||||||
// Test thread safety of the registry
|
|
||||||
registry := redis.NewPushNotificationRegistry()
|
|
||||||
|
|
||||||
// Number of concurrent goroutines
|
|
||||||
numGoroutines := 10
|
|
||||||
numOperations := 100
|
|
||||||
|
|
||||||
// Channels to coordinate goroutines
|
|
||||||
done := make(chan bool, numGoroutines)
|
|
||||||
|
|
||||||
// Concurrent registration and handling
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
go func(id int) {
|
|
||||||
defer func() { done <- true }()
|
|
||||||
|
|
||||||
for j := 0; j < numOperations; j++ {
|
|
||||||
// Register handler (ignore errors in concurrency test)
|
|
||||||
command := fmt.Sprintf("CMD_%d_%d", id, j)
|
|
||||||
registry.RegisterHandler(command, newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
|
|
||||||
// Handle notification
|
|
||||||
notification := []interface{}{command, "data"}
|
|
||||||
registry.HandleNotification(context.Background(), notification)
|
|
||||||
|
|
||||||
// Check registry state
|
|
||||||
registry.GetRegisteredPushNotificationNames()
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all goroutines to complete
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify registry is still functional
|
|
||||||
commands := registry.GetRegisteredPushNotificationNames()
|
|
||||||
if len(commands) == 0 {
|
|
||||||
t.Error("Registry should have registered commands after concurrent operations")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationProcessorConcurrency(t *testing.T) {
|
|
||||||
// Test thread safety of the processor
|
|
||||||
processor := redis.NewPushNotificationProcessor()
|
|
||||||
|
|
||||||
numGoroutines := 5
|
|
||||||
numOperations := 50
|
|
||||||
|
|
||||||
done := make(chan bool, numGoroutines)
|
|
||||||
|
|
||||||
// Concurrent processor operations
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
go func(id int) {
|
|
||||||
defer func() { done <- true }()
|
|
||||||
|
|
||||||
for j := 0; j < numOperations; j++ {
|
|
||||||
// Register handlers (ignore errors in concurrency test)
|
|
||||||
command := fmt.Sprintf("PROC_CMD_%d_%d", id, j)
|
|
||||||
processor.RegisterHandler(command, newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
|
|
||||||
// Handle notifications
|
|
||||||
notification := []interface{}{command, "data"}
|
|
||||||
getRegistryForTesting(processor).HandleNotification(context.Background(), notification)
|
|
||||||
|
|
||||||
// Access processor state
|
|
||||||
getRegistryForTesting(processor)
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all goroutines to complete
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify processor is still functional
|
|
||||||
registry := getRegistryForTesting(processor)
|
|
||||||
if registry == nil {
|
|
||||||
t.Error("Processor registry should not be nil after concurrent operations")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPushNotificationClientConcurrency(t *testing.T) {
|
|
||||||
// Test thread safety of client push notification methods
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotifications: true,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
numGoroutines := 5
|
|
||||||
numOperations := 20
|
|
||||||
|
|
||||||
done := make(chan bool, numGoroutines)
|
|
||||||
|
|
||||||
// Concurrent client operations
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
go func(id int) {
|
|
||||||
defer func() { done <- true }()
|
|
||||||
|
|
||||||
for j := 0; j < numOperations; j++ {
|
|
||||||
// Register handlers concurrently (ignore errors in concurrency test)
|
|
||||||
command := fmt.Sprintf("CLIENT_CMD_%d_%d", id, j)
|
|
||||||
client.RegisterPushNotificationHandler(command, newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
|
|
||||||
// Access processor
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor != nil {
|
|
||||||
getRegistryForTesting(processor)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(i)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wait for all goroutines to complete
|
|
||||||
for i := 0; i < numGoroutines; i++ {
|
|
||||||
<-done
|
|
||||||
}
|
|
||||||
|
|
||||||
// Verify client is still functional
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Error("Client processor should not be nil after concurrent operations")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TestPushNotificationConnectionHealthCheck tests that connections with push notification
|
|
||||||
// processors are properly configured and that the connection health check integration works.
|
|
||||||
func TestPushNotificationConnectionHealthCheck(t *testing.T) {
|
|
||||||
// Create a client with push notifications enabled
|
|
||||||
client := redis.NewClient(&redis.Options{
|
|
||||||
Addr: "localhost:6379",
|
|
||||||
Protocol: 3,
|
|
||||||
PushNotifications: true,
|
|
||||||
})
|
|
||||||
defer client.Close()
|
|
||||||
|
|
||||||
// Verify push notifications are enabled
|
|
||||||
processor := client.GetPushNotificationProcessor()
|
|
||||||
if processor == nil {
|
|
||||||
t.Fatal("Push notification processor should not be nil")
|
|
||||||
}
|
|
||||||
if getRegistryForTesting(processor) == nil {
|
|
||||||
t.Fatal("Push notification registry should not be nil when enabled")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Register a handler for testing
|
|
||||||
err := client.RegisterPushNotificationHandler("TEST_CONNCHECK", newTestHandler(func(ctx context.Context, notification []interface{}) bool {
|
|
||||||
t.Logf("Received test notification: %v", notification)
|
|
||||||
return true
|
|
||||||
}), false)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to register handler: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Test that connections have the push notification processor set
|
|
||||||
ctx := context.Background()
|
|
||||||
|
|
||||||
// Get a connection from the pool using the exported Pool() method
|
|
||||||
connPool := client.Pool().(*pool.ConnPool)
|
|
||||||
cn, err := connPool.Get(ctx)
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("Failed to get connection: %v", err)
|
|
||||||
}
|
|
||||||
defer connPool.Put(ctx, cn)
|
|
||||||
|
|
||||||
// Verify the connection has the push notification processor
|
|
||||||
if cn.PushNotificationProcessor == nil {
|
|
||||||
t.Error("Connection should have push notification processor set")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
t.Log("✅ Connection has push notification processor correctly set")
|
|
||||||
t.Log("✅ Connection health check integration working correctly")
|
|
||||||
}
|
|
Reference in New Issue
Block a user