diff --git a/internal/pool/pool.go b/internal/pool/pool.go index 3b74eccc..892326a5 100644 --- a/internal/pool/pool.go +++ b/internal/pool/pool.go @@ -381,21 +381,26 @@ func (p *ConnPool) popIdle() (*Conn, error) { } func (p *ConnPool) Put(ctx context.Context, cn *Conn) { + shouldRemove := false if cn.rd.Buffered() > 0 { // Check if this might be push notification data if p.cfg.Protocol == 3 { // we know that there is something in the buffer, so peek at the next reply type without - // the potential to block - if replyType, err := cn.rd.PeekReplyType(); err == nil && replyType == proto.RespPush { - // For push notifications, we allow some buffered data - // The client will process these notifications before using the connection - return + // the potential to block and check if it's a push notification + if replyType, err := cn.rd.PeekReplyType(); err != nil || replyType != proto.RespPush { + shouldRemove = true } + } else { + // not a push notification since protocol 2 doesn't support them + shouldRemove = true + } + + if shouldRemove { + // For non-RESP3 or data that is not a push notification, buffered data is unexpected + internal.Logger.Printf(ctx, "Conn has unread data, closing it") + p.Remove(ctx, cn, BadConnError{}) + return } - // For non-RESP3 or data that is not a push notification, buffered data is unexpected - internal.Logger.Printf(ctx, "Conn has unread data, closing it") - p.Remove(ctx, cn, BadConnError{}) - return } if !cn.pooled {