mirror of
https://github.com/redis/go-redis.git
synced 2025-07-20 22:42:59 +03:00
refactor: remove unnecessary enabled field and IsEnabled/SetEnabled methods
- Remove enabled field from PushNotificationProcessor struct - Remove IsEnabled() and SetEnabled() methods from processor interface - Remove enabled parameter from NewPushNotificationProcessor() - Update all interfaces in pool package to remove IsEnabled requirement - Simplify processor logic - if processor exists, it works - VoidPushNotificationProcessor handles disabled case by discarding notifications - Update all tests to use simplified interface without enable/disable logic Benefits: - Simpler, cleaner interface with less complexity - No unnecessary state management for enabled/disabled - VoidPushNotificationProcessor pattern handles disabled case elegantly - Reduced cognitive overhead - processors just work when set - Eliminates redundant enabled checks throughout codebase - More predictable behavior - set processor = it works
This commit is contained in:
@ -28,7 +28,6 @@ type Conn struct {
|
||||
|
||||
// Push notification processor for handling push notifications on this connection
|
||||
PushNotificationProcessor interface {
|
||||
IsEnabled() bool
|
||||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
|
||||
}
|
||||
}
|
||||
|
@ -75,7 +75,6 @@ type Options struct {
|
||||
|
||||
// Push notification processor for connections
|
||||
PushNotificationProcessor interface {
|
||||
IsEnabled() bool
|
||||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
|
||||
}
|
||||
}
|
||||
@ -391,7 +390,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
|
||||
func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
|
||||
if cn.rd.Buffered() > 0 {
|
||||
// Check if this might be push notification data
|
||||
if cn.PushNotificationProcessor != nil && cn.PushNotificationProcessor.IsEnabled() {
|
||||
if cn.PushNotificationProcessor != nil {
|
||||
// Try to process pending push notifications before discarding connection
|
||||
err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd)
|
||||
if err != nil {
|
||||
@ -555,7 +554,7 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool {
|
||||
if err := connCheck(cn.netConn); err != nil {
|
||||
// If there's unexpected data and we have push notification support,
|
||||
// it might be push notifications
|
||||
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil && cn.PushNotificationProcessor.IsEnabled() {
|
||||
if err == errUnexpectedRead && cn.PushNotificationProcessor != nil {
|
||||
// Try to process any pending push notifications
|
||||
ctx := context.Background()
|
||||
if procErr := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); procErr != nil {
|
||||
|
@ -435,7 +435,6 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
||||
}, nil
|
||||
default:
|
||||
// Try to handle as generic push notification
|
||||
if c.pushProcessor.IsEnabled() {
|
||||
ctx := c.getContext()
|
||||
registry := c.pushProcessor.GetRegistry()
|
||||
if registry != nil {
|
||||
@ -448,7 +447,6 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, fmt.Errorf("redis: unsupported pubsub message: %q", kind)
|
||||
}
|
||||
default:
|
||||
|
@ -56,9 +56,7 @@ func TestConnectionPoolPushNotificationIntegration(t *testing.T) {
|
||||
t.Error("Connection should have push notification processor assigned")
|
||||
}
|
||||
|
||||
if !cn.PushNotificationProcessor.IsEnabled() {
|
||||
t.Error("Connection push notification processor should be enabled")
|
||||
}
|
||||
// Connection should have a processor (no need to check IsEnabled anymore)
|
||||
|
||||
// Test ProcessPendingNotifications method
|
||||
emptyReader := proto.NewReader(bytes.NewReader([]byte{}))
|
||||
@ -156,8 +154,9 @@ func TestConnPushNotificationMethods(t *testing.T) {
|
||||
t.Error("Conn should have push notification processor")
|
||||
}
|
||||
|
||||
if !processor.IsEnabled() {
|
||||
t.Error("Conn push notification processor should be enabled")
|
||||
// Processor should have a registry when enabled
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Conn push notification processor should have a registry when enabled")
|
||||
}
|
||||
|
||||
// Test RegisterPushNotificationHandler
|
||||
@ -218,8 +217,9 @@ func TestConnWithoutPushNotifications(t *testing.T) {
|
||||
if processor == nil {
|
||||
t.Error("Conn should always have a push notification processor")
|
||||
}
|
||||
if processor.IsEnabled() {
|
||||
t.Error("Push notification processor should be disabled for RESP2")
|
||||
// VoidPushNotificationProcessor should have nil registry
|
||||
if processor.GetRegistry() != nil {
|
||||
t.Error("VoidPushNotificationProcessor should have nil registry for RESP2")
|
||||
}
|
||||
|
||||
// Test RegisterPushNotificationHandler returns nil (no error)
|
||||
@ -242,7 +242,7 @@ func TestConnWithoutPushNotifications(t *testing.T) {
|
||||
// TestNewConnWithCustomProcessor tests newConn with custom processor in options.
|
||||
func TestNewConnWithCustomProcessor(t *testing.T) {
|
||||
// Create custom processor
|
||||
customProcessor := NewPushNotificationProcessor(true)
|
||||
customProcessor := NewPushNotificationProcessor()
|
||||
|
||||
// Create options with custom processor
|
||||
opt := &Options{
|
||||
@ -377,7 +377,7 @@ func TestPushNotificationInfoStructure(t *testing.T) {
|
||||
// TestConnectionPoolOptionsIntegration tests that pool options correctly include processor.
|
||||
func TestConnectionPoolOptionsIntegration(t *testing.T) {
|
||||
// Create processor
|
||||
processor := NewPushNotificationProcessor(true)
|
||||
processor := NewPushNotificationProcessor()
|
||||
|
||||
// Create options
|
||||
opt := &Options{
|
||||
@ -401,7 +401,7 @@ func TestConnectionPoolOptionsIntegration(t *testing.T) {
|
||||
|
||||
// TestProcessPendingNotificationsEdgeCases tests edge cases in ProcessPendingNotifications.
|
||||
func TestProcessPendingNotificationsEdgeCases(t *testing.T) {
|
||||
processor := NewPushNotificationProcessor(true)
|
||||
processor := NewPushNotificationProcessor()
|
||||
ctx := context.Background()
|
||||
|
||||
// Test with nil reader (should not panic)
|
||||
@ -417,10 +417,10 @@ func TestProcessPendingNotificationsEdgeCases(t *testing.T) {
|
||||
t.Errorf("Should not error with empty reader: %v", err)
|
||||
}
|
||||
|
||||
// Test with disabled processor
|
||||
disabledProcessor := NewPushNotificationProcessor(false)
|
||||
err = disabledProcessor.ProcessPendingNotifications(ctx, emptyReader)
|
||||
// Test with void processor (simulates disabled state)
|
||||
voidProcessor := NewVoidPushNotificationProcessor()
|
||||
err = voidProcessor.ProcessPendingNotifications(ctx, emptyReader)
|
||||
if err != nil {
|
||||
t.Errorf("Disabled processor should not error: %v", err)
|
||||
t.Errorf("Void processor should not error: %v", err)
|
||||
}
|
||||
}
|
||||
|
@ -106,8 +106,6 @@ func (r *PushNotificationRegistry) HasHandlers() bool {
|
||||
|
||||
// PushNotificationProcessorInterface defines the interface for push notification processors.
|
||||
type PushNotificationProcessorInterface interface {
|
||||
IsEnabled() bool
|
||||
SetEnabled(enabled bool)
|
||||
GetRegistry() *PushNotificationRegistry
|
||||
ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error
|
||||
RegisterHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error
|
||||
@ -116,32 +114,15 @@ type PushNotificationProcessorInterface interface {
|
||||
// PushNotificationProcessor handles the processing of push notifications from Redis.
|
||||
type PushNotificationProcessor struct {
|
||||
registry *PushNotificationRegistry
|
||||
enabled bool
|
||||
mu sync.RWMutex // Protects enabled field
|
||||
}
|
||||
|
||||
// NewPushNotificationProcessor creates a new push notification processor.
|
||||
func NewPushNotificationProcessor(enabled bool) *PushNotificationProcessor {
|
||||
func NewPushNotificationProcessor() *PushNotificationProcessor {
|
||||
return &PushNotificationProcessor{
|
||||
registry: NewPushNotificationRegistry(),
|
||||
enabled: enabled,
|
||||
}
|
||||
}
|
||||
|
||||
// IsEnabled returns whether push notification processing is enabled.
|
||||
func (p *PushNotificationProcessor) IsEnabled() bool {
|
||||
p.mu.RLock()
|
||||
defer p.mu.RUnlock()
|
||||
return p.enabled
|
||||
}
|
||||
|
||||
// SetEnabled enables or disables push notification processing.
|
||||
func (p *PushNotificationProcessor) SetEnabled(enabled bool) {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
p.enabled = enabled
|
||||
}
|
||||
|
||||
// GetRegistry returns the push notification registry.
|
||||
func (p *PushNotificationProcessor) GetRegistry() *PushNotificationRegistry {
|
||||
return p.registry
|
||||
@ -149,7 +130,7 @@ func (p *PushNotificationProcessor) GetRegistry() *PushNotificationRegistry {
|
||||
|
||||
// ProcessPendingNotifications checks for and processes any pending push notifications.
|
||||
func (p *PushNotificationProcessor) ProcessPendingNotifications(ctx context.Context, rd *proto.Reader) error {
|
||||
if !p.IsEnabled() || !p.registry.HasHandlers() {
|
||||
if !p.registry.HasHandlers() {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -252,16 +233,6 @@ func NewVoidPushNotificationProcessor() *VoidPushNotificationProcessor {
|
||||
return &VoidPushNotificationProcessor{}
|
||||
}
|
||||
|
||||
// IsEnabled always returns false for void processor.
|
||||
func (v *VoidPushNotificationProcessor) IsEnabled() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// SetEnabled is a no-op for void processor.
|
||||
func (v *VoidPushNotificationProcessor) SetEnabled(enabled bool) {
|
||||
// No-op: void processor is always disabled
|
||||
}
|
||||
|
||||
// GetRegistry returns nil for void processor since it doesn't maintain handlers.
|
||||
func (v *VoidPushNotificationProcessor) GetRegistry() *PushNotificationRegistry {
|
||||
return nil
|
||||
|
@ -87,10 +87,10 @@ func TestPushNotificationRegistry(t *testing.T) {
|
||||
|
||||
func TestPushNotificationProcessor(t *testing.T) {
|
||||
// Test the push notification processor
|
||||
processor := redis.NewPushNotificationProcessor(true)
|
||||
processor := redis.NewPushNotificationProcessor()
|
||||
|
||||
if !processor.IsEnabled() {
|
||||
t.Error("Processor should be enabled")
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Processor should have a registry")
|
||||
}
|
||||
|
||||
// Test registering handlers
|
||||
@ -124,10 +124,9 @@ func TestPushNotificationProcessor(t *testing.T) {
|
||||
t.Error("Specific handler should have been called")
|
||||
}
|
||||
|
||||
// Test disabling processor
|
||||
processor.SetEnabled(false)
|
||||
if processor.IsEnabled() {
|
||||
t.Error("Processor should be disabled")
|
||||
// Test that processor always has a registry (no enable/disable anymore)
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Processor should always have a registry")
|
||||
}
|
||||
}
|
||||
|
||||
@ -146,8 +145,8 @@ func TestClientPushNotificationIntegration(t *testing.T) {
|
||||
t.Error("Push notification processor should be initialized")
|
||||
}
|
||||
|
||||
if !processor.IsEnabled() {
|
||||
t.Error("Push notification processor should be enabled")
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Push notification processor should have a registry when enabled")
|
||||
}
|
||||
|
||||
// Test registering handlers through client
|
||||
@ -187,8 +186,9 @@ func TestClientWithoutPushNotifications(t *testing.T) {
|
||||
if processor == nil {
|
||||
t.Error("Push notification processor should never be nil")
|
||||
}
|
||||
if processor.IsEnabled() {
|
||||
t.Error("Push notification processor should be disabled when PushNotifications is false")
|
||||
// VoidPushNotificationProcessor should have nil registry
|
||||
if processor.GetRegistry() != nil {
|
||||
t.Error("VoidPushNotificationProcessor should have nil registry")
|
||||
}
|
||||
|
||||
// Registering handlers should not panic
|
||||
@ -215,8 +215,8 @@ func TestPushNotificationEnabledClient(t *testing.T) {
|
||||
t.Error("Push notification processor should be initialized when enabled")
|
||||
}
|
||||
|
||||
if !processor.IsEnabled() {
|
||||
t.Error("Push notification processor should be enabled")
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Push notification processor should have a registry when enabled")
|
||||
}
|
||||
|
||||
// Test registering a handler
|
||||
@ -561,10 +561,10 @@ func TestPushNotificationRegistrySpecificHandlerOnly(t *testing.T) {
|
||||
|
||||
func TestPushNotificationProcessorEdgeCases(t *testing.T) {
|
||||
// Test processor with disabled state
|
||||
processor := redis.NewPushNotificationProcessor(false)
|
||||
processor := redis.NewPushNotificationProcessor()
|
||||
|
||||
if processor.IsEnabled() {
|
||||
t.Error("Processor should be disabled")
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Processor should have a registry")
|
||||
}
|
||||
|
||||
// Test that disabled processor doesn't process notifications
|
||||
@ -587,15 +587,14 @@ func TestPushNotificationProcessorEdgeCases(t *testing.T) {
|
||||
t.Error("Handler should be called when using registry directly")
|
||||
}
|
||||
|
||||
// Test enabling processor
|
||||
processor.SetEnabled(true)
|
||||
if !processor.IsEnabled() {
|
||||
t.Error("Processor should be enabled after SetEnabled(true)")
|
||||
// Test that processor always has a registry
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Error("Processor should always have a registry")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushNotificationProcessorConvenienceMethods(t *testing.T) {
|
||||
processor := redis.NewPushNotificationProcessor(true)
|
||||
processor := redis.NewPushNotificationProcessor()
|
||||
|
||||
// Test RegisterHandler convenience method
|
||||
handlerCalled := false
|
||||
@ -822,7 +821,7 @@ func TestPushNotificationRegistryConcurrency(t *testing.T) {
|
||||
|
||||
func TestPushNotificationProcessorConcurrency(t *testing.T) {
|
||||
// Test thread safety of the processor
|
||||
processor := redis.NewPushNotificationProcessor(true)
|
||||
processor := redis.NewPushNotificationProcessor()
|
||||
|
||||
numGoroutines := 5
|
||||
numOperations := 50
|
||||
@ -845,13 +844,7 @@ func TestPushNotificationProcessorConcurrency(t *testing.T) {
|
||||
notification := []interface{}{command, "data"}
|
||||
processor.GetRegistry().HandleNotification(context.Background(), notification)
|
||||
|
||||
// Toggle processor state occasionally
|
||||
if j%20 == 0 {
|
||||
processor.SetEnabled(!processor.IsEnabled())
|
||||
}
|
||||
|
||||
// Access processor state
|
||||
processor.IsEnabled()
|
||||
processor.GetRegistry()
|
||||
}
|
||||
}(i)
|
||||
@ -898,7 +891,7 @@ func TestPushNotificationClientConcurrency(t *testing.T) {
|
||||
// Access processor
|
||||
processor := client.GetPushNotificationProcessor()
|
||||
if processor != nil {
|
||||
processor.IsEnabled()
|
||||
processor.GetRegistry()
|
||||
}
|
||||
}
|
||||
}(i)
|
||||
@ -929,8 +922,11 @@ func TestPushNotificationConnectionHealthCheck(t *testing.T) {
|
||||
|
||||
// Verify push notifications are enabled
|
||||
processor := client.GetPushNotificationProcessor()
|
||||
if processor == nil || !processor.IsEnabled() {
|
||||
t.Fatal("Push notifications should be enabled")
|
||||
if processor == nil {
|
||||
t.Fatal("Push notification processor should not be nil")
|
||||
}
|
||||
if processor.GetRegistry() == nil {
|
||||
t.Fatal("Push notification registry should not be nil when enabled")
|
||||
}
|
||||
|
||||
// Register a handler for testing
|
||||
@ -959,11 +955,6 @@ func TestPushNotificationConnectionHealthCheck(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
if !cn.PushNotificationProcessor.IsEnabled() {
|
||||
t.Error("Push notification processor should be enabled on connection")
|
||||
return
|
||||
}
|
||||
|
||||
t.Log("✅ Connection has push notification processor correctly set")
|
||||
t.Log("✅ Connection health check integration working correctly")
|
||||
}
|
||||
|
4
redis.go
4
redis.go
@ -535,7 +535,7 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
|
||||
}
|
||||
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
||||
// Check for push notifications before reading the command reply
|
||||
if c.opt.Protocol == 3 && c.pushProcessor.IsEnabled() {
|
||||
if c.opt.Protocol == 3 {
|
||||
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
|
||||
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
|
||||
}
|
||||
@ -818,7 +818,7 @@ func (c *Client) initializePushProcessor() {
|
||||
c.pushProcessor = c.opt.PushNotificationProcessor
|
||||
} else if c.opt.PushNotifications {
|
||||
// Create default processor when push notifications are enabled
|
||||
c.pushProcessor = NewPushNotificationProcessor(true)
|
||||
c.pushProcessor = NewPushNotificationProcessor()
|
||||
} else {
|
||||
// Create void processor when push notifications are disabled
|
||||
c.pushProcessor = NewVoidPushNotificationProcessor()
|
||||
|
Reference in New Issue
Block a user