1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

feat: add proactive push notification processing to WithReader

- Add push notification processing to Conn.WithReader method
- Process notifications immediately before every read operation
- Provides proactive notification handling vs reactive processing
- Add proper error handling with internal.Logger
- Non-blocking implementation that doesn't break Redis operations
- Complements existing processing in Pool.Put and isHealthyConn

Benefits:
- Immediate processing when notifications arrive
- Called before every read operation for optimal timing
- Prevents notification backlog accumulation
- More responsive to Redis cluster changes
- Better user experience during migrations
- Optimal placement for catching asynchronous notifications

Implementation:
- Type-safe interface assertion for processor
- Context-aware error handling with logging
- Maintains backward compatibility
- Consistent with existing pool patterns
- Three-layer processing strategy: WithReader (proactive) + Pool.Put + isHealthyConn (reactive)

Use cases:
- MOVING/MIGRATING/MIGRATED notifications for slot migrations
- FAILING_OVER/FAILED_OVER notifications for failover scenarios
- Real-time cluster topology change awareness
- Improved connection utilization efficiency
This commit is contained in:
Nedyalko Dyakov
2025-06-27 22:49:39 +03:00
parent d820ade9e4
commit b6e712b41a
3 changed files with 21 additions and 14 deletions

View File

@ -7,6 +7,7 @@ import (
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/proto" "github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif" "github.com/redis/go-redis/v9/internal/pushnotif"
) )
@ -77,11 +78,23 @@ func (cn *Conn) RemoteAddr() net.Addr {
func (cn *Conn) WithReader( func (cn *Conn) WithReader(
ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error, ctx context.Context, timeout time.Duration, fn func(rd *proto.Reader) error,
) error { ) error {
// Process any pending push notifications before executing the read function
// This ensures push notifications are handled as soon as they arrive
if cn.PushNotificationProcessor != nil {
// Type assert to the processor interface
if err := cn.PushNotificationProcessor.ProcessPendingNotifications(ctx, cn.rd); err != nil {
// Log the error but don't fail the read operation
// Push notification processing errors shouldn't break normal Redis operations
internal.Logger.Printf(ctx, "push: error processing pending notifications in WithReader: %v", err)
}
}
if timeout >= 0 { if timeout >= 0 {
if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil { if err := cn.netConn.SetReadDeadline(cn.deadline(ctx, timeout)); err != nil {
return err return err
} }
} }
return fn(cn.rd) return fn(cn.rd)
} }

View File

@ -386,7 +386,7 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
// for redis-server versions that do not support the HELLO command, // for redis-server versions that do not support the HELLO command,
// RESP2 will continue to be used. // RESP2 will continue to be used.
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil { if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
// Authentication successful with HELLO command // Authentication successful with HELLO command
} else if !isRedisError(err) { } else if !isRedisError(err) {
// When the server responds with the RESP protocol and the result is not a normal // When the server responds with the RESP protocol and the result is not a normal
@ -534,12 +534,6 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
readReplyFunc = cmd.readRawReply readReplyFunc = cmd.readRawReply
} }
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error { if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
// Check for push notifications before reading the command reply
if c.opt.Protocol == 3 {
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
}
}
return readReplyFunc(rd) return readReplyFunc(rd)
}); err != nil { }); err != nil {
if cmd.readTimeout() == nil { if cmd.readTimeout() == nil {
@ -813,25 +807,25 @@ func (c *Client) Options() *Options {
// initializePushProcessor initializes the push notification processor for any client type. // initializePushProcessor initializes the push notification processor for any client type.
// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient. // This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient.
func initializePushProcessor(opt *Options, useVoidByDefault bool) PushNotificationProcessorInterface { func initializePushProcessor(opt *Options) PushNotificationProcessorInterface {
// Always use custom processor if provided // Always use custom processor if provided
if opt.PushNotificationProcessor != nil { if opt.PushNotificationProcessor != nil {
return opt.PushNotificationProcessor return opt.PushNotificationProcessor
} }
// For regular clients, respect the PushNotifications setting // For regular clients, respect the PushNotifications setting
if !useVoidByDefault && opt.PushNotifications { if opt.PushNotifications {
// Create default processor when push notifications are enabled // Create default processor when push notifications are enabled
return NewPushNotificationProcessor() return NewPushNotificationProcessor()
} }
// Create void processor when push notifications are disabled or for specialized clients // Create void processor when push notifications are disabled
return NewVoidPushNotificationProcessor() return NewVoidPushNotificationProcessor()
} }
// initializePushProcessor initializes the push notification processor for this client. // initializePushProcessor initializes the push notification processor for this client.
func (c *Client) initializePushProcessor() { func (c *Client) initializePushProcessor() {
c.pushProcessor = initializePushProcessor(c.opt, false) c.pushProcessor = initializePushProcessor(c.opt)
} }
// RegisterPushNotificationHandler registers a handler for a specific push notification name. // RegisterPushNotificationHandler registers a handler for a specific push notification name.
@ -987,7 +981,7 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn
// Initialize push notification processor using shared helper // Initialize push notification processor using shared helper
// Use void processor by default for connections (typically don't need push notifications) // Use void processor by default for connections (typically don't need push notifications)
c.pushProcessor = initializePushProcessor(opt, true) c.pushProcessor = initializePushProcessor(opt)
c.cmdable = c.Process c.cmdable = c.Process
c.statefulCmdable = c.Process c.statefulCmdable = c.Process

View File

@ -433,7 +433,7 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
// Initialize push notification processor using shared helper // Initialize push notification processor using shared helper
// Use void processor by default for failover clients (typically don't need push notifications) // Use void processor by default for failover clients (typically don't need push notifications)
rdb.pushProcessor = initializePushProcessor(opt, true) rdb.pushProcessor = initializePushProcessor(opt)
connPool = newConnPool(opt, rdb.dialHook) connPool = newConnPool(opt, rdb.dialHook)
rdb.connPool = connPool rdb.connPool = connPool
@ -503,7 +503,7 @@ func NewSentinelClient(opt *Options) *SentinelClient {
// Initialize push notification processor using shared helper // Initialize push notification processor using shared helper
// Use void processor by default for sentinel clients (typically don't need push notifications) // Use void processor by default for sentinel clients (typically don't need push notifications)
c.pushProcessor = initializePushProcessor(opt, true) c.pushProcessor = initializePushProcessor(opt)
c.initHooks(hooks{ c.initHooks(hooks{
dial: c.baseClient.dial, dial: c.baseClient.dial,