From b23f43c2f1a92a4ac9e6fbea7b23c71be11a533f Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Sat, 5 Jul 2025 06:18:38 +0300 Subject: [PATCH] fix(peek): non-blocking peek --- internal/pool/conn.go | 4 ++++ internal/pool/pool.go | 4 ++++ push/processor.go | 36 +++++++++++++++++++++++++++++++++--- redis.go | 7 ------- 4 files changed, 41 insertions(+), 10 deletions(-) diff --git a/internal/pool/conn.go b/internal/pool/conn.go index 570aefcd..fa93781d 100644 --- a/internal/pool/conn.go +++ b/internal/pool/conn.go @@ -58,6 +58,10 @@ func (cn *Conn) SetNetConn(netConn net.Conn) { cn.bw.Reset(netConn) } +func (cn *Conn) GetNetConn() net.Conn { + return cn.netConn +} + func (cn *Conn) Write(b []byte) (int, error) { return cn.netConn.Write(b) } diff --git a/internal/pool/pool.go b/internal/pool/pool.go index e48aaaff..22f8ea6a 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -384,6 +384,8 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) { if cn.rd.Buffered() > 0 { // Check if this might be push notification data if p.cfg.Protocol == 3 { + // we know that there is something in the buffer, so peek at the next reply type without + // the potential to block if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { // For push notifications, we allow some buffered data // The client will process these notifications before using the connection @@ -546,6 +548,8 @@ func (p *ConnPool) isHealthyConn(cn *Conn) bool { // However, push notification processing is now handled by the client // before WithReader to ensure proper context is available to handlers if err == errUnexpectedRead && p.cfg.Protocol == 3 { + // we know that there is something in the buffer, so peek at the next reply type without + // the potential to block if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { // For RESP3 connections with push notifications, we allow some buffered data // The client will process these notifications before using the connection diff --git a/push/processor.go b/push/processor.go index bf3dfa9a..24bca662 100644 --- a/push/processor.go +++ b/push/processor.go @@ -2,6 +2,7 @@ package push import ( "context" + "time" "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/proto" @@ -51,8 +52,23 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx if rd == nil { return nil } + conn := handlerCtx.Conn + if conn == nil { + return nil + } + netConn := handlerCtx.Conn.GetNetConn() + if netConn == nil { + return nil + } for { + // Set a short read deadline to check for available data + // otherwise we may block on Peek if there is no data available + err := netConn.SetReadDeadline(time.Now().Add(1)) + if err != nil { + return err + } + // Check if there's data available to read replyType, err := rd.PeekReplyType() if err != nil { @@ -104,7 +120,7 @@ func (p *Processor) ProcessPendingNotifications(ctx context.Context, handlerCtx } } - return nil + return netConn.SetReadDeadline(time.Time{}) } // VoidProcessor discards all push notifications without processing them @@ -133,12 +149,26 @@ func (v *VoidProcessor) UnregisterHandler(pushNotificationName string) error { // ProcessPendingNotifications for VoidProcessor does nothing since push notifications // are only available in RESP3 and this processor is used for RESP2 connections. // This avoids unnecessary buffer scanning overhead. -func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ NotificationHandlerContext, rd *proto.Reader) error { +func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, handlerCtx NotificationHandlerContext, rd *proto.Reader) error { // read and discard all push notifications if rd == nil { return nil } + conn := handlerCtx.Conn + if conn == nil { + return nil + } + netConn := handlerCtx.Conn.GetNetConn() + if netConn == nil { + return nil + } for { + // Set a short read deadline to check for available data + err := netConn.SetReadDeadline(time.Now().Add(1)) + if err != nil { + return err + } + // Check if there's data available to read replyType, err := rd.PeekReplyType() if err != nil { // No more data available or error reading @@ -166,7 +196,7 @@ func (v *VoidProcessor) ProcessPendingNotifications(_ context.Context, _ Notific return nil } } - return nil + return netConn.SetReadDeadline(time.Time{}) } // willHandleNotificationInClient checks if a notification type should be ignored by the push notification diff --git a/redis.go b/redis.go index 79577ba7..f0d6fb17 100644 --- a/redis.go +++ b/redis.go @@ -733,13 +733,6 @@ func (c *baseClient) txPipelineProcessCmds( } if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error { - // To be sure there are no buffered push notifications, we process them before reading the reply - if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil { - // Log the error but don't fail the command execution - // Push notification processing errors shouldn't break normal Redis operations - internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err) - } - statusCmd := cmds[0].(*StatusCmd) // Trim multi and exec. trimmedCmds := cmds[1 : len(cmds)-1]