
smigrating/smigrated intro

Nedyalko Dyakov
2025-11-17 18:25:23 +02:00
parent 55aa026404
commit a247360a76
8 changed files with 642 additions and 17 deletions

cluster_smigrating_test.go (new file)

@@ -0,0 +1,156 @@
package redis
import (
"context"
"sync/atomic"
"testing"
"github.com/redis/go-redis/v9/maintnotifications"
)
// TestClusterClientSMigratedCallback tests that ClusterClient sets up SMIGRATED callback on node clients
func TestClusterClientSMigratedCallback(t *testing.T) {
t.Run("CallbackSetupWithMaintNotifications", func(t *testing.T) {
// Track if state reload was called
var reloadCalled atomic.Bool
// Create cluster options with maintnotifications enabled
opt := &ClusterOptions{
Addrs: []string{"localhost:7000"}, // Dummy address
MaintNotificationsConfig: &maintnotifications.Config{
Mode: maintnotifications.ModeEnabled,
},
}
// Create cluster client
cluster := NewClusterClient(opt)
defer cluster.Close()
// Manually trigger node creation by calling GetOrCreate
// This simulates what happens during normal cluster operations
node, err := cluster.nodes.GetOrCreate("localhost:7000")
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}
// Get the maintnotifications manager from the node client
manager := node.Client.GetMaintNotificationsManager()
if manager == nil {
	t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)")
}
// Install a test callback so we can observe SMIGRATED-triggered reloads
var receivedSlot int
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
	reloadCalled.Store(true)
	receivedSlot = slot
})
// Trigger the callback (this is what SMIGRATED notification would do)
ctx := context.Background()
testSlot := 1234
manager.TriggerClusterStateReload(ctx, testSlot)
// Verify callback was called
if !reloadCalled.Load() {
t.Error("Cluster state reload callback should have been called")
}
// Verify slot was passed correctly
if receivedSlot != testSlot {
t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
}
})
t.Run("NoCallbackWithoutMaintNotifications", func(t *testing.T) {
// Create cluster options WITHOUT maintnotifications
opt := &ClusterOptions{
Addrs: []string{"localhost:7000"}, // Dummy address
// MaintNotificationsConfig is nil
}
// Create cluster client
cluster := NewClusterClient(opt)
defer cluster.Close()
// The OnNewNode callback should not be registered when MaintNotificationsConfig is nil
// This test just verifies that the cluster client doesn't panic
})
}
// TestClusterClientSMigratedIntegration tests SMIGRATED notification handling in cluster context
func TestClusterClientSMigratedIntegration(t *testing.T) {
t.Run("SMigratedTriggersStateReload", func(t *testing.T) {
// This test verifies the integration between SMIGRATED notification and cluster state reload
// We verify that the callback is properly set up to call cluster.state.LazyReload()
// Create cluster options with maintnotifications enabled
opt := &ClusterOptions{
Addrs: []string{"localhost:7000"},
MaintNotificationsConfig: &maintnotifications.Config{
Mode: maintnotifications.ModeEnabled,
},
}
// Create cluster client
cluster := NewClusterClient(opt)
defer cluster.Close()
// Create a node
node, err := cluster.nodes.GetOrCreate("localhost:7000")
if err != nil {
t.Fatalf("Failed to create node: %v", err)
}
// Get the maintnotifications manager
manager := node.Client.GetMaintNotificationsManager()
if manager == nil {
	t.Skip("MaintNotifications manager not initialized (expected if not connected to real Redis)")
}
// Verify that the callback is set by checking it's not nil
// We can't directly test LazyReload being called without a real cluster,
// but we can verify the callback mechanism works
var callbackWorks atomic.Bool
var receivedSlot int
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
callbackWorks.Store(true)
receivedSlot = slot
})
ctx := context.Background()
testSlot := 5678
manager.TriggerClusterStateReload(ctx, testSlot)
if !callbackWorks.Load() {
t.Error("Callback mechanism should work")
}
if receivedSlot != testSlot {
t.Errorf("Expected slot %d, got %d", testSlot, receivedSlot)
}
})
}
// TestSMigratingAndSMigratedConstants verifies the SMIGRATING and SMIGRATED constants are exported
func TestSMigratingAndSMigratedConstants(t *testing.T) {
// This test verifies that the SMIGRATING and SMIGRATED constants are properly defined
// and accessible from the maintnotifications package
if maintnotifications.NotificationSMigrating != "SMIGRATING" {
t.Errorf("Expected NotificationSMigrating to be 'SMIGRATING', got: %s", maintnotifications.NotificationSMigrating)
}
if maintnotifications.NotificationSMigrated != "SMIGRATED" {
t.Errorf("Expected NotificationSMigrated to be 'SMIGRATED', got: %s", maintnotifications.NotificationSMigrated)
}
}


@@ -121,6 +121,10 @@ const (
UnrelaxedTimeoutMessage = "clearing relaxed timeout"
ManagerNotInitializedMessage = "manager not initialized"
FailedToMarkForHandoffMessage = "failed to mark connection for handoff"
InvalidSlotInSMigratingNotificationMessage = "invalid slot in SMIGRATING notification"
InvalidSlotInSMigratedNotificationMessage = "invalid slot in SMIGRATED notification"
SlotMigratingMessage = "slot is migrating, applying relaxed timeout"
SlotMigratedMessage = "slot has migrated, triggering cluster state reload"
// ========================================
// used in pool/conn
@@ -623,3 +627,33 @@ func ExtractDataFromLogMessage(logMessage string) map[string]interface{} {
// If JSON parsing fails, return empty map
return result
}
// Cluster notification functions
func InvalidSlotInSMigratingNotification(slot interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratingNotificationMessage, slot)
return appendJSONIfDebug(message, map[string]interface{}{
"slot": fmt.Sprintf("%v", slot),
})
}
func InvalidSlotInSMigratedNotification(slot interface{}) string {
message := fmt.Sprintf("%s: %v", InvalidSlotInSMigratedNotificationMessage, slot)
return appendJSONIfDebug(message, map[string]interface{}{
"slot": fmt.Sprintf("%v", slot),
})
}
func SlotMigrating(connID uint64, slot int64) string {
message := fmt.Sprintf("conn[%d] %s %d", connID, SlotMigratingMessage, slot)
return appendJSONIfDebug(message, map[string]interface{}{
"connID": connID,
"slot": slot,
})
}
func SlotMigrated(slot int64) string {
message := fmt.Sprintf("%s %d", SlotMigratedMessage, slot)
return appendJSONIfDebug(message, map[string]interface{}{
"slot": slot,
})
}
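For reference, the messages the two success-path helpers above render (debug-mode JSON suffix omitted):

// SlotMigrating(42, 1234) -> "conn[42] slot is migrating, applying relaxed timeout 1234"
// SlotMigrated(1234)      -> "slot has migrated, triggering cluster state reload 1234"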


@@ -2,8 +2,14 @@
Seamless Redis connection handoffs during cluster maintenance operations without dropping connections.
## ⚠️ **Important Note**
**Most maintenance notifications are currently supported only in standalone Redis clients.** Cluster clients (ClusterClient, FailoverClient, etc.) support only the cluster-specific notifications described below.
## Cluster Support
**Cluster notifications are now supported by ClusterClient!**
- **SMIGRATING**: Relaxes timeouts when a slot is being migrated
- **SMIGRATED**: Reloads cluster state when a slot migration completes
**Note:** Other maintenance notifications (MOVING, MIGRATING, MIGRATED, FAILING_OVER, FAILED_OVER) are supported only in standalone Redis clients. Cluster clients support SMIGRATING and SMIGRATED for cluster-specific slot migration handling.
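Enabling this works the same way as for standalone clients; a minimal sketch, assuming a locally running cluster (the address is a placeholder; the `maintnotifications` import path is the one used elsewhere in this repo):

```go
import (
    "github.com/redis/go-redis/v9"
    "github.com/redis/go-redis/v9/maintnotifications"
)

client := redis.NewClusterClient(&redis.ClusterOptions{
    Addrs:    []string{"localhost:7000"}, // placeholder address
    Protocol: 3,                          // RESP3 is required for push notifications
    MaintNotificationsConfig: &maintnotifications.Config{
        Mode: maintnotifications.ModeEnabled,
    },
})
```

With this in place, node clients relax timeouts while a slot is SMIGRATING, and the ClusterClient lazily reloads its state when the matching SMIGRATED arrives.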
## Quick Start


@@ -18,11 +18,13 @@ import (
// Push notification type constants for maintenance
const (
NotificationMoving = "MOVING"
NotificationMigrating = "MIGRATING"
NotificationMigrated = "MIGRATED"
NotificationFailingOver = "FAILING_OVER"
NotificationFailedOver = "FAILED_OVER"
NotificationMoving = "MOVING" // Per-connection handoff notification
NotificationMigrating = "MIGRATING" // Per-connection migration start notification - relaxes timeouts
NotificationMigrated = "MIGRATED" // Per-connection migration complete notification - clears relaxed timeouts
NotificationFailingOver = "FAILING_OVER" // Per-connection failover start notification - relaxes timeouts
NotificationFailedOver = "FAILED_OVER" // Per-connection failover complete notification - clears relaxed timeouts
NotificationSMigrating = "SMIGRATING" // Cluster slot migrating notification - relaxes timeouts
NotificationSMigrated = "SMIGRATED" // Cluster slot migrated notification - triggers cluster state reload
)
// maintenanceNotificationTypes contains all notification types that maintenance handles
@@ -32,6 +34,8 @@ var maintenanceNotificationTypes = []string{
NotificationMigrated,
NotificationFailingOver,
NotificationFailedOver,
NotificationSMigrating,
NotificationSMigrated,
}
// NotificationHook is called before and after notification processing
@@ -73,6 +77,9 @@ type Manager struct {
hooks []NotificationHook
hooksMu sync.RWMutex // Protects hooks slice
poolHooksRef *PoolHook
// Cluster state reload callback for SMIGRATED notifications
clusterStateReloadCallback ClusterStateReloadCallback
}
// MovingOperation tracks an active MOVING operation.
@@ -83,6 +90,13 @@ type MovingOperation struct {
Deadline time.Time
}
// ClusterStateReloadCallback is a callback function that triggers cluster state reload.
// This is used by node clients to notify their parent ClusterClient about SMIGRATED notifications.
// The slot parameter indicates which slot has migrated (0-16383).
// Currently, implementations typically reload the entire cluster state, but in the future
// this could be optimized to reload only the specific slot.
type ClusterStateReloadCallback func(ctx context.Context, slot int)
// NewManager creates a new simplified manager.
func NewManager(client interfaces.ClientInterface, pool pool.Pooler, config *Config) (*Manager, error) {
if client == nil {
@@ -318,3 +332,17 @@ func (hm *Manager) AddNotificationHook(notificationHook NotificationHook) {
defer hm.hooksMu.Unlock()
hm.hooks = append(hm.hooks, notificationHook)
}
// SetClusterStateReloadCallback sets the callback function that will be called when an SMIGRATED notification is received.
// This allows node clients to notify their parent ClusterClient to reload cluster state.
func (hm *Manager) SetClusterStateReloadCallback(callback ClusterStateReloadCallback) {
hm.clusterStateReloadCallback = callback
}
// TriggerClusterStateReload calls the cluster state reload callback if it's set.
// This is called when an SMIGRATED notification is received.
func (hm *Manager) TriggerClusterStateReload(ctx context.Context, slot int) {
if hm.clusterStateReloadCallback != nil {
hm.clusterStateReloadCallback(ctx, slot)
}
}
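For orientation, a minimal sketch of the setter/trigger pair in use (mgr stands in for a node client's *Manager; the real wiring lives in NewClusterClient further below):

mgr.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
	// react to the completed migration of this slot, e.g. reload the slot map
})
mgr.TriggerClusterStateReload(context.Background(), 1234) // what handleSMigrated does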


@@ -217,6 +217,8 @@ func TestManagerRefactoring(t *testing.T) {
NotificationMigrated,
NotificationFailingOver,
NotificationFailedOver,
NotificationSMigrating,
NotificationSMigrated,
}
if len(maintenanceNotificationTypes) != len(expectedTypes) {


@@ -49,6 +49,10 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
err = snh.handleFailingOver(ctx, handlerCtx, modifiedNotification)
case NotificationFailedOver:
err = snh.handleFailedOver(ctx, handlerCtx, modifiedNotification)
case NotificationSMigrating:
err = snh.handleSMigrating(ctx, handlerCtx, modifiedNotification)
case NotificationSMigrated:
err = snh.handleSMigrated(ctx, handlerCtx, modifiedNotification)
default:
// Ignore other notification types (e.g., pub/sub messages)
err = nil
@@ -61,7 +65,9 @@ func (snh *NotificationHandler) HandlePushNotification(ctx context.Context, hand
}
// handleMoving processes MOVING notifications.
// MOVING indicates that a connection should be handed off to a new endpoint.
// This is a per-connection notification that triggers connection handoff.
// Expected format: ["MOVING", seqNum, timeS, endpoint]
func (snh *NotificationHandler) handleMoving(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 3 {
internal.Logger.Printf(ctx, logs.InvalidNotification("MOVING", notification))
@@ -167,9 +173,10 @@ func (snh *NotificationHandler) markConnForHandoff(conn *pool.Conn, newEndpoint
}
// handleMigrating processes MIGRATING notifications.
// MIGRATING indicates that a connection migration is starting.
// This is a per-connection notification that applies relaxed timeouts.
// Expected format: ["MIGRATING", ...]
func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATING", notification))
return ErrInvalidNotification
@@ -195,9 +202,10 @@ func (snh *NotificationHandler) handleMigrating(ctx context.Context, handlerCtx
}
// handleMigrated processes MIGRATED notifications.
// MIGRATED indicates that a connection migration has completed.
// This is a per-connection notification that clears relaxed timeouts.
// Expected format: ["MIGRATED", ...]
func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("MIGRATED", notification))
return ErrInvalidNotification
@@ -224,9 +232,10 @@ func (snh *NotificationHandler) handleMigrated(ctx context.Context, handlerCtx p
}
// handleFailingOver processes FAILING_OVER notifications.
// FAILING_OVER indicates that a failover is starting.
// This is a per-connection notification that applies relaxed timeouts.
// Expected format: ["FAILING_OVER", ...]
func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("FAILING_OVER", notification))
return ErrInvalidNotification
@@ -253,9 +262,10 @@ func (snh *NotificationHandler) handleFailingOver(ctx context.Context, handlerCt
}
// handleFailedOver processes FAILED_OVER notifications.
// FAILED_OVER indicates that a failover has completed.
// This is a per-connection notification that clears relaxed timeouts.
// Expected format: ["FAILED_OVER", ...]
func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("FAILED_OVER", notification))
return ErrInvalidNotification
@@ -280,3 +290,65 @@ func (snh *NotificationHandler) handleFailedOver(ctx context.Context, handlerCtx
conn.ClearRelaxedTimeout()
return nil
}
// handleSMigrating processes SMIGRATING notifications.
// SMIGRATING indicates that a cluster slot is in the process of migrating to a different node.
// This is a per-connection notification that applies relaxed timeouts during slot migration.
// Expected format: ["SMIGRATING", slot, ...]
func (snh *NotificationHandler) handleSMigrating(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATING", notification))
return ErrInvalidNotification
}
slot, ok := notification[1].(int64)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratingNotification(notification[1]))
return ErrInvalidNotification
}
if handlerCtx.Conn == nil {
internal.Logger.Printf(ctx, logs.NoConnectionInHandlerContext("SMIGRATING"))
return ErrInvalidNotification
}
conn, ok := handlerCtx.Conn.(*pool.Conn)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidConnectionTypeInHandlerContext("SMIGRATING", handlerCtx.Conn, handlerCtx))
return ErrInvalidNotification
}
// Apply relaxed timeout to this specific connection
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrating(conn.GetID(), slot))
}
conn.SetRelaxedTimeout(snh.manager.config.RelaxedTimeout, snh.manager.config.RelaxedTimeout)
return nil
}
// handleSMigrated processes SMIGRATED notifications.
// SMIGRATED indicates that a cluster slot has finished migrating to a different node.
// This is a cluster-level notification that triggers cluster state reload.
// Expected format: ["SMIGRATED", slot, ...]
func (snh *NotificationHandler) handleSMigrated(ctx context.Context, handlerCtx push.NotificationHandlerContext, notification []interface{}) error {
if len(notification) < 2 {
internal.Logger.Printf(ctx, logs.InvalidNotification("SMIGRATED", notification))
return ErrInvalidNotification
}
slot, ok := notification[1].(int64)
if !ok {
internal.Logger.Printf(ctx, logs.InvalidSlotInSMigratedNotification(notification[1]))
return ErrInvalidNotification
}
if internal.LogLevel.InfoOrAbove() {
internal.Logger.Printf(ctx, logs.SlotMigrated(slot))
}
// Trigger cluster state reload via callback, passing the slot ID
// This allows for future optimization of partial slot reloads
snh.manager.TriggerClusterStateReload(ctx, int(slot))
return nil
}
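For orientation, the two cluster-slot push frames as the handlers above expect them (slot 1234 is an example value):

// ["SMIGRATING", 1234] -> handleSMigrating: relaxed timeouts on the receiving connection
// ["SMIGRATED", 1234]  -> handleSMigrated: log, then TriggerClusterStateReload(ctx, 1234)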


@@ -0,0 +1,309 @@
package maintnotifications
import (
"context"
"sync/atomic"
"testing"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/push"
)
// createMockConnection creates a mock connection for testing
// Uses the mockNetConn from pool_hook_test.go
func createMockConnection() *pool.Conn {
mockNetConn := &mockNetConn{}
return pool.NewConn(mockNetConn)
}
// TestSMigratingNotificationHandler tests the SMIGRATING notification handler
func TestSMigratingNotificationHandler(t *testing.T) {
t.Run("ValidSMigratingNotification", func(t *testing.T) {
// Create a mock manager with config
config := DefaultConfig()
manager := &Manager{
config: config,
}
// Create notification handler
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Create a mock connection
conn := createMockConnection()
// Create SMIGRATING notification: ["SMIGRATING", slot]
notification := []interface{}{"SMIGRATING", int64(1234)}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{
Conn: conn,
}
// Handle the notification
err := handler.handleSMigrating(ctx, handlerCtx, notification)
if err != nil {
t.Errorf("handleSMigrating should not error: %v", err)
}
// Verify relaxed timeout was applied
if !conn.HasRelaxedTimeout() {
t.Error("Relaxed timeout should have been set on the connection")
}
})
t.Run("InvalidSMigratingNotification_TooShort", func(t *testing.T) {
config := DefaultConfig()
manager := &Manager{
config: config,
}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Invalid notification - too short
notification := []interface{}{"SMIGRATING"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
err := handler.handleSMigrating(ctx, handlerCtx, notification)
if err != ErrInvalidNotification {
t.Errorf("Expected ErrInvalidNotification, got: %v", err)
}
})
t.Run("InvalidSMigratingNotification_InvalidSlot", func(t *testing.T) {
config := DefaultConfig()
manager := &Manager{
config: config,
}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Invalid notification - slot is not int64
notification := []interface{}{"SMIGRATING", "not-a-number"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
err := handler.handleSMigrating(ctx, handlerCtx, notification)
if err != ErrInvalidNotification {
t.Errorf("Expected ErrInvalidNotification, got: %v", err)
}
})
t.Run("SMigratingNotification_NoConnection", func(t *testing.T) {
config := DefaultConfig()
manager := &Manager{
config: config,
}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
notification := []interface{}{"SMIGRATING", int64(1234)}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{} // No connection
err := handler.handleSMigrating(ctx, handlerCtx, notification)
if err != ErrInvalidNotification {
t.Errorf("Expected ErrInvalidNotification when no connection, got: %v", err)
}
})
}
// TestSMigratingNotificationRegistration tests that SMIGRATING is registered in the notification types
func TestSMigratingNotificationRegistration(t *testing.T) {
found := false
for _, notifType := range maintenanceNotificationTypes {
if notifType == NotificationSMigrating {
found = true
break
}
}
if !found {
t.Error("SMIGRATING should be registered in maintenanceNotificationTypes")
}
}
// TestSMigratingConstant tests that the SMIGRATING constant is defined correctly
func TestSMigratingConstant(t *testing.T) {
if NotificationSMigrating != "SMIGRATING" {
t.Errorf("NotificationSMigrating constant should be 'SMIGRATING', got: %s", NotificationSMigrating)
}
}
// TestSMigratedNotificationHandler tests the SMIGRATED notification handler
func TestSMigratedNotificationHandler(t *testing.T) {
t.Run("ValidSMigratedNotification", func(t *testing.T) {
// Track if callback was called
var callbackCalled atomic.Bool
var receivedSlot int
// Create a mock manager with callback
manager := &Manager{
clusterStateReloadCallback: func(ctx context.Context, slot int) {
callbackCalled.Store(true)
receivedSlot = slot
},
}
// Create notification handler
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Create SMIGRATED notification: ["SMIGRATED", slot]
notification := []interface{}{"SMIGRATED", int64(1234)}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
// Handle the notification
err := handler.handleSMigrated(ctx, handlerCtx, notification)
if err != nil {
t.Errorf("handleSMigrated should not error: %v", err)
}
// Verify callback was called
if !callbackCalled.Load() {
t.Error("Cluster state reload callback should have been called")
}
// Verify slot was passed correctly
if receivedSlot != 1234 {
t.Errorf("Expected slot 1234, got %d", receivedSlot)
}
})
t.Run("InvalidSMigratedNotification_TooShort", func(t *testing.T) {
manager := &Manager{}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Invalid notification - too short
notification := []interface{}{"SMIGRATED"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
err := handler.handleSMigrated(ctx, handlerCtx, notification)
if err != ErrInvalidNotification {
t.Errorf("Expected ErrInvalidNotification, got: %v", err)
}
})
t.Run("InvalidSMigratedNotification_InvalidSlot", func(t *testing.T) {
manager := &Manager{}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
// Invalid notification - slot is not int64
notification := []interface{}{"SMIGRATED", "not-a-number"}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
err := handler.handleSMigrated(ctx, handlerCtx, notification)
if err != ErrInvalidNotification {
t.Errorf("Expected ErrInvalidNotification, got: %v", err)
}
})
t.Run("SMigratedNotification_NoCallback", func(t *testing.T) {
// Manager without callback should not panic
manager := &Manager{}
handler := &NotificationHandler{
manager: manager,
operationsManager: manager,
}
notification := []interface{}{"SMIGRATED", int64(1234)}
ctx := context.Background()
handlerCtx := push.NotificationHandlerContext{}
err := handler.handleSMigrated(ctx, handlerCtx, notification)
if err != nil {
t.Errorf("handleSMigrated should not error even without callback: %v", err)
}
})
}
// TestSMigratedNotificationRegistration tests that SMIGRATED is registered in the notification types
func TestSMigratedNotificationRegistration(t *testing.T) {
found := false
for _, notifType := range maintenanceNotificationTypes {
if notifType == NotificationSMigrated {
found = true
break
}
}
if !found {
t.Error("SMIGRATED should be registered in maintenanceNotificationTypes")
}
}
// TestSMigratedConstant tests that the SMIGRATED constant is defined correctly
func TestSMigratedConstant(t *testing.T) {
if NotificationSMigrated != "SMIGRATED" {
t.Errorf("NotificationSMigrated constant should be 'SMIGRATED', got: %s", NotificationSMigrated)
}
}
// TestClusterStateReloadCallback tests the callback setter and trigger
func TestClusterStateReloadCallback(t *testing.T) {
t.Run("SetAndTriggerCallback", func(t *testing.T) {
var callbackCalled atomic.Bool
var receivedCtx context.Context
var receivedSlot int
manager := &Manager{}
callback := func(ctx context.Context, slot int) {
callbackCalled.Store(true)
receivedCtx = ctx
receivedSlot = slot
}
manager.SetClusterStateReloadCallback(callback)
ctx := context.Background()
testSlot := 1234
manager.TriggerClusterStateReload(ctx, testSlot)
if !callbackCalled.Load() {
t.Error("Callback should have been called")
}
if receivedCtx != ctx {
t.Error("Callback should receive the correct context")
}
if receivedSlot != testSlot {
t.Errorf("Callback should receive the correct slot, got %d, want %d", receivedSlot, testSlot)
}
})
t.Run("TriggerWithoutCallback", func(t *testing.T) {
manager := &Manager{}
// Should not panic
ctx := context.Background()
manager.TriggerClusterStateReload(ctx, 1234)
})
}


@@ -146,7 +146,8 @@ type ClusterOptions struct {
// cluster upgrade notifications gracefully and manage connection/pool state
// transitions seamlessly. Requires Protocol: 3 (RESP3) for push notifications.
// If nil, maintnotifications upgrades are in "auto" mode and will be enabled if the server supports it.
// The ClusterClient reloads its cluster state in response to SMIGRATED notifications;
// individual node clients handle the other maintenance notifications (MOVING, MIGRATING, etc.).
MaintNotificationsConfig *maintnotifications.Config
}
@@ -1038,6 +1039,23 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
txPipeline: c.processTxPipeline,
})
// Set up SMIGRATED notification handling for cluster state reload.
// When a node client receives an SMIGRATED notification, it triggers
// cluster state reload on the parent ClusterClient.
if opt.MaintNotificationsConfig != nil {
c.nodes.OnNewNode(func(nodeClient *Client) {
manager := nodeClient.GetMaintNotificationsManager()
if manager != nil {
manager.SetClusterStateReloadCallback(func(ctx context.Context, slot int) {
// Currently we reload the entire cluster state
// In the future, this could be optimized to reload only the specific slot
_ = slot // slot parameter available for future optimization
c.state.LazyReload()
})
}
})
}
return c
}