Mirror of https://github.com/redis/go-redis.git

feat: RESP3 notifications support & Hitless notifications handling [CAE-1088] & [CAE-1072] (#3418)

- Adds support for handling push notifications with RESP3.
- Building on that support, adds handlers for hitless upgrade notifications.
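For reviewers, a minimal opt-in sketch. Field and mode names are taken from this diff; the concrete type behind HitlessUpgradeConfig is assumed to be hitless.Config and may differ:

    opt := &redis.Options{
        Addr:     "localhost:6379",
        Protocol: 3, // RESP3 is required for push notifications
        HitlessUpgradeConfig: &hitless.Config{ // assumed type name
            Mode: hitless.MaintNotificationsEnabled,
        },
    }
    client := redis.NewClient(opt)
    defer client.Close()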

---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: Hristo Temelski <hristo.temelski@redis.com>
Author: Nedyalko Dyakov
Committed: 2025-09-10 22:18:01 +03:00 (by GitHub)
parent 2da6ca07c0
commit 0ef6d0727d
70 changed files with 11668 additions and 596 deletions

redis.go (413 changed lines)

@@ -10,10 +10,12 @@ import (
"time"
"github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/hitless"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/push"
)
// Scanner internal/hscan.Scanner exposed interface.
@@ -23,6 +25,7 @@ type Scanner = hscan.Scanner
const Nil = proto.Nil
// SetLogger set custom log
// Use with VoidLogger to disable logging.
func SetLogger(logger internal.Logging) {
internal.Logger = logger
}
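As the new comment notes, logging can be silenced. Any no-op implementation of the internal.Logging interface (a single Printf method) works; a minimal sketch:

    type noopLogger struct{}

    // Printf satisfies internal.Logging with a no-op body.
    func (noopLogger) Printf(ctx context.Context, format string, v ...interface{}) {}

    // usage: redis.SetLogger(noopLogger{})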
@@ -202,16 +205,35 @@ func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) e
//------------------------------------------------------------------------------
type baseClient struct {
- opt *Options
- connPool pool.Pooler
+ opt *Options
+ optLock sync.RWMutex
+ connPool pool.Pooler
+ pubSubPool *pool.PubSubPool
hooksMixin
onClose func() error // hook called when client is closed
+ // Push notification processing
+ pushProcessor push.NotificationProcessor
+ // Hitless upgrade manager
+ hitlessManager *hitless.HitlessManager
+ hitlessManagerLock sync.RWMutex
}
func (c *baseClient) clone() *baseClient {
- clone := *c
- return &clone
+ c.hitlessManagerLock.RLock()
+ hitlessManager := c.hitlessManager
+ c.hitlessManagerLock.RUnlock()
+ clone := &baseClient{
+ opt: c.opt,
+ connPool: c.connPool,
+ onClose: c.onClose,
+ pushProcessor: c.pushProcessor,
+ hitlessManager: hitlessManager,
+ }
+ return clone
}
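The move away from clone := *c matters because baseClient now embeds sync.RWMutex values (optLock, hitlessManagerLock); copying them by value is flagged by go vet's copylocks check and can duplicate lock state. A standalone illustration of the hazard:

    type lockedThing struct {
        mu   sync.RWMutex
        data int
    }

    func badClone(t *lockedThing) *lockedThing {
        c := *t   // go vet: "assignment copies lock value"; the copied
        return &c // mutex may be locked independently of the original
    }

    func goodClone(t *lockedThing) *lockedThing {
        t.mu.RLock()
        defer t.mu.RUnlock()
        return &lockedThing{data: t.data} // fresh zero-value mutex
    }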
func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
@@ -229,21 +251,6 @@ func (c *baseClient) String() string {
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}
- func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
- cn, err := c.connPool.NewConn(ctx)
- if err != nil {
- return nil, err
- }
- err = c.initConn(ctx, cn)
- if err != nil {
- _ = c.connPool.CloseConn(cn)
- return nil, err
- }
- return cn, nil
- }
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
if c.opt.Limiter != nil {
err := c.opt.Limiter.Allow()
@@ -269,7 +276,7 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
return nil, err
}
- if cn.Inited {
+ if cn.IsInited() {
return cn, nil
}
@@ -351,12 +358,10 @@ func (c *baseClient) wrappedOnClose(newOnClose func() error) func() error {
}
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
- if cn.Inited {
+ if !cn.Inited.CompareAndSwap(false, true) {
return nil
}
var err error
- cn.Inited = true
connPool := pool.NewSingleConnPool(c.connPool, cn)
conn := newConn(c.opt, connPool, &c.hooksMixin)
@@ -425,6 +430,51 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
return fmt.Errorf("failed to initialize connection options: %w", err)
}
// Enable maintenance notifications if hitless upgrades are configured
c.optLock.RLock()
hitlessEnabled := c.opt.HitlessUpgradeConfig != nil && c.opt.HitlessUpgradeConfig.Mode != hitless.MaintNotificationsDisabled
protocol := c.opt.Protocol
endpointType := c.opt.HitlessUpgradeConfig.EndpointType
c.optLock.RUnlock()
var hitlessHandshakeErr error
if hitlessEnabled && protocol == 3 {
hitlessHandshakeErr = conn.ClientMaintNotifications(
ctx,
true,
endpointType.String(),
).Err()
if hitlessHandshakeErr != nil {
if !isRedisError(hitlessHandshakeErr) {
// if not redis error, fail the connection
return hitlessHandshakeErr
}
c.optLock.Lock()
// handshake failed - check and modify config atomically
switch c.opt.HitlessUpgradeConfig.Mode {
case hitless.MaintNotificationsEnabled:
// enabled mode, fail the connection
c.optLock.Unlock()
return fmt.Errorf("failed to enable maintenance notifications: %w", hitlessHandshakeErr)
default: // will handle auto and any other
internal.Logger.Printf(ctx, "hitless: auto mode fallback: hitless upgrades disabled due to handshake error: %v", hitlessHandshakeErr)
c.opt.HitlessUpgradeConfig.Mode = hitless.MaintNotificationsDisabled
c.optLock.Unlock()
// auto mode, disable hitless upgrades and continue
if err := c.disableHitlessUpgrades(); err != nil {
// Log error but continue - auto mode should be resilient
internal.Logger.Printf(ctx, "hitless: failed to disable hitless upgrades in auto mode: %v", err)
}
}
} else {
// handshake was executed successfully;
// to make sure it runs on all other connections as well,
// force the mode to enabled so every connection performs the handshake
c.optLock.Lock()
c.opt.HitlessUpgradeConfig.Mode = hitless.MaintNotificationsEnabled
c.optLock.Unlock()
}
}
if !c.opt.DisableIdentity && !c.opt.DisableIndentity {
libName := ""
libVer := Version()
@@ -441,6 +491,12 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
}
}
cn.SetUsable(true)
cn.Inited.Store(true)
// Set the connection initialization function for potential reconnections
cn.SetInitConnFunc(c.createInitConnFunc())
if c.opt.OnConnect != nil {
return c.opt.OnConnect(ctx, conn)
}
@@ -456,6 +512,10 @@ func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error)
if isBadConn(err, false, c.opt.Addr) {
c.connPool.Remove(ctx, cn, err)
} else {
// process any pending push notifications before returning the connection to the pool
if err := c.processPushNotifications(ctx, cn); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", err)
}
c.connPool.Put(ctx, cn)
}
}
@@ -497,16 +557,16 @@ func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
return lastErr
}
- func (c *baseClient) assertUnstableCommand(cmd Cmder) bool {
+ func (c *baseClient) assertUnstableCommand(cmd Cmder) (bool, error) {
switch cmd.(type) {
case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd:
if c.opt.UnstableResp3 {
- return true
+ return true, nil
} else {
- panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.")
+ return false, fmt.Errorf("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3. See the README and the release notes for guidance")
}
default:
- return false
+ return false, nil
}
}
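The panic-to-error change means misconfigured search commands now fail like any other command; a hedged sketch of the caller-visible behavior, using the public FTSearch API:

    res := client.FTSearch(ctx, "idx:docs", "hello") // RESP3, UnstableResp3 unset
    if err := res.Err(); err != nil {
        // Previously this path panicked; now the error is returned and can
        // be handled or logged like any other command failure.
        log.Printf("FT.SEARCH unavailable: %v", err)
    }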
@@ -519,6 +579,11 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
retryTimeout := uint32(0)
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
// Process any pending push notifications before executing the command
if err := c.processPushNotifications(ctx, cn); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
}
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmd(wr, cmd)
}); err != nil {
@@ -527,10 +592,22 @@ func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool
}
readReplyFunc := cmd.readReply
// Apply unstable RESP3 search module.
- if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) {
- readReplyFunc = cmd.readRawReply
+ if c.opt.Protocol != 2 {
+ useRawReply, err := c.assertUnstableCommand(cmd)
+ if err != nil {
+ return err
+ }
+ if useRawReply {
+ readReplyFunc = cmd.readRawReply
+ }
+ }
- if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), readReplyFunc); err != nil {
+ if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
+ // To be sure there are no buffered push notifications, we process them before reading the reply
+ if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
+ internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
+ }
+ return readReplyFunc(rd)
+ }); err != nil {
if cmd.readTimeout() == nil {
atomic.StoreUint32(&retryTimeout, 1)
} else {
@@ -573,19 +650,76 @@ func (c *baseClient) context(ctx context.Context) context.Context {
return context.Background()
}
// createInitConnFunc creates a connection initialization function that can be used for reconnections.
func (c *baseClient) createInitConnFunc() func(context.Context, *pool.Conn) error {
return func(ctx context.Context, cn *pool.Conn) error {
return c.initConn(ctx, cn)
}
}
// enableHitlessUpgrades initializes the hitless upgrade manager and pool hook.
// This function is called during client initialization.
// It registers push notification handlers for all hitless upgrade events
// and starts background workers for handoff processing in the pool hook.
func (c *baseClient) enableHitlessUpgrades() error {
// Create client adapter
clientAdapterInstance := newClientAdapter(c)
// Create hitless manager directly
manager, err := hitless.NewHitlessManager(clientAdapterInstance, c.connPool, c.opt.HitlessUpgradeConfig)
if err != nil {
return err
}
// Set the manager reference and initialize pool hook
c.hitlessManagerLock.Lock()
c.hitlessManager = manager
c.hitlessManagerLock.Unlock()
// Initialize pool hook (safe to call without lock since manager is now set)
manager.InitPoolHook(c.dialHook)
return nil
}
func (c *baseClient) disableHitlessUpgrades() error {
c.hitlessManagerLock.Lock()
defer c.hitlessManagerLock.Unlock()
// Close the hitless manager
if c.hitlessManager != nil {
// Closing the manager will also shutdown the pool hook
// and remove it from the pool
c.hitlessManager.Close()
c.hitlessManager = nil
}
return nil
}
// Close closes the client, releasing any open resources.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (c *baseClient) Close() error {
var firstErr error
+ // Close hitless manager first
+ if err := c.disableHitlessUpgrades(); err != nil {
+ firstErr = err
+ }
if c.onClose != nil {
- if err := c.onClose(); err != nil {
+ if err := c.onClose(); err != nil && firstErr == nil {
firstErr = err
}
}
- if err := c.connPool.Close(); err != nil && firstErr == nil {
- firstErr = err
- }
+ if c.connPool != nil {
+ if err := c.connPool.Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
+ if c.pubSubPool != nil {
+ if err := c.pubSubPool.Close(); err != nil && firstErr == nil {
+ firstErr = err
+ }
+ }
return firstErr
}
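Close keeps the package's first-error convention: once firstErr is set, later failures are dropped. For comparison only, not what this commit does, Go 1.20's errors.Join would retain all of them:

    func closeAll(closers ...io.Closer) error {
        var errs []error
        for _, c := range closers {
            if c == nil {
                continue
            }
            if err := c.Close(); err != nil {
                errs = append(errs, err) // keep every failure, not just the first
            }
        }
        return errors.Join(errs...) // nil when no closer failed
    }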
@@ -625,6 +759,10 @@ func (c *baseClient) generalProcessPipeline(
// Enable retries by default to retry dial errors returned by withConn.
canRetry := true
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
// Process any pending push notifications before executing the pipeline
if err := c.processPushNotifications(ctx, cn); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before processing pipeline: %v", err)
}
var err error
canRetry, err = p(ctx, cn, cmds)
return err
@@ -640,6 +778,11 @@ func (c *baseClient) generalProcessPipeline(
func (c *baseClient) pipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
// Process any pending push notifications before executing the pipeline
if err := c.processPushNotifications(ctx, cn); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before writing pipeline: %v", err)
}
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
}); err != nil {
@@ -648,7 +791,8 @@ func (c *baseClient) pipelineProcessCmds(
}
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
- return pipelineReadCmds(rd, cmds)
+ // read all replies
+ return c.pipelineReadCmds(ctx, cn, rd, cmds)
}); err != nil {
return true, err
}
@@ -656,8 +800,12 @@ func (c *baseClient) pipelineProcessCmds(
return false, nil
}
- func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
+ func (c *baseClient) pipelineReadCmds(ctx context.Context, cn *pool.Conn, rd *proto.Reader, cmds []Cmder) error {
for i, cmd := range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
err := cmd.readReply(rd)
cmd.SetErr(err)
if err != nil && !isRedisError(err) {
@@ -672,6 +820,11 @@ func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
func (c *baseClient) txPipelineProcessCmds(
ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
// Process any pending push notifications before executing the transaction pipeline
if err := c.processPushNotifications(ctx, cn); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err)
}
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
return writeCmds(wr, cmds)
}); err != nil {
@@ -684,12 +837,13 @@ func (c *baseClient) txPipelineProcessCmds(
// Trim multi and exec.
trimmedCmds := cmds[1 : len(cmds)-1]
- if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil {
+ if err := c.txPipelineReadQueued(ctx, cn, rd, statusCmd, trimmedCmds); err != nil {
setCmdsErr(cmds, err)
return err
}
- return pipelineReadCmds(rd, trimmedCmds)
+ // Read replies.
+ return c.pipelineReadCmds(ctx, cn, rd, trimmedCmds)
}); err != nil {
return false, err
}
@@ -697,14 +851,24 @@ func (c *baseClient) txPipelineProcessCmds(
return false, nil
}
- func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
+ // txPipelineReadQueued reads queued replies from the Redis server.
+ // It returns an error if the server returns an error or if the number of replies does not match the number of commands.
+ func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
// Parse +OK.
if err := statusCmd.readReply(rd); err != nil {
return err
}
// Parse +QUEUED.
for _, cmd := range cmds {
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
if err := statusCmd.readReply(rd); err != nil {
cmd.SetErr(err)
if !isRedisError(err) {
@@ -713,6 +877,10 @@ func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder)
}
}
// To be sure there are no buffered push notifications, we process them before reading the reply
if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
}
// Parse number of replies.
line, err := rd.ReadLine()
if err != nil {
@@ -746,15 +914,56 @@ func NewClient(opt *Options) *Client {
if opt == nil {
panic("redis: NewClient nil options")
}
// clone to not share options with the caller
opt = opt.clone()
opt.init()
// Push notifications are always enabled for RESP3 (cannot be disabled)
c := Client{
baseClient: &baseClient{
opt: opt,
},
}
c.init()
- c.connPool = newConnPool(opt, c.dialHook)
+ // Initialize push notification processor using shared helper
+ // Use void processor for RESP2 connections (push notifications not available)
+ c.pushProcessor = initializePushProcessor(opt)
+ // set opt push processor for child clients
+ c.opt.PushNotificationProcessor = c.pushProcessor
+ // Create connection pools
+ var err error
+ c.connPool, err = newConnPool(opt, c.dialHook)
+ if err != nil {
+ panic(fmt.Errorf("redis: failed to create connection pool: %w", err))
+ }
+ c.pubSubPool, err = newPubSubPool(opt, c.dialHook)
+ if err != nil {
+ panic(fmt.Errorf("redis: failed to create pubsub pool: %w", err))
+ }
// Initialize hitless upgrades first if enabled and protocol is RESP3
if opt.HitlessUpgradeConfig != nil && opt.HitlessUpgradeConfig.Mode != hitless.MaintNotificationsDisabled && opt.Protocol == 3 {
err := c.enableHitlessUpgrades()
if err != nil {
internal.Logger.Printf(context.Background(), "hitless: failed to initialize hitless upgrades: %v", err)
if opt.HitlessUpgradeConfig.Mode == hitless.MaintNotificationsEnabled {
/*
Design decision: panic here to fail fast if hitless upgrades cannot be enabled when explicitly requested.
We choose to panic instead of returning an error to avoid breaking the existing client API, which does not expect
an error from NewClient. This ensures that misconfiguration or critical initialization failures are surfaced
immediately, rather than allowing the client to continue in a partially initialized or inconsistent state.
Clients relying on hitless upgrades should be aware that initialization errors will cause a panic, and should
handle this accordingly (e.g., via recover or by validating configuration before calling NewClient).
This approach is only used when HitlessUpgradeConfig.Mode is MaintNotificationsEnabled, indicating that hitless
upgrades are required for correct operation. In other modes, initialization failures are logged but do not panic.
*/
panic(fmt.Errorf("failed to enable hitless upgrades: %w", err))
}
}
}
return &c
}
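Given the fail-fast panic in enabled mode, callers that prefer an error-returning constructor can wrap NewClient, as the design note suggests; a minimal sketch:

    func newClientOrError(opt *redis.Options) (c *redis.Client, err error) {
        defer func() {
            if r := recover(); r != nil {
                err = fmt.Errorf("redis: client initialization panicked: %v", r)
            }
        }()
        return redis.NewClient(opt), nil
    }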
@@ -791,11 +1000,51 @@ func (c *Client) Options() *Options {
return c.opt
}
// GetHitlessManager returns the hitless manager instance for monitoring and control.
// Returns nil if hitless upgrades are not enabled.
func (c *Client) GetHitlessManager() *hitless.HitlessManager {
c.hitlessManagerLock.RLock()
defer c.hitlessManagerLock.RUnlock()
return c.hitlessManager
}
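The nil return doubles as the feature flag, so callers can probe without tracking configuration separately:

    if m := client.GetHitlessManager(); m != nil {
        // hitless upgrades are active; use m for monitoring and control
    } else {
        // disabled, RESP2, or the auto-mode handshake fell back
    }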
// initializePushProcessor initializes the push notification processor for any client type.
// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient.
func initializePushProcessor(opt *Options) push.NotificationProcessor {
// Always use custom processor if provided
if opt.PushNotificationProcessor != nil {
return opt.PushNotificationProcessor
}
// Push notifications are always enabled for RESP3, disabled for RESP2
if opt.Protocol == 3 {
// Create default processor for RESP3 connections
return NewPushNotificationProcessor()
}
// Create void processor for RESP2 connections (push notifications not available)
return NewVoidPushNotificationProcessor()
}
// RegisterPushNotificationHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered.
func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error {
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
}
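A hedged registration sketch; the handler method name and the notification name below are assumptions for illustration (see push.NotificationHandler for the real contract):

    // loggingHandler is a hypothetical handler; the method name and
    // signature are assumed, not taken from this diff.
    type loggingHandler struct{}

    func (loggingHandler) HandlePushNotification(
        ctx context.Context,
        handlerCtx push.NotificationHandlerContext,
        notification []interface{},
    ) error {
        log.Printf("push: %v", notification)
        return nil
    }

    // "MOVING" is a placeholder notification name; protected=false keeps
    // the handler unregisterable later.
    err := client.RegisterPushNotificationHandler("MOVING", loggingHandler{}, false)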
// GetPushNotificationHandler returns the handler for a specific push notification name.
// Returns nil if no handler is registered for the given name.
func (c *Client) GetPushNotificationHandler(pushNotificationName string) push.NotificationHandler {
return c.pushProcessor.GetHandler(pushNotificationName)
}
type PoolStats pool.Stats
// PoolStats returns connection pool stats.
func (c *Client) PoolStats() *PoolStats {
stats := c.connPool.Stats()
stats.PubSubStats = *(c.pubSubPool.Stats())
return (*PoolStats)(stats)
}
@@ -830,13 +1079,31 @@ func (c *Client) TxPipeline() Pipeliner {
func (c *Client) pubSub() *PubSub {
pubsub := &PubSub{
opt: c.opt,
- newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
- return c.newConn(ctx)
- },
+ newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) {
cn, err := c.pubSubPool.NewConn(ctx, c.opt.Network, addr, channels)
if err != nil {
return nil, err
}
// will return nil if already initialized
err = c.initConn(ctx, cn)
if err != nil {
_ = cn.Close()
return nil, err
}
// Track connection in PubSubPool
c.pubSubPool.TrackConn(cn)
return cn, nil
},
- closeConn: c.connPool.CloseConn,
+ closeConn: func(cn *pool.Conn) error {
// Untrack connection from PubSubPool
c.pubSubPool.UntrackConn(cn)
_ = cn.Close()
return nil
},
pushProcessor: c.pushProcessor,
}
pubsub.init()
return pubsub
}
@@ -920,6 +1187,10 @@ func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn
c.hooksMixin = parentHooks.clone()
}
// Initialize push notification processor using shared helper
// Use void processor for RESP2 connections (push notifications not available)
c.pushProcessor = initializePushProcessor(opt)
c.cmdable = c.Process
c.statefulCmdable = c.Process
c.initHooks(hooks{
@@ -938,6 +1209,13 @@ func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
return err
}
// RegisterPushNotificationHandler registers a handler for a specific push notification name.
// Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered.
func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler push.NotificationHandler, protected bool) error {
return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
}
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
return c.Pipeline().Pipelined(ctx, fn)
}
@@ -965,3 +1243,50 @@ func (c *Conn) TxPipeline() Pipeliner {
pipe.init()
return &pipe
}
// processPushNotifications processes all pending push notifications on a connection
// This ensures that cluster topology changes are handled immediately before the connection is used
// This method should be called by the client before using WithReader for command execution
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
// Only process push notifications for RESP3 connections with a processor
// Also check if there is any data to read before processing,
// which is an optimization on UNIX systems where MaybeHasData is a syscall.
// On Windows, MaybeHasData always returns true, so this check is a no-op.
if c.opt.Protocol != 3 || c.pushProcessor == nil || !cn.MaybeHasData() {
return nil
}
// Use WithReader to access the reader and process push notifications
// This is critical for hitless upgrades to work properly
// NOTE: the read below uses a very short timeout, so it should not block
// longer than necessary; 10us should be plenty of time to read any push
// notifications already buffered on the socket.
return cn.WithReader(ctx, 10*time.Microsecond, func(rd *proto.Reader) error {
// Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn)
return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
})
}
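On the MaybeHasData optimization: a non-blocking peek is a cheap way to ask the kernel whether bytes are already buffered. A sketch of one way such a probe can look on UNIX (go-redis's actual implementation may differ):

    // maybeHasData reports whether at least one byte is readable right now.
    // MSG_PEEK leaves the byte in the socket buffer; MSG_DONTWAIT makes the
    // call return immediately instead of blocking.
    func maybeHasData(fd int) bool {
        var buf [1]byte
        n, _, err := syscall.Recvfrom(fd, buf[:], syscall.MSG_PEEK|syscall.MSG_DONTWAIT)
        return err == nil && n > 0
    }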
// processPendingPushNotificationWithReader processes all pending push notifications on a connection
// This method should be called by the client in WithReader before reading the reply
func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error {
// since we already hold the reader we don't need to check the socket for data;
// we are waiting for either a reply or a push notification, so we can block
// until one arrives or the timeout is reached
if c.opt.Protocol != 3 || c.pushProcessor == nil {
return nil
}
// Create handler context with client, connection pool, and connection information
handlerCtx := c.pushNotificationHandlerContext(cn)
return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
}
// pushNotificationHandlerContext creates a handler context for push notification processing
func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.NotificationHandlerContext {
return push.NotificationHandlerContext{
Client: c,
ConnPool: c.connPool,
Conn: cn, // the connection on which the notification was received
}
}