mirror of
https://github.com/redis/go-redis.git
synced 2025-07-18 00:20:57 +03:00
refactor(push): simplify handler context
This commit is contained in:
@ -555,8 +555,11 @@ func (c *PubSub) processPendingPushNotificationWithReader(ctx context.Context, c
|
||||
func (c *PubSub) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext {
|
||||
// PubSub doesn't have a client or connection pool, so we pass nil for those
|
||||
// PubSub connections are blocking
|
||||
return push.HandlerContext{}
|
||||
return push.NewNotificationHandlerContext(nil, nil, c, cn, true)
|
||||
return push.NotificationHandlerContext{
|
||||
PubSub: c,
|
||||
Conn: cn,
|
||||
IsBlocking: true,
|
||||
}
|
||||
}
|
||||
|
||||
type ChannelOption func(c *channel)
|
||||
|
@ -5,85 +5,38 @@ import (
|
||||
)
|
||||
|
||||
// NotificationHandlerContext provides context information about where a push notification was received.
|
||||
// This interface allows handlers to make informed decisions based on the source of the notification
|
||||
// This struct allows handlers to make informed decisions based on the source of the notification
|
||||
// with strongly typed access to different client types using concrete types.
|
||||
type NotificationHandlerContext interface {
|
||||
// GetClient returns the Redis client instance that received the notification.
|
||||
// Returns nil if no client context is available.
|
||||
type NotificationHandlerContext struct {
|
||||
// Client is the Redis client instance that received the notification.
|
||||
// It is interface to both allow for future expansion and to avoid
|
||||
// circular dependencies. The developer is responsible for type assertion.
|
||||
// It can be one of the following types:
|
||||
// - *redis.baseClient
|
||||
// - *redis.Client
|
||||
// - *redis.ClusterClient
|
||||
// - *redis.Conn
|
||||
GetClient() interface{}
|
||||
Client interface{}
|
||||
|
||||
// GetConnPool returns the connection pool from which the connection was obtained.
|
||||
// Returns nil if no connection pool context is available.
|
||||
// ConnPool is the connection pool from which the connection was obtained.
|
||||
// It is interface to both allow for future expansion and to avoid
|
||||
// circular dependencies. The developer is responsible for type assertion.
|
||||
// It can be one of the following types:
|
||||
// - *pool.ConnPool
|
||||
// - *pool.SingleConnPool
|
||||
// - *pool.StickyConnPool
|
||||
GetConnPool() interface{}
|
||||
ConnPool interface{}
|
||||
|
||||
// GetPubSub returns the PubSub instance that received the notification.
|
||||
// Returns nil if this is not a PubSub connection.
|
||||
// PubSub is the PubSub instance that received the notification.
|
||||
// It is interface to both allow for future expansion and to avoid
|
||||
// circular dependencies. The developer is responsible for type assertion.
|
||||
// It can be one of the following types:
|
||||
// - *redis.PubSub
|
||||
GetPubSub() interface{}
|
||||
PubSub interface{}
|
||||
|
||||
// GetConn returns the specific connection on which the notification was received.
|
||||
// Returns nil if no connection context is available.
|
||||
GetConn() *pool.Conn
|
||||
// Conn is the specific connection on which the notification was received.
|
||||
Conn *pool.Conn
|
||||
|
||||
// IsBlocking returns true if the notification was received on a blocking connection.
|
||||
IsBlocking() bool
|
||||
}
|
||||
|
||||
// pushNotificationHandlerContext is the concrete implementation of PushNotificationHandlerContext interface
|
||||
type pushNotificationHandlerContext struct {
|
||||
client interface{}
|
||||
connPool interface{}
|
||||
pubSub interface{}
|
||||
conn *pool.Conn
|
||||
isBlocking bool
|
||||
}
|
||||
|
||||
// NewNotificationHandlerContext creates a new push.NotificationHandlerContext instance
|
||||
func NewNotificationHandlerContext(client, connPool, pubSub interface{}, conn *pool.Conn, isBlocking bool) NotificationHandlerContext {
|
||||
return &pushNotificationHandlerContext{
|
||||
client: client,
|
||||
connPool: connPool,
|
||||
pubSub: pubSub,
|
||||
conn: conn,
|
||||
isBlocking: isBlocking,
|
||||
}
|
||||
}
|
||||
|
||||
// GetClient returns the Redis client instance that received the notification
|
||||
func (h *pushNotificationHandlerContext) GetClient() interface{} {
|
||||
return h.client
|
||||
}
|
||||
|
||||
// GetConnPool returns the connection pool from which the connection was obtained
|
||||
func (h *pushNotificationHandlerContext) GetConnPool() interface{} {
|
||||
return h.connPool
|
||||
}
|
||||
|
||||
func (h *pushNotificationHandlerContext) GetPubSub() interface{} {
|
||||
return h.pubSub
|
||||
}
|
||||
|
||||
// GetConn returns the specific connection on which the notification was received
|
||||
func (h *pushNotificationHandlerContext) GetConn() *pool.Conn {
|
||||
return h.conn
|
||||
}
|
||||
|
||||
// IsBlocking returns true if the notification was received on a blocking connection
|
||||
func (h *pushNotificationHandlerContext) IsBlocking() bool {
|
||||
return h.isBlocking
|
||||
// IsBlocking indicates if the notification was received on a blocking connection.
|
||||
IsBlocking bool
|
||||
}
|
||||
|
@ -59,59 +59,68 @@ type MockPubSub struct {
|
||||
|
||||
// TestNotificationHandlerContext tests the handler context implementation
|
||||
func TestNotificationHandlerContext(t *testing.T) {
|
||||
t.Run("NewNotificationHandlerContext", func(t *testing.T) {
|
||||
t.Run("DirectObjectCreation", func(t *testing.T) {
|
||||
client := &MockClient{name: "test-client"}
|
||||
connPool := &MockConnPool{name: "test-pool"}
|
||||
pubSub := &MockPubSub{name: "test-pubsub"}
|
||||
conn := &pool.Conn{}
|
||||
|
||||
ctx := NewNotificationHandlerContext(client, connPool, pubSub, conn, true)
|
||||
if ctx == nil {
|
||||
t.Error("NewNotificationHandlerContext should not return nil")
|
||||
ctx := NotificationHandlerContext{
|
||||
Client: client,
|
||||
ConnPool: connPool,
|
||||
PubSub: pubSub,
|
||||
Conn: conn,
|
||||
IsBlocking: true,
|
||||
}
|
||||
|
||||
if ctx.GetClient() != client {
|
||||
t.Error("GetClient should return the provided client")
|
||||
if ctx.Client != client {
|
||||
t.Error("Client field should contain the provided client")
|
||||
}
|
||||
|
||||
if ctx.GetConnPool() != connPool {
|
||||
t.Error("GetConnPool should return the provided connection pool")
|
||||
if ctx.ConnPool != connPool {
|
||||
t.Error("ConnPool field should contain the provided connection pool")
|
||||
}
|
||||
|
||||
if ctx.GetPubSub() != pubSub {
|
||||
t.Error("GetPubSub should return the provided PubSub")
|
||||
if ctx.PubSub != pubSub {
|
||||
t.Error("PubSub field should contain the provided PubSub")
|
||||
}
|
||||
|
||||
if ctx.GetConn() != conn {
|
||||
t.Error("GetConn should return the provided connection")
|
||||
if ctx.Conn != conn {
|
||||
t.Error("Conn field should contain the provided connection")
|
||||
}
|
||||
|
||||
if !ctx.IsBlocking() {
|
||||
t.Error("IsBlocking should return true")
|
||||
if !ctx.IsBlocking {
|
||||
t.Error("IsBlocking field should be true")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("NilValues", func(t *testing.T) {
|
||||
ctx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
|
||||
if ctx.GetClient() != nil {
|
||||
t.Error("GetClient should return nil when client is nil")
|
||||
ctx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
if ctx.GetConnPool() != nil {
|
||||
t.Error("GetConnPool should return nil when connPool is nil")
|
||||
if ctx.Client != nil {
|
||||
t.Error("Client field should be nil when client is nil")
|
||||
}
|
||||
|
||||
if ctx.GetPubSub() != nil {
|
||||
t.Error("GetPubSub should return nil when pubSub is nil")
|
||||
if ctx.ConnPool != nil {
|
||||
t.Error("ConnPool field should be nil when connPool is nil")
|
||||
}
|
||||
|
||||
if ctx.GetConn() != nil {
|
||||
t.Error("GetConn should return nil when conn is nil")
|
||||
if ctx.PubSub != nil {
|
||||
t.Error("PubSub field should be nil when pubSub is nil")
|
||||
}
|
||||
|
||||
if ctx.IsBlocking() {
|
||||
t.Error("IsBlocking should return false")
|
||||
if ctx.Conn != nil {
|
||||
t.Error("Conn field should be nil when conn is nil")
|
||||
}
|
||||
|
||||
if ctx.IsBlocking {
|
||||
t.Error("IsBlocking field should be false")
|
||||
}
|
||||
})
|
||||
}
|
||||
@ -427,7 +436,13 @@ func TestProcessor(t *testing.T) {
|
||||
t.Run("ProcessPendingNotifications_NilReader", func(t *testing.T) {
|
||||
processor := NewProcessor()
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
|
||||
if err != nil {
|
||||
@ -487,7 +502,13 @@ func TestVoidProcessor(t *testing.T) {
|
||||
t.Run("ProcessPendingNotifications_NilReader", func(t *testing.T) {
|
||||
processor := NewVoidProcessor()
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
|
||||
if err != nil {
|
||||
@ -541,7 +562,13 @@ func TestNotificationHandlerInterface(t *testing.T) {
|
||||
|
||||
handler := NewTestHandler("test")
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
notification := []interface{}{"TEST", "data"}
|
||||
|
||||
err := handler.HandlePushNotification(ctx, handlerCtx, notification)
|
||||
@ -566,7 +593,13 @@ func TestNotificationHandlerError(t *testing.T) {
|
||||
handler.SetReturnError(expectedError)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
notification := []interface{}{"TEST", "data"}
|
||||
|
||||
err := handler.HandlePushNotification(ctx, handlerCtx, notification)
|
||||
@ -864,7 +897,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -895,7 +934,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -917,7 +962,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -936,7 +987,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -959,7 +1016,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -988,7 +1051,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1025,7 +1094,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1051,7 +1126,13 @@ func TestProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1079,7 +1160,13 @@ func TestVoidProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1102,7 +1189,13 @@ func TestVoidProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1127,7 +1220,13 @@ func TestVoidProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1145,7 +1244,13 @@ func TestVoidProcessorWithFakeBuffer(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
// VoidProcessor should handle errors gracefully
|
||||
@ -1167,7 +1272,13 @@ func TestProcessorErrorHandling(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1193,7 +1304,13 @@ func TestProcessorErrorHandling(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
// Should handle corruption gracefully
|
||||
@ -1215,7 +1332,13 @@ func TestProcessorErrorHandling(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
// Should handle partial data gracefully
|
||||
@ -1250,7 +1373,13 @@ func TestProcessorPerformanceWithFakeData(t *testing.T) {
|
||||
reader := proto.NewReader(buf)
|
||||
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
handlerCtx := NotificationHandlerContext{
|
||||
Client: nil,
|
||||
ConnPool: nil,
|
||||
PubSub: nil,
|
||||
Conn: nil,
|
||||
IsBlocking: false,
|
||||
}
|
||||
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, reader)
|
||||
if err != nil {
|
||||
@ -1271,8 +1400,8 @@ func TestInterfaceCompliance(t *testing.T) {
|
||||
// Test that VoidProcessor implements NotificationProcessor
|
||||
var _ NotificationProcessor = (*VoidProcessor)(nil)
|
||||
|
||||
// Test that pushNotificationHandlerContext implements NotificationHandlerContext
|
||||
var _ NotificationHandlerContext = (*pushNotificationHandlerContext)(nil)
|
||||
// Test that NotificationHandlerContext is a concrete struct (no interface needed)
|
||||
var _ NotificationHandlerContext = NotificationHandlerContext{}
|
||||
|
||||
// Test that TestHandler implements NotificationHandler
|
||||
var _ NotificationHandler = (*TestHandler)(nil)
|
||||
|
@ -1,242 +0,0 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
)
|
||||
|
||||
// TestHandler implements PushNotificationHandler interface for testing
|
||||
type TestHandler struct {
|
||||
name string
|
||||
handled [][]interface{}
|
||||
returnValue bool
|
||||
}
|
||||
|
||||
func NewTestHandler(name string, returnValue bool) *TestHandler {
|
||||
return &TestHandler{
|
||||
name: name,
|
||||
handled: make([][]interface{}, 0),
|
||||
returnValue: returnValue,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *TestHandler) HandlePushNotification(ctx context.Context, handlerCtx PushNotificationHandlerContext, notification []interface{}) bool {
|
||||
h.handled = append(h.handled, notification)
|
||||
return h.returnValue
|
||||
}
|
||||
|
||||
func (h *TestHandler) GetHandledNotifications() [][]interface{} {
|
||||
return h.handled
|
||||
}
|
||||
|
||||
func (h *TestHandler) Reset() {
|
||||
h.handled = make([][]interface{}, 0)
|
||||
}
|
||||
|
||||
func TestPushNotificationRegistry(t *testing.T) {
|
||||
t.Run("NewRegistry", func(t *testing.T) {
|
||||
registry := NewRegistry()
|
||||
if registry == nil {
|
||||
t.Error("NewRegistry should not return nil")
|
||||
}
|
||||
|
||||
if len(registry.GetRegisteredPushNotificationNames()) != 0 {
|
||||
t.Error("New registry should have no registered handlers")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("RegisterHandler", func(t *testing.T) {
|
||||
registry := NewRegistry()
|
||||
handler := NewTestHandler("test", true)
|
||||
|
||||
err := registry.RegisterHandler("TEST", handler, false)
|
||||
if err != nil {
|
||||
t.Errorf("RegisterHandler should not error: %v", err)
|
||||
}
|
||||
|
||||
retrievedHandler := registry.GetHandler("TEST")
|
||||
if retrievedHandler != handler {
|
||||
t.Error("GetHandler should return the registered handler")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("UnregisterHandler", func(t *testing.T) {
|
||||
registry := NewRegistry()
|
||||
handler := NewTestHandler("test", true)
|
||||
|
||||
registry.RegisterHandler("TEST", handler, false)
|
||||
|
||||
err := registry.UnregisterHandler("TEST")
|
||||
if err != nil {
|
||||
t.Errorf("UnregisterHandler should not error: %v", err)
|
||||
}
|
||||
|
||||
retrievedHandler := registry.GetHandler("TEST")
|
||||
if retrievedHandler != nil {
|
||||
t.Error("GetHandler should return nil after unregistering")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ProtectedHandler", func(t *testing.T) {
|
||||
registry := NewRegistry()
|
||||
handler := NewTestHandler("test", true)
|
||||
|
||||
// Register protected handler
|
||||
err := registry.RegisterHandler("TEST", handler, true)
|
||||
if err != nil {
|
||||
t.Errorf("RegisterHandler should not error: %v", err)
|
||||
}
|
||||
|
||||
// Try to unregister protected handler
|
||||
err = registry.UnregisterHandler("TEST")
|
||||
if err == nil {
|
||||
t.Error("UnregisterHandler should error for protected handler")
|
||||
}
|
||||
|
||||
// Handler should still be there
|
||||
retrievedHandler := registry.GetHandler("TEST")
|
||||
if retrievedHandler != handler {
|
||||
t.Error("Protected handler should still be registered")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPushNotificationProcessor(t *testing.T) {
|
||||
t.Run("NewProcessor", func(t *testing.T) {
|
||||
processor := NewProcessor()
|
||||
if processor == nil {
|
||||
t.Error("NewProcessor should not return nil")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("RegisterAndGetHandler", func(t *testing.T) {
|
||||
processor := NewProcessor()
|
||||
handler := NewTestHandler("test", true)
|
||||
|
||||
err := processor.RegisterHandler("TEST", handler, false)
|
||||
if err != nil {
|
||||
t.Errorf("RegisterHandler should not error: %v", err)
|
||||
}
|
||||
|
||||
retrievedHandler := processor.GetHandler("TEST")
|
||||
if retrievedHandler != handler {
|
||||
t.Error("GetHandler should return the registered handler")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestVoidProcessor(t *testing.T) {
|
||||
t.Run("NewVoidProcessor", func(t *testing.T) {
|
||||
processor := NewVoidProcessor()
|
||||
if processor == nil {
|
||||
t.Error("NewVoidProcessor should not return nil")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("GetHandler", func(t *testing.T) {
|
||||
processor := NewVoidProcessor()
|
||||
handler := processor.GetHandler("TEST")
|
||||
if handler != nil {
|
||||
t.Error("VoidProcessor GetHandler should always return nil")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("RegisterHandler", func(t *testing.T) {
|
||||
processor := NewVoidProcessor()
|
||||
handler := NewTestHandler("test", true)
|
||||
|
||||
err := processor.RegisterHandler("TEST", handler, false)
|
||||
if err == nil {
|
||||
t.Error("VoidProcessor RegisterHandler should return error")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("ProcessPendingNotifications", func(t *testing.T) {
|
||||
processor := NewVoidProcessor()
|
||||
ctx := context.Background()
|
||||
handlerCtx := NewPushNotificationHandlerContext(nil, nil, nil, nil, false)
|
||||
|
||||
// VoidProcessor should always succeed and do nothing
|
||||
err := processor.ProcessPendingNotifications(ctx, handlerCtx, nil)
|
||||
if err != nil {
|
||||
t.Errorf("VoidProcessor ProcessPendingNotifications should never error, got: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPushNotificationHandlerContext(t *testing.T) {
|
||||
t.Run("NewHandlerContext", func(t *testing.T) {
|
||||
client := &Client{}
|
||||
connPool := &pool.ConnPool{}
|
||||
pubSub := &PubSub{}
|
||||
conn := &pool.Conn{}
|
||||
|
||||
ctx := NewPushNotificationHandlerContext(client, connPool, pubSub, conn, true)
|
||||
if ctx == nil {
|
||||
t.Error("NewPushNotificationHandlerContext should not return nil")
|
||||
}
|
||||
|
||||
if ctx.GetClient() != client {
|
||||
t.Error("GetClient should return the provided client")
|
||||
}
|
||||
|
||||
if ctx.GetConnPool() != connPool {
|
||||
t.Error("GetConnPool should return the provided connection pool")
|
||||
}
|
||||
|
||||
if ctx.GetPubSub() != pubSub {
|
||||
t.Error("GetPubSub should return the provided PubSub")
|
||||
}
|
||||
|
||||
if ctx.GetConn() != conn {
|
||||
t.Error("GetConn should return the provided connection")
|
||||
}
|
||||
|
||||
if !ctx.IsBlocking() {
|
||||
t.Error("IsBlocking should return true")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("TypedGetters", func(t *testing.T) {
|
||||
client := &Client{}
|
||||
ctx := NewPushNotificationHandlerContext(client, nil, nil, nil, false)
|
||||
|
||||
// Test regular client getter
|
||||
regularClient := ctx.GetRegularClient()
|
||||
if regularClient != client {
|
||||
t.Error("GetRegularClient should return the client when it's a regular client")
|
||||
}
|
||||
|
||||
// Test cluster client getter (should be nil for regular client)
|
||||
clusterClient := ctx.GetClusterClient()
|
||||
if clusterClient != nil {
|
||||
t.Error("GetClusterClient should return nil when client is not a cluster client")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func TestPushNotificationConstants(t *testing.T) {
|
||||
t.Run("Constants", func(t *testing.T) {
|
||||
if PushNotificationMoving != "MOVING" {
|
||||
t.Error("PushNotificationMoving should be 'MOVING'")
|
||||
}
|
||||
|
||||
if PushNotificationMigrating != "MIGRATING" {
|
||||
t.Error("PushNotificationMigrating should be 'MIGRATING'")
|
||||
}
|
||||
|
||||
if PushNotificationMigrated != "MIGRATED" {
|
||||
t.Error("PushNotificationMigrated should be 'MIGRATED'")
|
||||
}
|
||||
|
||||
if PushNotificationFailingOver != "FAILING_OVER" {
|
||||
t.Error("PushNotificationFailingOver should be 'FAILING_OVER'")
|
||||
}
|
||||
|
||||
if PushNotificationFailedOver != "FAILED_OVER" {
|
||||
t.Error("PushNotificationFailedOver should be 'FAILED_OVER'")
|
||||
}
|
||||
})
|
||||
}
|
6
redis.go
6
redis.go
@ -1130,5 +1130,9 @@ func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Contex
|
||||
|
||||
// pushNotificationHandlerContext creates a handler context for push notification processing
|
||||
func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext {
|
||||
return push.NewNotificationHandlerContext(c, c.connPool, nil, cn, false)
|
||||
return push.NotificationHandlerContext{
|
||||
Client: c,
|
||||
ConnPool: c.connPool,
|
||||
Conn: cn,
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user