
proper notification format

Commit: 4631320c94
Parent: a247360a76
Author: Nedyalko Dyakov
Date: 2025-11-17 18:55:34 +02:00
7 changed files with 191 additions and 89 deletions
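In short, the payload changes from a single slot argument to a sequence ID plus one or more slot ranges, with SMIGRATED additionally carrying the destination node. A sketch of both shapes as they arrive in a handler, using the values from the tests below:

// Decoded RESP3 push payloads ([]interface{}); illustrative values only.
//
//   Before: []interface{}{"SMIGRATING", int64(1234)}
//           []interface{}{"SMIGRATED",  int64(1234)}
//
//   After:  []interface{}{"SMIGRATING", int64(123), "1234", "5000-6000"}
//           []interface{}{"SMIGRATED",  int64(123), "127.0.0.1:6379", "1234", "5000-6000"}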

View File

@@ -45,29 +45,35 @@ func TestClusterClientSMigratedCallback(t *testing.T) {
return
}
// Temporarily replace the cluster state reload with our test version
var receivedSlot int
originalCallback := manager
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
// Set up cluster state reload callback for testing
var receivedHostPort string
var receivedSlotRanges []string
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
reloadCalled.Store(true)
receivedSlot = slot
receivedHostPort = hostPort
receivedSlotRanges = slotRanges
})
// Trigger the callback (this is what SMIGRATED notification would do)
ctx := context.Background()
testSlot := 1234
manager.TriggerClusterStateReload(ctx, testSlot)
testHostPort := "127.0.0.1:6379"
testSlotRanges := []string{"1234", "5000-6000"}
manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)
// Verify callback was called
if !reloadCalled.Load() {
t.Error("Cluster state reload callback should have been called")
}
// Verify slot was passed correctly
if receivedSlot != testSlot {
t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
// Verify host:port was passed correctly
if receivedHostPort != testHostPort {
t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort)
}
// Verify slot ranges were passed correctly
if len(receivedSlotRanges) != len(testSlotRanges) {
t.Errorf("Expected %d slot ranges, got %d", len(testSlotRanges), len(receivedSlotRanges))
}
_ = originalCallback
})
t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) {
@@ -121,22 +127,29 @@ func TestClusterClientSMigratedIntegration(t *testing.T) {
// We can't directly test LazyReload being called without a real cluster,
// but we can verify the callback mechanism works
var callbackWorks atomic.Bool
var receivedSlot int
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
var receivedHostPort string
var receivedSlotRanges []string
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
callbackWorks.Store(true)
receivedSlot = slot
receivedHostPort = hostPort
receivedSlotRanges = slotRanges
})
ctx := context.Background()
testSlot := 5678
manager.TriggerClusterStateReload(ctx, testSlot)
testHostPort := "127.0.0.1:7000"
testSlotRanges := []string{"5678"}
manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)
if !callbackWorks.Load() {
t.Error("Callback mechanism should work")
}
if receivedSlot != testSlot {
t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
if receivedHostPort != testHostPort {
t.Errorf("Expected host:port %s, got %s", testHostPort, receivedHostPort)
}
if len(receivedSlotRanges) != 1 || receivedSlotRanges[0] != "5678" {
t.Errorf("Expected slot ranges [5678], got %v", receivedSlotRanges)
}
})
}

View File

@@ -121,10 +121,11 @@ const (
UnrelaxedTimeoutMessage = "clearing relaxed timeout"
ManagerNotInitializedMessage = "manager not initialized"
FailedToMarkForHandoffMessage = "failed to mark connection for handoff"
InvalidSlotInSMigratingNotificationMessage = "invalid slot in SMIGRATING notification"
InvalidSlotInSMigratedNotificationMessage = "invalid slot in SMIGRATED notification"
SlotMigratingMessage = "slot is migrating, applying relaxed timeout"
SlotMigratedMessage = "slot has migrated, triggering cluster state reload"
InvalidSeqIDInSMigratingNotificationMessage = "invalid SeqID in SMIGRATING notification"
InvalidSeqIDInSMigratedNotificationMessage = "invalid SeqID in SMIGRATED notification"
InvalidHostPortInSMigratedNotificationMessage = "invalid host:port in SMIGRATED notification"
SlotMigratingMessage = "slots migrating, applying relaxed timeout"
SlotMigratedMessage = "slots migrated, triggering cluster state reload"
// ========================================
// used in pool/conn
@@ -629,31 +630,41 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} {
}
// Cluster notification functions
func InvalidSlotInSMigratingNotification(slot interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratingNotificationMessage, slot)
func InvalidSeqIDInSMigratingNotification(seqID interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratingNotificationMessage, seqID)
return appendJSONIfDebug(message, map[string]interface{}{
"slot": fmt.Sprintf("%v", slot),
"seqID": fmt.Sprintf("%v", seqID),
})
}
func InvalidSlotInSMigratedNotification(slot interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratedNotificationMessage, slot)
func InvalidSeqIDInSMigratedNotification(seqID interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSeqIDInSMigratedNotificationMessage, seqID)
return appendJSONIfDebug(message, map[string]interface{}{
"slot": fmt.Sprintf("%v", slot),
"seqID": fmt.Sprintf("%v", seqID),
})
}
func SlotMigrating(connID uint64, slot int64) string {
message := fmt.Sprintf("conn[%d] %s %d", connID, SlotMigratingMessage, slot)
func InvalidHostPortInSMigratedNotification(hostPort interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidHostPortInSMigratedNotificationMessage, hostPort)
return appendJSONIfDebug(message, map[string]interface{}{
"hostPort": fmt.Sprintf("%v", hostPort),
})
}
func SlotMigrating(connID uint64, seqID int64, slotRanges []string) string {
message := fmt.Sprintf("conn[%d] %s seqID=%d slots=%v", connID, SlotMigratingMessage, seqID, slotRanges)
return appendJSONIfDebug(message, map[string]interface{}{
"connID": connID,
"slot": slot,
"seqID": seqID,
"slotRanges": slotRanges,
})
}
func SlotMigrated(slot int64) string {
message := fmt.Sprintf("%s %d", SlotMigratedMessage, slot)
func SlotMigrated(seqID int64, hostPort string, slotRanges []string) string {
message := fmt.Sprintf("%s seqID=%d host:port=%s slots=%v", SlotMigratedMessage, seqID, hostPort, slotRanges)
return appendJSONIfDebug(message, map[string]interface{}{
"slot": slot,
"seqID": seqID,
"hostPort": hostPort,
"slotRanges": slotRanges,
})
}
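For reference, a sketch of the log lines these format strings produce for the test values used below, assuming a hypothetical connection ID of 42 and debug JSON output disabled:

conn[42] slots migrating, applying relaxed timeout seqID=123 slots=[1234 5000-6000]
slots migrated, triggering cluster state reload seqID=123 host:port=127.0.0.1:6379 slots=[1234 5000-6000]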

View File

@@ -6,8 +6,8 @@ Seamless Redis connection handoffs during cluster maintenance operations without
**Cluster notifications are now supported for ClusterClient!**
- **SMIGRATING**: Relaxes timeouts when a slot is being migrated
- **SMIGRATED**: Reloads cluster state when a slot migration completes
- **SMIGRATING**: `["SMIGRATING", SeqID, slot/range, ...]` - Relaxes timeouts when slots are being migrated
- **SMIGRATED**: `["SMIGRATED", SeqID, host:port, slot/range, ...]` - Reloads cluster state when slot migration completes
**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling.
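A minimal sketch of opting a ClusterClient into these notifications; the import path and the DefaultConfig() call are assumptions based on this repository's layout, and RESP3 is required for push notifications:

package main

import (
	"github.com/redis/go-redis/v9"
	"github.com/redis/go-redis/v9/maintnotifications"
)

func main() {
	// Assumed wiring: an explicit config enables SMIGRATING/SMIGRATED handling;
	// leaving MaintNotificationsConfig nil keeps the documented "auto" mode.
	rdb := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:                    []string{"127.0.0.1:7000", "127.0.0.1:7001"},
		Protocol:                 3, // RESP3 is required for push notifications
		MaintNotificationsConfig: maintnotifications.DefaultConfig(),
	})
	defer rdb.Close()
}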

View File

@@ -92,10 +92,11 @@ type MovingOperation struct {
// ClusterStateReloadCallback is a callback function that triggers cluster state reload.
// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications.
// The slot parameter indicates which slot has migrated (0-16383).
// The hostPort parameter indicates the destination node (e.g., "127.0.0.1:6379").
// The slotRanges parameter contains the migrated slots (e.g., ["1234", "5000-6000"]).
// Currently, implementations typically reload the entire cluster state, but in the future
// this could be optimized to reload only the specific slot.
type ClusterStateReloadCallback func(ctx context.Context, slot int)
// this could be optimized to reload only the specific slots.
type ClusterStateReloadCallback func(ctx context.Context, hostPort string, slotRanges []string)
// NewManager creates a new simplified manager.
func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) {
@@ -340,9 +341,9 @@ func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCall
}
// TriggerClusterStateReload calls the cluster state reload callback if it's set.
// This is called when a SMOVED notification is received.
func (hm *Manager) TriggerClusterStateReload(ctx context.Context, slot int) {
// This is called when a SMIGRATED notification is received.
func (hm *Manager) TriggerClusterStateReload(ctx context.Context, hostPort string, slotRanges []string) {
if hm.clusterStateReloadCallback != nil {
hm.clusterStateReloadCallback(ctx, slot)
hm.clusterStateReloadCallback(ctx, hostPort, slotRanges)
}
}
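To show the updated callback contract in isolation, a sketch that registers a callback and fires it manually with the test values; the wrapper function, the *maintnotifications.Manager type, and the standard context/log imports are assumptions for illustration:

// Hypothetical wrapper; in practice the manager comes from a node client
// (see the ClusterClient wiring further below).
func exampleClusterReloadCallback(ctx context.Context, manager *maintnotifications.Manager) {
	manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
		// hostPort: destination node, e.g. "127.0.0.1:6379"
		// slotRanges: single slots or ranges, e.g. ["1234", "5000-6000"]
		log.Printf("slots %v migrated to %s, reloading cluster state", slotRanges, hostPort)
	})
	// This mirrors what the SMIGRATED handler does after parsing the notification:
	manager.TriggerClusterStateReload(ctx, "127.0.0.1:6379", []string{"1234", "5000-6000"})
}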

View File

@@ -294,19 +294,30 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
// handleSMigrating processes SMIGRATING notifications.
// SMIGRATING indicates that a cluster slot is in the process of migrating to a different node.
// This is a per-connection notification that applies relaxed timeouts during slot migration.
// Expected format: ["SMIGRATING", slot, ...]
// Expected format: ["SMIGRATING", SeqID, slot/range1-range2, ...]
func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
if len(notification) < 3 {
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification))
return ErrInvalidNotification
}
slot, ok := notification[1].(int64)
// Extract SeqID (position 1)
seqID, ok := notification[1].(int64)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratingNotification(notification[1]))
internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratingNotification(notification[1]))
return ErrInvalidNotification
}
// Extract slot ranges (position 2+)
// For now, we just extract them for logging
// Format can be: single slot "1234" or range "100-200"
var slotRanges []string
for i := 2; i < len(notification); i++ {
if slotRange, ok := notification[i].(string); ok {
slotRanges = append(slotRanges, slotRange)
}
}
if handlerCtx.Conn == nil {
internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING"))
return ErrInvalidNotification
@@ -320,7 +331,7 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx
// Apply relaxed timeout to this specific connection
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), slot))
internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), seqID, slotRanges))
}
conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
return nil
@@ -329,26 +340,48 @@ func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx
// handleSMigrated processes SMIGRATED notifications.
// SMIGRATED indicates that a cluster slot has finished migrating to a different node.
// This is a cluster-level notification that triggers cluster state reload.
// Expected format: ["SMIGRATED", slot, ...]
// Expected format: ["SMIGRATED", SeqID, host:port, slot1/range1-range2, ...]
func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
if len(notification) < 4 {
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification))
return ErrInvalidNotification
}
slot, ok := notification[1].(int64)
// Extract SeqID (position 1)
seqID, ok := notification[1].(int64)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratedNotification(notification[1]))
internal.Logger.Printf(ctx, logs.InvalidSeqIDInSMigratedNotification(notification[1]))
return ErrInvalidNotification
}
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrated(slot))
// Extract host:port (position 2)
hostPort, ok := notification[2].(string)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidHostPortInSMigratedNotification(notification[2]))
return ErrInvalidNotification
}
// Trigger cluster state reload via callback, passing the slot ID
// This allows for future optimization of partial slot reloads
snh.manager.TriggerClusterStateReload(ctx, int(slot))
// Extract slot ranges (position 3+)
// For now, we just extract them for logging
// Format can be: single slot "1234" or range "100-200"
var slotRanges []string
for i := 3; i < len(notification); i++ {
if slotRange, ok := notification[i].(string); ok {
slotRanges = append(slotRanges, slotRange)
}
}
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrated(seqID, hostPort, slotRanges))
}
// Trigger cluster state reload via callback, passing host:port and slot ranges
// For now, implementations just log these and trigger a full reload
// In the future, this could be optimized to reload only the specific slots
snh.manager.TriggerClusterStateReload(ctx, hostPort, slotRanges)
// TODO: Should we also clear the relaxed timeout here (like MIGRATED does)?
// Currently we only trigger state reload, but the timeout stays relaxed
return nil
}
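The slot ranges stay raw strings for now; if the partial-reload optimization mentioned in the comments is implemented, they would need parsing. A hypothetical helper (not part of this commit, assuming the standard strconv and strings imports) for the "1234" / "100-200" format:

// parseSlotRange turns "1234" into (1234, 1234) and "100-200" into (100, 200).
// Hypothetical helper, not part of this commit.
func parseSlotRange(s string) (start, end int, err error) {
	if i := strings.IndexByte(s, '-'); i >= 0 {
		if start, err = strconv.Atoi(s[:i]); err != nil {
			return 0, 0, err
		}
		end, err = strconv.Atoi(s[i+1:])
		return start, end, err
	}
	start, err = strconv.Atoi(s)
	return start, start, err
}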

View File

@@ -34,8 +34,8 @@ func TestSMigratingNotificationHandler(t *testing.T) {
// Create a mock connection
conn := createMockConnection()
// Create SMIGRATING notification: ["SMIGRATING", slot]
notification := []interface{}{"SMIGRATING", int64(1234)}
// Create SMIGRATING notification: ["SMIGRATING", SeqID, slot/range, ...]
notification := []interface{}{"SMIGRATING", int64(123), "1234", "5000-6000"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{
@@ -76,7 +76,7 @@ func TestSMigratingNotificationHandler(t *testing.T) {
}
})
t.Run("InvalidSMigratingNotification_InvalidSlot", func(t *testing.T) {
t.Run("InvalidSMigratingNotification_InvalidSeqID", func(t *testing.T) {
config := DefaultConfig()
manager := &Manager{
config: config,
@@ -86,8 +86,8 @@ func TestSMigratingNotificationHandler(t *testing.T) {
operationsManager: manager,
}
// Invalid notification - slot is not int64
notification := []interface{}{"SMIGRATING", "not-a-number"}
// Invalid notification - SeqID is not int64
notification := []interface{}{"SMIGRATING", "not-a-number", "1234"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
@@ -108,7 +108,7 @@ func TestSMigratingNotificationHandler(t *testing.T) {
operationsManager: manager,
}
notification := []interface{}{"SMIGRATING", int64(1234)}
notification := []interface{}{"SMIGRATING", int64(123), "1234"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{} // No connection
@@ -147,13 +147,15 @@ func TestSMigratedNotificationHandler(t *testing.T) {
t.Run("ValidSMigratedNotification", func(t *testing.T) {
// Track if callback was called
var callbackCalled atomic.Bool
var receivedSlot int
var receivedHostPort string
var receivedSlotRanges []string
// Create a mock manager with callback
manager := &Manager{
clusterStateReloadCallback: func(ctx context.Context, slot int) {
clusterStateReloadCallback: func(ctx context.Context, hostPort string, slotRanges []string) {
callbackCalled.Store(true)
receivedSlot = slot
receivedHostPort = hostPort
receivedSlotRanges = slotRanges
},
}
@@ -163,8 +165,8 @@ func TestSMigratedNotificationHandler(t *testing.T) {
operationsManager: manager,
}
// Create SMIGRATED notification: ["SMIGRATED", slot]
notification := []interface{}{"SMIGRATED", int64(1234)}
// Create SMIGRATED notification: ["SMIGRATED", SeqID, host:port, slot/range, ...]
notification := []interface{}{"SMIGRATED", int64(123), "127.0.0.1:6379", "1234", "5000-6000"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
@@ -180,9 +182,22 @@ func TestSMigratedNotificationHandler(t *testing.T) {
t.Error("Cluster state reload callback should have been called")
}
// Verify slot was passed correctly
if receivedSlot != 1234 {
t.Errorf("Expected slot 1234, got %d", receivedSlot)
// Verify host:port was passed correctly
if receivedHostPort != "127.0.0.1:6379" {
t.Errorf("Expected host:port '127.0.0.1:6379', got '%s'", receivedHostPort)
}
// Verify slot ranges were passed correctly
if len(receivedSlotRanges) != 2 {
t.Errorf("Expected 2 slot ranges, got %d", len(receivedSlotRanges))
}
if len(receivedSlotRanges) >= 2 {
if receivedSlotRanges[0] != "1234" {
t.Errorf("Expected first slot range '1234', got '%s'", receivedSlotRanges[0])
}
if receivedSlotRanges[1] != "5000-6000" {
t.Errorf("Expected second slot range '5000-6000', got '%s'", receivedSlotRanges[1])
}
}
})
@@ -205,15 +220,34 @@ func TestSMigratedNotificationHandler(t *testing.T) {
}
})
t.Run("InvalidSMigratedNotification_InvalidSlot", func(t *testing.T) {
t.Run("InvalidSMigratedNotification_InvalidSeqID", func(t *testing.T) {
manager := &Manager{}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Invalid notification - slot is not int64
notification := []interface{}{"SMIGRATED", "not-a-number"}
// Invalid notification - SeqID is not int64
notification := []interface{}{"SMIGRATED", "not-a-number", "127.0.0.1:6379", "1234"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
err := handler.handleSMigrated(ctx, handlerCtx, notification)
if err != ErrInvalidNotification {
t.Errorf("Expected ErrInvalidNotification, got: %v", err)
}
})
t.Run("InvalidSMigratedNotification_InvalidHostPort", func(t *testing.T) {
manager := &Manager{}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Invalid notification - host:port is not string
notification := []interface{}{"SMIGRATED", int64(123), int64(999), "1234"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
@@ -232,7 +266,7 @@ func TestSMigratedNotificationHandler(t *testing.T) {
operationsManager: manager,
}
notification := []interface{}{"SMIGRATED", int64(1234)}
notification := []interface{}{"SMIGRATED", int64(123), "127.0.0.1:6379", "1234"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
@@ -271,20 +305,23 @@ func TestClusterStateReloadCallback(t *testing.T) {
t.Run("SetAndTriggerCallback", func(t *testing.T) {
var callbackCalled atomic.Bool
var receivedCtx context.Context
var receivedSlot int
var receivedHostPort string
var receivedSlotRanges []string
manager := &Manager{}
callback := func(ctx context.Context, slot int) {
callback := func(ctx context.Context, hostPort string, slotRanges []string) {
callbackCalled.Store(true)
receivedCtx = ctx
receivedSlot = slot
receivedHostPort = hostPort
receivedSlotRanges = slotRanges
}
manager.SetClusterStateReloadCallback(callback)
ctx := context.Background()
testSlot := 1234
manager.TriggerClusterStateReload(ctx, testSlot)
testHostPort := "127.0.0.1:6379"
testSlotRanges := []string{"1234", "5000-6000"}
manager.TriggerClusterStateReload(ctx, testHostPort, testSlotRanges)
if !callbackCalled.Load() {
t.Error("Callback should have been called")
@@ -294,8 +331,12 @@ func TestClusterStateReloadCallback(t *testing.T) {
t.Error("Callback should receive the correct context")
}
if receivedSlot != testSlot {
t.Errorf("Callback should receive the correct slot, got %d, want %d", receivedSlot, testSlot)
if receivedHostPort != testHostPort {
t.Errorf("Callback should receive the correct host:port, got %s, want %s", receivedHostPort, testHostPort)
}
if len(receivedSlotRanges) != len(testSlotRanges) {
t.Errorf("Callback should receive the correct slot ranges, got %v, want %v", receivedSlotRanges, testSlotRanges)
}
})
@@ -303,7 +344,7 @@ func TestClusterStateReloadCallback(t *testing.T) {
manager := &Manager{}
// Should not panic
ctx := context.Background()
manager.TriggerClusterStateReload(ctx, 1234)
manager.TriggerClusterStateReload(ctx, "127.0.0.1:6379", []string{"1234"})
})
}

View File

@@ -146,7 +146,7 @@ type ClusterOptions struct {
// cluster upgrade notifications gracefully and manage connection/pool state
// transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications.
// If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it.
// The ClusterClient supports SMOVING notifications for cluster state management.
// The ClusterClient supports SMIGRATING and SMIGRATED notifications for cluster state management.
// Individual node clients handle other maintenance notifications (MOVING, MIGRATING, etc.).
MaintNotificationsConfig *maintnotifications.Config
}
@@ -1039,17 +1039,20 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
txPipeline: c.processTxPipeline,
})
// Set up SMOVING notification handling for cluster state reload
// When a node client receives a SMOVING notification, it should trigger
// Set up SMIGRATED notification handling for cluster state reload
// When a node client receives a SMIGRATED notification, it should trigger
// cluster state reload on the parent ClusterClient
if opt.MaintNotificationsConfig != nil {
c.nodes.OnNewNode(func(nodeClient *Client) {
manager := nodeClient.GetMaintNotificationsManager()
if manager != nil {
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
manager.SetClusterStateReloadCallback(func(ctx context.Context, hostPort string, slotRanges []string) {
// Log the migration details for now
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, "cluster: slots %v migrated to %s, reloading cluster state", slotRanges, hostPort)
}
// Currently we reload the entire cluster state
// In the future, this could be optimized to reload only the specific slot
_ = slot // slot parameter available for future optimization
// In the future, this could be optimized to reload only the specific slots
c.state.LazyReload()
})
}