mirror of
https://github.com/redis/go-redis.git
synced 2025-09-02 22:01:16 +03:00
fix hooks and add logging, logging will be removed before merge
This commit is contained in:
@@ -103,8 +103,8 @@ func (ca *connectionAdapter) IsUsable() bool {
|
||||
return ca.conn.IsUsable()
|
||||
}
|
||||
|
||||
// GetPoolConnection returns the underlying pool connection.
|
||||
func (ca *connectionAdapter) GetPoolConnection() *pool.Conn {
|
||||
// GetPoolConn returns the underlying pool connection.
|
||||
func (ca *connectionAdapter) GetPoolConn() *pool.Conn {
|
||||
return ca.conn
|
||||
}
|
||||
|
||||
|
@@ -3,6 +3,7 @@ package hitless
|
||||
import (
|
||||
"net"
|
||||
"runtime"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9/internal/util"
|
||||
@@ -183,8 +184,6 @@ func (c *Config) Validate() error {
|
||||
return ErrInvalidHandoffRetries
|
||||
}
|
||||
|
||||
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -284,8 +283,6 @@ func (c *Config) ApplyDefaultsWithPoolSize(poolSize int) *Config {
|
||||
result.MaxHandoffRetries = c.MaxHandoffRetries
|
||||
}
|
||||
|
||||
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
@@ -334,44 +331,92 @@ func (c *Config) applyWorkerDefaults(poolSize int) {
|
||||
|
||||
// DetectEndpointType automatically detects the appropriate endpoint type
|
||||
// based on the connection address and TLS configuration.
|
||||
//
|
||||
// For IP addresses:
|
||||
// - If TLS is enabled: requests FQDN for proper certificate validation
|
||||
// - If TLS is disabled: requests IP for better performance
|
||||
//
|
||||
// For hostnames:
|
||||
// - If TLS is enabled: always requests FQDN for proper certificate validation
|
||||
// - If TLS is disabled: requests IP for better performance
|
||||
//
|
||||
// Internal vs External detection:
|
||||
// - For IPs: uses private IP range detection
|
||||
// - For hostnames: uses heuristics based on common internal naming patterns
|
||||
func DetectEndpointType(addr string, tlsEnabled bool) EndpointType {
|
||||
// Parse the address to determine if it's an IP or hostname
|
||||
isPrivate := isPrivateIP(addr)
|
||||
|
||||
var endpointType EndpointType
|
||||
|
||||
if tlsEnabled {
|
||||
// TLS requires FQDN for certificate validation
|
||||
if isPrivate {
|
||||
endpointType = EndpointTypeInternalFQDN
|
||||
} else {
|
||||
endpointType = EndpointTypeExternalFQDN
|
||||
}
|
||||
} else {
|
||||
// No TLS, can use IP addresses
|
||||
if isPrivate {
|
||||
endpointType = EndpointTypeInternalIP
|
||||
} else {
|
||||
endpointType = EndpointTypeExternalIP
|
||||
}
|
||||
}
|
||||
|
||||
return endpointType
|
||||
}
|
||||
|
||||
// isPrivateIP checks if the given address is in a private IP range.
|
||||
func isPrivateIP(addr string) bool {
|
||||
// Extract host from "host:port" format
|
||||
host, _, err := net.SplitHostPort(addr)
|
||||
if err != nil {
|
||||
host = addr // Assume no port
|
||||
}
|
||||
|
||||
// Check if the host is an IP address or hostname
|
||||
ip := net.ParseIP(host)
|
||||
if ip == nil {
|
||||
return false // Not an IP address (likely hostname)
|
||||
isIPAddress := ip != nil
|
||||
var endpointType EndpointType
|
||||
|
||||
if isIPAddress {
|
||||
// Address is an IP - determine if it's private or public
|
||||
isPrivate := ip.IsPrivate() || ip.IsLoopback() || ip.IsLinkLocalUnicast()
|
||||
|
||||
if tlsEnabled {
|
||||
// TLS with IP addresses - still prefer FQDN for certificate validation
|
||||
if isPrivate {
|
||||
endpointType = EndpointTypeInternalFQDN
|
||||
} else {
|
||||
endpointType = EndpointTypeExternalFQDN
|
||||
}
|
||||
} else {
|
||||
// No TLS - can use IP addresses directly
|
||||
if isPrivate {
|
||||
endpointType = EndpointTypeInternalIP
|
||||
} else {
|
||||
endpointType = EndpointTypeExternalIP
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Address is a hostname
|
||||
isInternalHostname := isInternalHostname(host)
|
||||
if isInternalHostname {
|
||||
endpointType = EndpointTypeInternalFQDN
|
||||
} else {
|
||||
endpointType = EndpointTypeExternalFQDN
|
||||
}
|
||||
}
|
||||
|
||||
// Check for private/loopback ranges
|
||||
return ip.IsPrivate() || ip.IsLoopback() || ip.IsLinkLocalUnicast()
|
||||
return endpointType
|
||||
}
|
||||
|
||||
// isInternalHostname determines if a hostname appears to be internal/private.
|
||||
// This is a heuristic based on common naming patterns.
|
||||
func isInternalHostname(hostname string) bool {
|
||||
// Convert to lowercase for comparison
|
||||
hostname = strings.ToLower(hostname)
|
||||
|
||||
// Common internal hostname patterns
|
||||
internalPatterns := []string{
|
||||
"localhost",
|
||||
".local",
|
||||
".internal",
|
||||
".corp",
|
||||
".lan",
|
||||
".intranet",
|
||||
".private",
|
||||
}
|
||||
|
||||
// Check for exact match or suffix match
|
||||
for _, pattern := range internalPatterns {
|
||||
if hostname == pattern || strings.HasSuffix(hostname, pattern) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
// Check for RFC 1918 style hostnames (e.g., redis-1, db-server, etc.)
|
||||
// If hostname doesn't contain dots, it's likely internal
|
||||
if !strings.Contains(hostname, ".") {
|
||||
return true
|
||||
}
|
||||
|
||||
// Default to external for fully qualified domain names
|
||||
return false
|
||||
}
|
||||
|
@@ -13,8 +13,6 @@ import (
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
)
|
||||
|
||||
|
||||
|
||||
// Push notification type constants for hitless upgrades
|
||||
const (
|
||||
NotificationMoving = "MOVING"
|
||||
@@ -297,3 +295,9 @@ func (hm *HitlessManager) createPoolHook(baseDialer func(context.Context, string
|
||||
|
||||
return hm.poolHooksRef
|
||||
}
|
||||
|
||||
func (hm *HitlessManager) AddNotificationHook(notificationHook NotificationHook) {
|
||||
hm.hooksMu.Lock()
|
||||
defer hm.hooksMu.Unlock()
|
||||
hm.hooks = append(hm.hooks, notificationHook)
|
||||
}
|
||||
|
@@ -22,27 +22,14 @@ func (lh *LoggingHook) PreHook(ctx context.Context, notificationType string, not
|
||||
// PostHook logs the result after processing.
|
||||
func (lh *LoggingHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
|
||||
if result != nil && lh.LogLevel >= 1 { // Warning level
|
||||
internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v", notificationType, result)
|
||||
internal.Logger.Printf(ctx, "hitless: %s notification processing failed: %v - %v", notificationType, result, notification)
|
||||
} else if lh.LogLevel >= 3 { // Debug level
|
||||
internal.Logger.Printf(ctx, "hitless: %s notification processed successfully", notificationType)
|
||||
}
|
||||
}
|
||||
|
||||
// FilterHook is an example hook that can filter out certain notifications.
|
||||
type FilterHook struct {
|
||||
BlockedTypes map[string]bool
|
||||
}
|
||||
|
||||
// PreHook filters notifications based on type.
|
||||
func (fh *FilterHook) PreHook(ctx context.Context, notificationType string, notification []interface{}) ([]interface{}, bool) {
|
||||
if fh.BlockedTypes[notificationType] {
|
||||
internal.Logger.Printf(ctx, "hitless: filtering out %s notification", notificationType)
|
||||
return notification, false // Skip processing
|
||||
}
|
||||
return notification, true
|
||||
}
|
||||
|
||||
// PostHook does nothing for filter hook.
|
||||
func (fh *FilterHook) PostHook(ctx context.Context, notificationType string, notification []interface{}, result error) {
|
||||
// No post-processing needed for filter hook
|
||||
// NewLoggingHook creates a new logging hook with the specified log level.
|
||||
// Log levels: 0=errors, 1=warnings, 2=info, 3=debug
|
||||
func NewLoggingHook(logLevel int) *LoggingHook {
|
||||
return &LoggingHook{LogLevel: logLevel}
|
||||
}
|
||||
|
@@ -19,11 +19,13 @@ type NotificationHandler struct {
|
||||
// HandlePushNotification processes push notifications with hook support.
|
||||
func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
|
||||
if len(notification) == 0 {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid notification format: %v", notification)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
notificationType, ok := notification[0].(string)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid notification type format: %v", notification[0])
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -60,16 +62,19 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
|
||||
// ["MOVING", seqNum, timeS, endpoint] - per-connection handoff
|
||||
func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
|
||||
if len(notification) < 3 {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid MOVING notification: %v", notification)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
seqID, ok := notification[1].(int64)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid seqID in MOVING notification: %v", notification[1])
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Extract timeS
|
||||
timeS, ok := notification[2].(int64)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid timeS in MOVING notification: %v", notification[2])
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -78,6 +83,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
|
||||
// Extract new endpoint
|
||||
newEndpoint, ok = notification[3].(string)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid newEndpoint in MOVING notification: %v", notification[3])
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
}
|
||||
@@ -85,6 +91,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
|
||||
// Get the connection that received this notification
|
||||
conn := handlerCtx.Conn
|
||||
if conn == nil {
|
||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MOVING notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -95,6 +102,7 @@ func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx pus
|
||||
} else if pc, ok := conn.(*pool.Conn); ok {
|
||||
poolConn = pc
|
||||
} else {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MOVING notification - %T %#v", conn, handlerCtx)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -145,17 +153,20 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx
|
||||
// MIGRATING notifications indicate that a connection is about to be migrated
|
||||
// Apply relaxed timeouts to the specific connection that received this notification
|
||||
if len(notification) < 2 {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid MIGRATING notification: %v", notification)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Get the connection from handler context and type assert to connectionAdapter
|
||||
if handlerCtx.Conn == nil {
|
||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATING notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATING notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -169,17 +180,20 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p
|
||||
// MIGRATED notifications indicate that a connection migration has completed
|
||||
// Restore normal timeouts for the specific connection that received this notification
|
||||
if len(notification) < 2 {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid MIGRATED notification: %v", notification)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Get the connection from handler context and type assert to connectionAdapter
|
||||
if handlerCtx.Conn == nil {
|
||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for MIGRATED notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for MIGRATED notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -193,17 +207,20 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt
|
||||
// FAILING_OVER notifications indicate that a connection is about to failover
|
||||
// Apply relaxed timeouts to the specific connection that received this notification
|
||||
if len(notification) < 2 {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid FAILING_OVER notification: %v", notification)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Get the connection from handler context and type assert to connectionAdapter
|
||||
if handlerCtx.Conn == nil {
|
||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILING_OVER notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILING_OVER notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
@@ -217,17 +234,20 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
|
||||
// FAILED_OVER notifications indicate that a connection failover has completed
|
||||
// Restore normal timeouts for the specific connection that received this notification
|
||||
if len(notification) < 2 {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid FAILED_OVER notification: %v", notification)
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Get the connection from handler context and type assert to connectionAdapter
|
||||
if handlerCtx.Conn == nil {
|
||||
internal.Logger.Printf(ctx, "hitless: no connection in handler context for FAILED_OVER notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
// Type assert to connectionAdapter which implements ConnectionWithRelaxedTimeout
|
||||
connAdapter, ok := handlerCtx.Conn.(interfaces.ConnectionWithRelaxedTimeout)
|
||||
if !ok {
|
||||
internal.Logger.Printf(ctx, "hitless: invalid connection type in handler context for FAILED_OVER notification")
|
||||
return ErrInvalidNotification
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user