From 30fceb81b4f7cba2cd602f21d0cbc127460f4e13 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 22 Aug 2025 17:54:40 +0300 Subject: [PATCH] fix hooks and add logging, logging will be removed before merge --- adapters.go | 4 +- hitless/config.go | 113 ++++++++++++++++++++++---------- hitless/hitless_manager.go | 8 ++- hitless/hooks.go | 23 ++----- hitless/notification_handler.go | 20 ++++++ 5 files changed, 112 insertions(+), 56 deletions(-) diff --git a/adapters.go b/adapters.go index 6f123e21..801b86d4 100644 --- a/adapters.go +++ b/adapters.go @@ -103,8 +103,8 @@ func (ca *connectionAdapter) IsUsable() bool { return ca.conn.IsUsable() } -// GetPoolConnection returns the underlying pool connection. -func (ca *connectionAdapter) GetPoolConnection() *pool.Conn { +// GetPoolConn returns the underlying pool connection. +func (ca *connectionAdapter) GetPoolConn() *pool.Conn { return ca.conn } diff --git a/hitless/config.go b/hitless/config.go index b35a0d71..ddb10954 100644 --- a/hitless/config.go +++ b/hitless/config.go @@ -3,6 +3,7 @@ package hitless import ( "net" "runtime" + "strings" "time" "github.com/redis/go-redis/v9/internal/util" @@ -183,8 +184,6 @@ func (c *Config) Validate() error { return ErrInvalidHandoffRetries } - - return nil } @@ -284,8 +283,6 @@ func (c *Config) ApplyDefaultsWithPoolSize(poolSize int) *Config { result.MaxHandoffRetries = c.MaxHandoffRetries } - - return result } @@ -334,44 +331,92 @@ func (c *Config) applyWorkerDefaults(poolSize int) { // DetectEndpointType automatically detects the appropriate endpoint type // based on the connection address and TLS configuration. +// +// For IP addresses: +// - If TLS is enabled: requests FQDN for proper certificate validation +// - If TLS is disabled: requests IP for better performance +// +// For hostnames: +// - If TLS is enabled: always requests FQDN for proper certificate validation +// - If TLS is disabled: requests IP for better performance +// +// Internal vs External detection: +// - For IPs: uses private IP range detection +// - For hostnames: uses heuristics based on common internal naming patterns func DetectEndpointType(addr string, tlsEnabled bool) EndpointType { - // Parse the address to determine if it's an IP or hostname - isPrivate := isPrivateIP(addr) - - var endpointType EndpointType - - if tlsEnabled { - // TLS requires FQDN for certificate validation - if isPrivate { - endpointType = EndpointTypeInternalFQDN - } else { - endpointType = EndpointTypeExternalFQDN - } - } else { - // No TLS, can use IP addresses - if isPrivate { - endpointType = EndpointTypeInternalIP - } else { - endpointType = EndpointTypeExternalIP - } - } - - return endpointType -} - -// isPrivateIP checks if the given address is in a private IP range. -func isPrivateIP(addr string) bool { // Extract host from "host:port" format host, _, err := net.SplitHostPort(addr) if err != nil { host = addr // Assume no port } + // Check if the host is an IP address or hostname ip := net.ParseIP(host) - if ip == nil { - return false // Not an IP address (likely hostname) + isIPAddress := ip != nil + var endpointType EndpointType + + if isIPAddress { + // Address is an IP - determine if it's private or public + isPrivate := ip.IsPrivate() || ip.IsLoopback() || ip.IsLinkLocalUnicast() + + if tlsEnabled { + // TLS with IP addresses - still prefer FQDN for certificate validation + if isPrivate { + endpointType = EndpointTypeInternalFQDN + } else { + endpointType = EndpointTypeExternalFQDN + } + } else { + // No TLS - can use IP addresses directly + if isPrivate { + endpointType = EndpointTypeInternalIP + } else { + endpointType = EndpointTypeExternalIP + } + } + } else { + // Address is a hostname + isInternalHostname := isInternalHostname(host) + if isInternalHostname { + endpointType = EndpointTypeInternalFQDN + } else { + endpointType = EndpointTypeExternalFQDN + } } - // Check for private/loopback ranges - return ip.IsPrivate() || ip.IsLoopback() || ip.IsLinkLocalUnicast() + return endpointType +} + +// isInternalHostname determines if a hostname appears to be internal/private. +// This is a heuristic based on common naming patterns. +func isInternalHostname(hostname string) bool { + // Convert to lowercase for comparison + hostname = strings.ToLower(hostname) + + // Common internal hostname patterns + internalPatterns := []string{ + "localhost", + ".local", + ".internal", + ".corp", + ".lan", + ".intranet", + ".private", + } + + // Check for exact match or suffix match + for _, pattern := range internalPatterns { + if hostname == pattern || strings.HasSuffix(hostname, pattern) { + return true + } + } + + // Check for RFC 1918 style hostnames (e.g., redis-1, db-server, etc.) + // If hostname doesn't contain dots, it's likely internal + if !strings.Contains(hostname, ".") { + return true + } + + // Default to external for fully qualified domain names + return false } diff --git a/hitless/hitless_manager.go b/hitless/hitless_manager.go index 26c379a5..309ac643 100644 --- a/hitless/hitless_manager.go +++ b/hitless/hitless_manager.go @@ -13,8 +13,6 @@ import ( "github.com/redis/go-redis/v9/internal/pool" ) - - // Push notification type constants for hitless upgrades const ( NotificationMoving = "MOVING" @@ -297,3 +295,9 @@ func (hm *HitlessManager) createPoolHook(baseDialer func(context.Context, string return hm.poolHooksRef } + +func (hm *HitlessManager) AddNotificationHook(notificationHook NotificationHook) { + hm.hooksMu.Lock() + defer hm.hooksMu.Unlock() + hm.hooks = append(hm.hooks, notificationHook) +} diff --git a/hitless/hooks.go b/hitless/hooks.go index 7e84e032..7d1b6463 100644 --- a/hitless/hooks.go +++ b/hitless/hooks.go @@ -22,27 +22,14 @@ func (lh *LoggingHook) PreHook(ctx context.Context, notificationType string, not // PostHook logs the result after processing. func (lh *LoggingHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) { if result != nil && lh.LogLevel >= 1 { // Warning level - internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v", notificationType, result) + internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v - %v", notificationType, result, notification) } else if lh.LogLevel >= 3 { // Debug level internal.Logger.Printf(ctx, "hitless: %s notification processed successfully", notificationType) } } -// FilterHook is an example hook that can filter out certain notifications. -type FilterHook struct { - BlockedTypes map[string]bool -} - -// PreHook filters notifications based on type. -func (fh *FilterHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) { - if fh.BlockedTypes[notificationType] { - internal.Logger.Printf(ctx, "hitless: filtering out %s notification", notificationType) - return notification, false // Skip processing - } - return notification, true -} - -// PostHook does nothing for filter hook. -func (fh *FilterHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) { - // No post-processing needed for filter hook +// NewLoggingHook creates a new logging hook with the specified log level. +// Log levels: 0=errors, 1=warnings, 2=info, 3=debug +func NewLoggingHook(logLevel int) *LoggingHook { + return &LoggingHook{LogLevel: logLevel} } diff --git a/hitless/notification_handler.go b/hitless/notification_handler.go index cf28460c..72226442 100644 --- a/hitless/notification_handler.go +++ b/hitless/notification_handler.go @@ -19,11 +19,13 @@ type NotificationHandler struct { // HandlePushNotification processes push notifications with hook support. func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { if len(notification) == 0 { + internal.Logger.Printf(ctx, "hitless: invalid notification format: %v", notification) return ErrInvalidNotification } notificationType, ok := notification[0].(string) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid notification type format: %v", notification[0]) return ErrInvalidNotification } @@ -60,16 +62,19 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand // ["MOVING", seqNum, timeS, endpoint] - per-connection handoff func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error { if len(notification) < 3 { + internal.Logger.Printf(ctx, "hitless: invalid MOVING notification: %v", notification) return ErrInvalidNotification } seqID, ok := notification[1].(int64) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid seqID in MOVING notification: %v", notification[1]) return ErrInvalidNotification } // Extract timeS timeS, ok := notification[2].(int64) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid timeS in MOVING notification: %v", notification[2]) return ErrInvalidNotification } @@ -78,6 +83,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus // Extract new endpoint newEndpoint, ok = notification[3].(string) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid newEndpoint in MOVING notification: %v", notification[3]) return ErrInvalidNotification } } @@ -85,6 +91,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus // Get the connection that received this notification conn := handlerCtx.Conn if conn == nil { + internal.Logger.Printf(ctx, "hitless: no connection in handler context for MOVING notification") return ErrInvalidNotification } @@ -95,6 +102,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus } else if pc, ok := conn.(*pool.Conn); ok { poolConn = pc } else { + internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MOVING notification - %T %#v", conn, handlerCtx) return ErrInvalidNotification } @@ -145,17 +153,20 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx // MIGRATING notifications indicate that a connection is about to be migrated // Apply relaxed timeouts to the specific connection that received this notification if len(notification) < 2 { + internal.Logger.Printf(ctx, "hitless: invalid MIGRATING notification: %v", notification) return ErrInvalidNotification } // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { + internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATING notification") return ErrInvalidNotification } // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATING notification") return ErrInvalidNotification } @@ -169,17 +180,20 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p // MIGRATED notifications indicate that a connection migration has completed // Restore normal timeouts for the specific connection that received this notification if len(notification) < 2 { + internal.Logger.Printf(ctx, "hitless: invalid MIGRATED notification: %v", notification) return ErrInvalidNotification } // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { + internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATED notification") return ErrInvalidNotification } // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATED notification") return ErrInvalidNotification } @@ -193,17 +207,20 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt // FAILING_OVER notifications indicate that a connection is about to failover // Apply relaxed timeouts to the specific connection that received this notification if len(notification) < 2 { + internal.Logger.Printf(ctx, "hitless: invalid FAILING_OVER notification: %v", notification) return ErrInvalidNotification } // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { + internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILING_OVER notification") return ErrInvalidNotification } // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILING_OVER notification") return ErrInvalidNotification } @@ -217,17 +234,20 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx // FAILED_OVER notifications indicate that a connection failover has completed // Restore normal timeouts for the specific connection that received this notification if len(notification) < 2 { + internal.Logger.Printf(ctx, "hitless: invalid FAILED_OVER notification: %v", notification) return ErrInvalidNotification } // Get the connection from handler context and type assert to connectionAdapter if handlerCtx.Conn == nil { + internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILED_OVER notification") return ErrInvalidNotification } // Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout) if !ok { + internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILED_OVER notification") return ErrInvalidNotification }