From cb8a4e5721cfcdc334ef197a0b84d0fbd7d06a6b Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Wed, 2 Jul 2025 17:04:28 +0300 Subject: [PATCH] feat: process push notifications before returning connections from pool MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement push notification processing in baseClient._getConn() to ensure that all cluster topology changes are handled immediately before connections are used for commands. This is critical for hitless upgrades and real-time cluster state awareness. Key Enhancements: 1. Enhanced Connection Retrieval (_getConn): - Process push notifications for both existing and new connections - Added processPushNotifications() call before returning connections - Ensures immediate handling of cluster topology changes - Proper error handling with connection removal on processing failures 2. Push Notification Processing Method: - Added processPushNotifications() method to baseClient - Only processes notifications for RESP3 connections with processors - Uses WithReader() to safely access connection reader - Integrates with existing push notification infrastructure 3. Connection Flow Enhancement: - Existing connections: Health check → Push notification processing → Return - New connections: Initialization → Push notification processing → Return - Failed processing results in connection removal and error return - Seamless integration with existing connection management 4. RESP3 Protocol Integration: - Protocol version check (only process for RESP3) - Push processor availability check - Graceful handling when processors are not available - Consistent behavior with existing push notification system 5. Error Handling and Recovery: - Remove connections if push notification processing fails - Return errors to trigger connection retry mechanisms - Maintain connection pool health and reliability - Prevent returning connections with unprocessed notifications Implementation Details: - processPushNotifications() checks protocol and processor availability - Uses cn.WithReader() to safely access the connection reader - Calls pushProcessor.ProcessPendingNotifications() for actual processing - Applied to both pooled connections and newly initialized connections - Consistent error handling across all connection retrieval paths Flow Enhancement: 1. Connection requested via _getConn() 2. Connection retrieved from pool (existing or new) 3. Connection initialization (if new) 4. Push notification processing (NEW) 5. Connection returned to caller 6. Commands executed with up-to-date cluster state Benefits: - Immediate cluster topology awareness before command execution - Enhanced hitless upgrade reliability with real-time notifications - Reduced command failures during cluster topology changes - Consistent push notification handling across all connection types - Better integration with Redis cluster operations This ensures that Redis cluster topology changes (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are always processed before connections are used, providing the foundation for reliable hitless upgrades and seamless cluster operations. --- redis.go | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/redis.go b/redis.go index b9e54fb8..e78cea42 100644 --- a/redis.go +++ b/redis.go @@ -273,6 +273,13 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { } if cn.Inited { + // Process all pending push notifications before returning the connection + // This ensures that cluster topology changes are handled immediately + if err := c.processPushNotifications(ctx, cn); err != nil { + // If push notification processing fails, remove the connection + c.connPool.Remove(ctx, cn, err) + return nil, err + } return cn, nil } @@ -284,9 +291,32 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) { return nil, err } + // Process any pending push notifications on the newly initialized connection + // This ensures that any notifications received during connection setup are handled + if err := c.processPushNotifications(ctx, cn); err != nil { + // If push notification processing fails, remove the connection + c.connPool.Remove(ctx, cn, err) + return nil, err + } + return cn, nil } +// processPushNotifications processes all pending push notifications on a connection +// This ensures that cluster topology changes are handled immediately before the connection is used +func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error { + // Only process push notifications for RESP3 connections with a processor + if c.opt.Protocol != 3 || c.pushProcessor == nil { + return nil + } + + // Use WithReader to access the reader and process push notifications + // This is critical for hitless upgrades to work properly + return cn.WithReader(ctx, 0, func(rd *proto.Reader) error { + return c.pushProcessor.ProcessPendingNotifications(ctx, rd) + }) +} + func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener { return auth.NewReAuthCredentialsListener( c.reAuthConnection(poolCn),