diff --git a/.github/copilot-instructions.md b/.github/copilot-instructions.md index b25460c6..384e0d82 100644 --- a/.github/copilot-instructions.md +++ b/.github/copilot-instructions.md @@ -311,9 +311,50 @@ func (opt *Options) init() { ```go type NotificationProcessor interface { - ProcessPushNotification(ctx context.Context, data []byte) error - RegisterHandler(notificationType string, handler NotificationHandler) error - Close() error + RegisterHandler(pushNotificationName string, handler interface{}, protected bool) error + UnregisterHandler(pushNotificationName string) error + GetHandler(pushNotificationName string) interface{} +} + +type NotificationHandler interface { + HandlePushNotification(ctx context.Context, handlerCtx NotificationHandlerContext, notification []interface{}) error +} +``` + +### Notification Hooks + +```go +type NotificationHook interface { + PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) + PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) +} + +// NotificationHandlerContext provides context for notification processing +type NotificationHandlerContext struct { + Client interface{} // Redis client instance + Pool interface{} // Connection pool + Conn interface{} // Specific connection (*pool.Conn) + IsBlocking bool // Whether notification was on blocking connection +} +``` + +### Hook Implementation Pattern + +```go +func (h *CustomHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) { + // Access connection information + if conn, ok := notificationCtx.Conn.(*pool.Conn); ok { + connID := conn.GetID() + // Process with connection context + } + return notification, true // Continue processing +} + +func (h *CustomHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) { + // Handle processing result + if result != nil { + // Log or handle error + } } ``` diff --git a/adapters.go b/adapters.go index 801b86d4..4146153b 100644 --- a/adapters.go +++ b/adapters.go @@ -7,7 +7,6 @@ import ( "time" "github.com/redis/go-redis/v9/internal/interfaces" - "github.com/redis/go-redis/v9/internal/pool" "github.com/redis/go-redis/v9/push" ) @@ -88,43 +87,6 @@ func (oa *optionsAdapter) NewDialer() func(context.Context) (net.Conn, error) { } } -// connectionAdapter adapts a Redis connection to interfaces.ConnectionWithRelaxedTimeout -type connectionAdapter struct { - conn *pool.Conn -} - -// Close closes the connection. -func (ca *connectionAdapter) Close() error { - return ca.conn.Close() -} - -// IsUsable returns true if the connection is safe to use for new commands. -func (ca *connectionAdapter) IsUsable() bool { - return ca.conn.IsUsable() -} - -// GetPoolConn returns the underlying pool connection. -func (ca *connectionAdapter) GetPoolConn() *pool.Conn { - return ca.conn -} - -// SetRelaxedTimeout sets relaxed timeouts for this connection during hitless upgrades. -// These timeouts remain active until explicitly cleared. -func (ca *connectionAdapter) SetRelaxedTimeout(readTimeout, writeTimeout time.Duration) { - ca.conn.SetRelaxedTimeout(readTimeout, writeTimeout) -} - -// SetRelaxedTimeoutWithDeadline sets relaxed timeouts with an expiration deadline. -// After the deadline, timeouts automatically revert to normal values. -func (ca *connectionAdapter) SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout time.Duration, deadline time.Time) { - ca.conn.SetRelaxedTimeoutWithDeadline(readTimeout, writeTimeout, deadline) -} - -// ClearRelaxedTimeout clears relaxed timeouts for this connection. -func (ca *connectionAdapter) ClearRelaxedTimeout() { - ca.conn.ClearRelaxedTimeout() -} - // pushProcessorAdapter adapts a push.NotificationProcessor to implement interfaces.NotificationProcessor. type pushProcessorAdapter struct { processor push.NotificationProcessor diff --git a/example/pubsub/main.go b/example/pubsub/main.go index ddc0604d..c7330554 100644 --- a/example/pubsub/main.go +++ b/example/pubsub/main.go @@ -5,6 +5,7 @@ import ( "fmt" "log" "sync" + "sync/atomic" "time" "github.com/redis/go-redis/v9" @@ -12,11 +13,15 @@ import ( ) var ctx = context.Background() +var cntErrors atomic.Int64 +var cntSuccess atomic.Int64 +var startTime = time.Now() // This example is not supposed to be run as is. It is just a test to see how pubsub behaves in relation to pool management. // It was used to find regressions in pool management in hitless mode. // Please don't use it as a reference for how to use pubsub. func main() { + startTime = time.Now() wg := &sync.WaitGroup{} rdb := redis.NewClient(&redis.Options{ Addr: ":6379", @@ -25,6 +30,12 @@ func main() { }, }) _ = rdb.FlushDB(ctx).Err() + hitlessManager := rdb.GetHitlessManager() + if hitlessManager == nil { + panic("hitless manager is nil") + } + loggingHook := hitless.NewLoggingHook(3) + hitlessManager.AddNotificationHook(loggingHook) go func() { for { @@ -62,7 +73,8 @@ func main() { subCtx, cancelSubCtx = context.WithCancel(ctx) for i := 0; i < 10; i++ { if err := rdb.Incr(ctx, "publishers").Err(); err != nil { - panic(err) + fmt.Println("incr error:", err) + cntErrors.Add(1) } wg.Add(1) go floodThePool(pubCtx, rdb, wg) @@ -70,12 +82,14 @@ func main() { for i := 0; i < 500; i++ { if err := rdb.Incr(ctx, "subscribers").Err(); err != nil { - panic(err) + fmt.Println("incr error:", err) + cntErrors.Add(1) } + wg.Add(1) go subscribe(subCtx, rdb, "test2", i, wg) } - time.Sleep(5 * time.Second) + time.Sleep(120 * time.Second) fmt.Println("canceling publishers") cancelPublishers() time.Sleep(10 * time.Second) @@ -95,6 +109,9 @@ func main() { fmt.Printf("if drained = published*subscribers: %d\n", publishedInt*subscribersInt) time.Sleep(2 * time.Second) + fmt.Println("errors:", cntErrors.Load()) + fmt.Println("success:", cntSuccess.Load()) + fmt.Println("time:", time.Since(startTime)) } func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) { @@ -107,14 +124,18 @@ func floodThePool(ctx context.Context, rdb *redis.Client, wg *sync.WaitGroup) { } err := rdb.Publish(ctx, "test2", "hello").Err() if err != nil { - // noop - //log.Println("publish error:", err) + if err.Error() != "context canceled" { + log.Println("publish error:", err) + cntErrors.Add(1) + } } err = rdb.Incr(ctx, "published").Err() if err != nil { - // noop - //log.Println("incr error:", err) + if err.Error() != "context canceled" { + log.Println("incr error:", err) + cntErrors.Add(1) + } } time.Sleep(10 * time.Nanosecond) } @@ -137,7 +158,10 @@ func subscribe(ctx context.Context, rdb *redis.Client, topic string, subscriberI case msg := <-recChan: err := rdb.Incr(ctx, "received").Err() if err != nil { - log.Println("incr error:", err) + if err.Error() != "context canceled" { + log.Printf("%s\n", err.Error()) + cntErrors.Add(1) + } } _ = msg // Use the message to avoid unused variable warning } diff --git a/hitless/README.md b/hitless/README.md index b82b33a3..7d117a2a 100644 --- a/hitless/README.md +++ b/hitless/README.md @@ -49,23 +49,62 @@ Config: &hitless.Config{ - **Auto-calculated**: `10 × MaxWorkers`, capped by pool size - **Always capped**: Queue size never exceeds pool size -## Metrics Hook Example +## Notification Hooks -A metrics collection hook is available in `example_hooks.go` that demonstrates how to monitor hitless upgrade operations: +Notification hooks allow you to monitor and customize hitless upgrade operations. The `NotificationHook` interface provides pre and post processing hooks: + +```go +type NotificationHook interface { + PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) + PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) +} +``` + +### Example: Metrics Collection Hook + +A metrics collection hook is available in `example_hooks.go`: ```go import "github.com/redis/go-redis/v9/hitless" metricsHook := hitless.NewMetricsHook() -// Use with your monitoring system +manager.AddNotificationHook(metricsHook) + +// Access metrics +metrics := metricsHook.GetMetrics() ``` -The metrics hook tracks: +### Example: Custom Logging Hook + +```go +type CustomHook struct{} + +func (h *CustomHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) { + // Log notification with connection details + if conn, ok := notificationCtx.Conn.(*pool.Conn); ok { + log.Printf("Processing %s on connection %d", notificationType, conn.GetID()) + } + return notification, true // Continue processing +} + +func (h *CustomHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) { + if result != nil { + log.Printf("Failed to process %s: %v", notificationType, result) + } +} +``` + +The notification context provides access to: +- **Client**: The Redis client instance +- **Pool**: The connection pool +- **Conn**: The specific connection that received the notification +- **IsBlocking**: Whether the notification was received on a blocking connection + +Hooks can track: - Handoff success/failure rates -- Handoff duration -- Queue depth -- Worker utilization -- Connection lifecycle events +- Processing duration +- Connection-specific metrics +- Custom business logic ## Requirements diff --git a/hitless/example_hooks.go b/hitless/example_hooks.go index f03ea3ed..0b65f1f5 100644 --- a/hitless/example_hooks.go +++ b/hitless/example_hooks.go @@ -3,6 +3,10 @@ package hitless import ( "context" "time" + + "github.com/redis/go-redis/v9/internal" + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/push" ) // contextKey is a custom type for context keys to avoid collisions @@ -29,9 +33,14 @@ func NewMetricsHook() *MetricsHook { } // PreHook records the start time for processing metrics. -func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) { +func (mh *MetricsHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) { mh.NotificationCounts[notificationType]++ + // Log connection information if available + if conn, ok := notificationCtx.Conn.(*pool.Conn); ok { + internal.Logger.Printf(ctx, "hitless: metrics hook processing %s notification on connection %d", notificationType, conn.GetID()) + } + // Store start time in context for duration calculation startTime := time.Now() _ = context.WithValue(ctx, startTimeKey, startTime) // Context not used further @@ -40,7 +49,7 @@ func (mh *MetricsHook) PreHook(ctx context.Context, notificationType string, not } // PostHook records processing completion and any errors. -func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) { +func (mh *MetricsHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) { // Calculate processing duration if startTime, ok := ctx.Value(startTimeKey).(time.Time); ok { duration := time.Since(startTime) @@ -50,6 +59,11 @@ func (mh *MetricsHook) PostHook(ctx context.Context, notificationType string, no // Record errors if result != nil { mh.ErrorCounts[notificationType]++ + + // Log error details with connection information + if conn, ok := notificationCtx.Conn.(*pool.Conn); ok { + internal.Logger.Printf(ctx, "hitless: metrics hook recorded error for %s notification on connection %d: %v", notificationType, conn.GetID(), result) + } } } diff --git a/hitless/hitless_manager.go b/hitless/hitless_manager.go index 309ac643..364ba5a4 100644 --- a/hitless/hitless_manager.go +++ b/hitless/hitless_manager.go @@ -11,6 +11,7 @@ import ( "github.com/redis/go-redis/v9/internal" "github.com/redis/go-redis/v9/internal/interfaces" "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/push" ) // Push notification type constants for hitless upgrades @@ -35,8 +36,8 @@ var hitlessNotificationTypes = []string{ // PreHook can modify the notification and return false to skip processing // PostHook is called after successful processing type NotificationHook interface { - PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) - PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) + PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) + PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) } // MovingOperationKey provides a unique key for tracking MOVING operations @@ -252,14 +253,14 @@ func (hm *HitlessManager) GetState() State { } // processPreHooks calls all pre-hooks and returns the modified notification and whether to continue processing. -func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) { +func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) { hm.hooksMu.RLock() defer hm.hooksMu.RUnlock() currentNotification := notification for _, hook := range hm.hooks { - modifiedNotification, shouldContinue := hook.PreHook(ctx, notificationType, currentNotification) + modifiedNotification, shouldContinue := hook.PreHook(ctx, notificationCtx, notificationType, currentNotification) if !shouldContinue { return modifiedNotification, false } @@ -270,12 +271,12 @@ func (hm *HitlessManager) processPreHooks(ctx context.Context, notificationType } // processPostHooks calls all post-hooks with the processing result. -func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationType string, notification []interface{}, result error) { +func (hm *HitlessManager) processPostHooks(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) { hm.hooksMu.RLock() defer hm.hooksMu.RUnlock() for _, hook := range hm.hooks { - hook.PostHook(ctx, notificationType, notification, result) + hook.PostHook(ctx, notificationCtx, notificationType, notification, result) } } diff --git a/hitless/hooks.go b/hitless/hooks.go index 7d1b6463..d0093bca 100644 --- a/hitless/hooks.go +++ b/hitless/hooks.go @@ -4,6 +4,8 @@ import ( "context" "github.com/redis/go-redis/v9/internal" + "github.com/redis/go-redis/v9/internal/pool" + "github.com/redis/go-redis/v9/push" ) // LoggingHook is an example hook implementation that logs all notifications. @@ -12,19 +14,28 @@ type LoggingHook struct { } // PreHook logs the notification before processing and allows modification. -func (lh *LoggingHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) { +func (lh *LoggingHook) PreHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}) ([]interface{}, bool) { if lh.LogLevel >= 2 { // Info level - internal.Logger.Printf(ctx, "hitless: processing %s notification: %v", notificationType, notification) + // Log the notification type and content + connID := uint64(0) + if conn, ok := notificationCtx.Conn.(*pool.Conn); ok { + connID = conn.GetID() + } + internal.Logger.Printf(ctx, "hitless: conn[%d] processing %s notification: %v", connID, notificationType, notification) } return notification, true // Continue processing with unmodified notification } // PostHook logs the result after processing. -func (lh *LoggingHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) { +func (lh *LoggingHook) PostHook(ctx context.Context, notificationCtx push.NotificationHandlerContext, notificationType string, notification []interface{}, result error) { + connID := uint64(0) + if conn, ok := notificationCtx.Conn.(*pool.Conn); ok { + connID = conn.GetID() + } if result != nil && lh.LogLevel >= 1 { // Warning level - internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v - %v", notificationType, result, notification) + internal.Logger.Printf(ctx, "hitless: conn[%d] %s notification processing failed: %v - %v", connID, notificationType, result, notification) } else if lh.LogLevel >= 3 { // Debug level - internal.Logger.Printf(ctx, "hitless: %s notification processed successfully", notificationType) + internal.Logger.Printf(ctx, "hitless: conn[%d] %s notification processed successfully", connID, notificationType) } } diff --git a/hitless/notification_handler.go b/hitless/notification_handler.go index 72226442..246e887a 100644 --- a/hitless/notification_handler.go +++ b/hitless/notification_handler.go @@ -30,7 +30,7 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand } // Process pre-hooks - they can modify the notification or skip processing - modifiedNotification, shouldContinue := snh.manager.processPreHooks(ctx, notificationType, notification) + modifiedNotification, shouldContinue := snh.manager.processPreHooks(ctx, handlerCtx, notificationType, notification) if !shouldContinue { return nil // Hooks decided to skip processing } @@ -53,7 +53,7 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand } // Process post-hooks with the result - snh.manager.processPostHooks(ctx, notificationType, modifiedNotification, err) + snh.manager.processPostHooks(ctx, handlerCtx, notificationType, modifiedNotification, err) return err } @@ -97,9 +97,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus // Type assert to get the underlying pool connection var poolConn *pool.Conn - if connAdapter, ok := conn.(interface{ GetPoolConn() *pool.Conn }); ok { - poolConn = connAdapter.GetPoolConn() - } else if pc, ok := conn.(*pool.Conn); ok { + if pc, ok := conn.(*pool.Conn); ok { poolConn = pc } else { internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MOVING notification - %T %#v", conn, handlerCtx) @@ -157,13 +155,11 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx return ErrInvalidNotification } - // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATING notification") return ErrInvalidNotification } - // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATING notification") @@ -184,13 +180,11 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p return ErrInvalidNotification } - // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATED notification") return ErrInvalidNotification } - // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATED notification") @@ -211,13 +205,11 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt return ErrInvalidNotification } - // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILING_OVER notification") return ErrInvalidNotification } - // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILING_OVER notification") @@ -238,13 +230,11 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx return ErrInvalidNotification } - // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILED_OVER notification") return ErrInvalidNotification } - // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILED_OVER notification") diff --git a/hitless/pool_hook.go b/hitless/pool_hook.go index e1b7d603..9a1d7d35 100644 --- a/hitless/pool_hook.go +++ b/hitless/pool_hook.go @@ -117,7 +117,7 @@ func (ph *PoolHook) IsHandoffPending(conn *pool.Conn) bool { } // OnGet is called when a connection is retrieved from the pool -func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, isNewConn bool) error { +func (ph *PoolHook) OnGet(ctx context.Context, conn *pool.Conn, _ bool) error { // NOTE: There are two conditions to make sure we don't return a connection that should be handed off or is // in a handoff state at the moment. @@ -234,6 +234,7 @@ func (ph *PoolHook) onDemandWorker() { func (ph *PoolHook) processHandoffRequest(request HandoffRequest) { // Remove from pending map defer ph.pending.Delete(request.Conn.GetID()) + internal.Logger.Printf(context.Background(), "hitless: conn[%d] Processing handoff request start", request.Conn.GetID()) // Create a context with handoff timeout from config handoffTimeout := 30 * time.Second // Default fallback @@ -366,6 +367,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn * } retries := conn.IncrementAndGetHandoffRetries(1) + internal.Logger.Printf(ctx, "hitless: conn[%d] Retry %d: Performing handoff to %s(was %s)", conn.GetID(), retries, newEndpoint, conn.RemoteAddr().String()) maxRetries := 3 // Default fallback if ph.config != nil { maxRetries = ph.config.MaxHandoffRetries @@ -387,6 +389,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn * // Create new connection to the new endpoint newNetConn, err := endpointDialer(ctx) if err != nil { + internal.Logger.Printf(ctx, "hitless: conn[%d] Failed to dial new endpoint %s: %v", conn.GetID(), newEndpoint, err) // hitless: will retry // Maybe a network error - retry after a delay return true, err @@ -409,6 +412,7 @@ func (ph *PoolHook) performConnectionHandoffWithPool(ctx context.Context, conn * }() conn.ClearHandoffState() + internal.Logger.Printf(ctx, "hitless: conn[%d] Handoff to %s successful", conn.GetID(), newEndpoint) // Apply relaxed timeout to the new connection for the configured post-handoff duration // This gives the new connection more time to handle operations during cluster transition diff --git a/internal/pool/pubsub.go b/internal/pool/pubsub.go index a06abcd6..c616300f 100644 --- a/internal/pool/pubsub.go +++ b/internal/pool/pubsub.go @@ -5,6 +5,8 @@ import ( "net" "sync" "sync/atomic" + + "github.com/redis/go-redis/v9/internal" ) type PubSubStats struct { @@ -52,6 +54,9 @@ func (p *PubSubPool) TrackConn(cn *Conn) { } func (p *PubSubPool) UntrackConn(cn *Conn) { + if !cn.IsUsable() || cn.ShouldHandoff() { + internal.Logger.Printf(context.Background(), "pubsub: untracking connection %d [usable, handoff] = [%v, %v]", cn.GetID(), cn.IsUsable(), cn.ShouldHandoff()) + } atomic.AddUint32(&p.stats.Active, ^uint32(0)) atomic.AddUint32(&p.stats.Untracked, 1) p.activeConns.Delete(cn.GetID()) diff --git a/pubsub.go b/pubsub.go index 6db13a9a..506ce1e6 100644 --- a/pubsub.go +++ b/pubsub.go @@ -170,10 +170,16 @@ func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allo } if !cn.IsUsable() || cn.ShouldHandoff() { + if cn.ShouldHandoff() { + internal.Logger.Printf(ctx, "pubsub: connection[%d] is marked for handoff, reconnecting", cn.GetID()) + } else { + internal.Logger.Printf(ctx, "pubsub: connection[%d] is not usable, reconnecting", cn.GetID()) + } c.reconnect(ctx, fmt.Errorf("pubsub: connection is not usable")) } if isBadConn(err, allowTimeout, c.opt.Addr) { + internal.Logger.Printf(ctx, "pubsub: releasing connection[%d]: %v", cn.GetID(), err) c.reconnect(ctx, err) } } @@ -187,7 +193,10 @@ func (c *PubSub) reconnect(ctx context.Context, reason error) { } if newEndpoint != "" { + // Update the address in the options + oldAddr := c.cn.RemoteAddr().String() c.opt.Addr = newEndpoint + internal.Logger.Printf(ctx, "pubsub: reconnecting to new endpoint %s (was %s)", newEndpoint, oldAddr) } } _ = c.closeTheCn(reason) @@ -199,7 +208,7 @@ func (c *PubSub) closeTheCn(reason error) error { return nil } if !c.closed { - internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection: %s", reason) + internal.Logger.Printf(c.getContext(), "redis: discarding bad PubSub connection[%d]: %s, %v", c.cn.GetID(), reason, c.cn.RemoteAddr()) } err := c.closeConn(c.cn) c.cn = nil diff --git a/push/handler_context.go b/push/handler_context.go index f89f87fa..c39e186b 100644 --- a/push/handler_context.go +++ b/push/handler_context.go @@ -37,7 +37,6 @@ type NotificationHandlerContext struct { // circular dependencies. The developer is responsible for type assertion. // It can be one of the following types: // - *pool.Conn - // - *connectionAdapter (for hitless upgrades) Conn interface{} // IsBlocking indicates if the notification was received on a blocking connection. diff --git a/redis.go b/redis.go index 13c7feca..d6e7b4d3 100644 --- a/redis.go +++ b/redis.go @@ -1077,6 +1077,7 @@ func (c *Client) pubSub() *PubSub { // will return nil if already initialized err = c.initConn(ctx, cn) if err != nil { + internal.Logger.Printf(ctx, "pubsub: conn[%d] to ADDR %s [usable, handoff] = [%v, %v] after initConn returned %v", cn.GetID(), addr, cn.IsUsable(), cn.ShouldHandoff(), err) _ = cn.Close() return nil, err } @@ -1277,7 +1278,7 @@ func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.Notifica return push.NotificationHandlerContext{ Client: c, ConnPool: c.connPool, - Conn: &connectionAdapter{conn: cn}, // Wrap in adapter for easier interface access + Conn: cn, // Wrap in adapter for easier interface access } }