diff --git a/redis.go b/redis.go index b9e54fb8..e78cea42 100644 --- a/redis.go +++ b/redis.go @@ -273,6 +273,13 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { } if cn.Inited { + // Process all pending push notifications before returning the connection + // This ensures that cluster topology changes are handled immediately + if err := c.processPushNotifications(ctx, cn); err != nil { + // If push notification processing fails, remove the connection + c.connPool.Remove(ctx, cn, err) + return nil, err + } return cn, nil } @@ -284,9 +291,32 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { return nil, err } + // Process any pending push notifications on the newly initialized connection + // This ensures that any notifications received during connection setup are handled + if err := c.processPushNotifications(ctx, cn); err != nil { + // If push notification processing fails, remove the connection + c.connPool.Remove(ctx, cn, err) + return nil, err + } + return cn, nil } +// processPushNotifications processes all pending push notifications on a connection +// This ensures that cluster topology changes are handled immediately before the connection is used +func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { + // Only process push notifications for RESP3 connections with a processor + if c.opt.Protocol != 3 || c.pushProcessor == nil { + return nil + } + + // Use WithReader to access the reader and process push notifications + // This is critical for hitless upgrades to work properly + return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { + return c.pushProcessor.ProcessPendingNotifications(ctx, rd) + }) +} + func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener { return auth.NewReAuthCredentialsListener( c.reAuthConnection(poolCn),