mirror of
				https://github.com/redis/go-redis.git
				synced 2025-11-04 02:33:24 +03:00 
			
		
		
		
	* update e2e test, change script * update script and tests * fixed bdbid parsing * disabled majority of tests, swapped event order * change the config tag * revert test order * fix typo * reenable all e2e tests * change the clonfig flag key for all e2e tests * improve logging for debug purposes of tests * longer deadline for FI in CI * increase waiting for notifications * extend tests * dont fail on flaky third client * fi new params * fix test build * more time for migrating * first wait for FI action, then assert notification * fix test build * fix tests * fix tests * change output * global print logs for tests * better output * fix error format * maybe the notification is already received * second and third client fix * print output if failed * better second and third client checks * output action data if notification is not received * stop command runner * database create / delete actions * database create / delete actions used in tests * fix import * remove example * remove unused var * use different port than the one in env * wait for action to get the response * fix output * fix create db config * fix create db config * use new database for client * fix create db config * db per scenario * less logs, correct check * Add CTRF to the scenario tests (#3545) * add some json ctrf improvements * fix -v * attempt to separate the output --------- Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> --------- Co-authored-by: Nedyalko Dyakov <nedyalko.dyakov@gmail.com> Co-authored-by: kiryazovi-redis <ivaylo.kiryazov@redis.com> Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
		
			
				
	
	
		
			358 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			358 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package e2e
 | 
						|
 | 
						|
import (
 | 
						|
	"context"
 | 
						|
	"fmt"
 | 
						|
	"os"
 | 
						|
	"strings"
 | 
						|
	"testing"
 | 
						|
	"time"
 | 
						|
 | 
						|
	logs2 "github.com/redis/go-redis/v9/internal/maintnotifications/logs"
 | 
						|
	"github.com/redis/go-redis/v9/logging"
 | 
						|
	"github.com/redis/go-redis/v9/maintnotifications"
 | 
						|
)
 | 
						|
 | 
						|
// TestTimeoutConfigurationsPushNotifications tests push notifications with different timeout configurations
 | 
						|
func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
 | 
						|
	if os.Getenv("E2E_SCENARIO_TESTS") != "true" {
 | 
						|
		t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true")
 | 
						|
	}
 | 
						|
 | 
						|
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
 | 
						|
	defer cancel()
 | 
						|
 | 
						|
	var dump = true
 | 
						|
 | 
						|
	var errorsDetected = false
 | 
						|
	var p = func(format string, args ...interface{}) {
 | 
						|
		printLog("TIMEOUT-CONFIGS", false, format, args...)
 | 
						|
	}
 | 
						|
 | 
						|
	var e = func(format string, args ...interface{}) {
 | 
						|
		errorsDetected = true
 | 
						|
		printLog("TIMEOUT-CONFIGS", true, format, args...)
 | 
						|
	}
 | 
						|
 | 
						|
	// Test different timeout configurations
 | 
						|
	timeoutConfigs := []struct {
 | 
						|
		name                       string
 | 
						|
		handoffTimeout             time.Duration
 | 
						|
		relaxedTimeout             time.Duration
 | 
						|
		postHandoffRelaxedDuration time.Duration
 | 
						|
		description                string
 | 
						|
		expectedBehavior           string
 | 
						|
	}{
 | 
						|
		{
 | 
						|
			name:                       "Conservative",
 | 
						|
			handoffTimeout:             60 * time.Second,
 | 
						|
			relaxedTimeout:             30 * time.Second,
 | 
						|
			postHandoffRelaxedDuration: 2 * time.Minute,
 | 
						|
			description:                "Conservative timeouts for stable environments",
 | 
						|
			expectedBehavior:           "Longer timeouts, fewer timeout errors",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                       "Aggressive",
 | 
						|
			handoffTimeout:             5 * time.Second,
 | 
						|
			relaxedTimeout:             3 * time.Second,
 | 
						|
			postHandoffRelaxedDuration: 1 * time.Second,
 | 
						|
			description:                "Aggressive timeouts for fast failover",
 | 
						|
			expectedBehavior:           "Shorter timeouts, faster recovery",
 | 
						|
		},
 | 
						|
		{
 | 
						|
			name:                       "HighLatency",
 | 
						|
			handoffTimeout:             90 * time.Second,
 | 
						|
			relaxedTimeout:             30 * time.Second,
 | 
						|
			postHandoffRelaxedDuration: 10 * time.Minute,
 | 
						|
			description:                "High latency environment timeouts",
 | 
						|
			expectedBehavior:           "Very long timeouts for high latency networks",
 | 
						|
		},
 | 
						|
	}
 | 
						|
 | 
						|
	logCollector.ClearLogs()
 | 
						|
	defer func() {
 | 
						|
		logCollector.Clear()
 | 
						|
	}()
 | 
						|
 | 
						|
	// Test each timeout configuration with its own fresh database
 | 
						|
	for _, timeoutTest := range timeoutConfigs {
 | 
						|
		t.Run(timeoutTest.name, func(t *testing.T) {
 | 
						|
			// Setup: Create fresh database and client factory for THIS timeout config test
 | 
						|
			bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone")
 | 
						|
			defer cleanup()
 | 
						|
			t.Logf("[TIMEOUT-CONFIGS-%s] Created test database with bdb_id: %d", timeoutTest.name, bdbID)
 | 
						|
 | 
						|
			// Get endpoint config from factory (now connected to new database)
 | 
						|
			endpointConfig := factory.GetConfig()
 | 
						|
 | 
						|
			// Create fault injector
 | 
						|
			faultInjector, err := CreateTestFaultInjector()
 | 
						|
			if err != nil {
 | 
						|
				t.Fatalf("[ERROR] Failed to create fault injector: %v", err)
 | 
						|
			}
 | 
						|
 | 
						|
			defer func() {
 | 
						|
				if dump {
 | 
						|
					p("Pool stats:")
 | 
						|
					factory.PrintPoolStats(t)
 | 
						|
				}
 | 
						|
			}()
 | 
						|
 | 
						|
			errorsDetected = false
 | 
						|
			var ef = func(format string, args ...interface{}) {
 | 
						|
				printLog("TIMEOUT-CONFIGS", true, format, args...)
 | 
						|
				t.FailNow()
 | 
						|
			}
 | 
						|
 | 
						|
			p("Testing timeout configuration: %s - %s", timeoutTest.name, timeoutTest.description)
 | 
						|
			p("Expected behavior: %s", timeoutTest.expectedBehavior)
 | 
						|
			p("Handoff timeout: %v, Relaxed timeout: %v, Post-handoff duration: %v",
 | 
						|
				timeoutTest.handoffTimeout, timeoutTest.relaxedTimeout, timeoutTest.postHandoffRelaxedDuration)
 | 
						|
 | 
						|
			minIdleConns := 4
 | 
						|
			poolSize := 10
 | 
						|
			maxConnections := 15
 | 
						|
 | 
						|
			// Create Redis client with specific timeout configuration
 | 
						|
			client, err := factory.Create(fmt.Sprintf("timeout-test-%s", timeoutTest.name), &CreateClientOptions{
 | 
						|
				Protocol:       3, // RESP3 required for push notifications
 | 
						|
				PoolSize:       poolSize,
 | 
						|
				MinIdleConns:   minIdleConns,
 | 
						|
				MaxActiveConns: maxConnections,
 | 
						|
				MaintNotificationsConfig: &maintnotifications.Config{
 | 
						|
					Mode:                       maintnotifications.ModeEnabled,
 | 
						|
					HandoffTimeout:             timeoutTest.handoffTimeout,
 | 
						|
					RelaxedTimeout:             timeoutTest.relaxedTimeout,
 | 
						|
					PostHandoffRelaxedDuration: timeoutTest.postHandoffRelaxedDuration,
 | 
						|
					MaxWorkers:                 20,
 | 
						|
					EndpointType:               maintnotifications.EndpointTypeExternalIP,
 | 
						|
				},
 | 
						|
				ClientName: fmt.Sprintf("timeout-test-%s", timeoutTest.name),
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				ef("Failed to create client for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
 | 
						|
			// Create timeout tracker
 | 
						|
			tracker := NewTrackingNotificationsHook()
 | 
						|
			logger := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
 | 
						|
			setupNotificationHooks(client, tracker, logger)
 | 
						|
			defer func() {
 | 
						|
				tracker.Clear()
 | 
						|
			}()
 | 
						|
 | 
						|
			// Verify initial connectivity
 | 
						|
			err = client.Ping(ctx).Err()
 | 
						|
			if err != nil {
 | 
						|
				ef("Failed to ping Redis with %s timeout config: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
 | 
						|
			p("Client connected successfully with %s timeout configuration", timeoutTest.name)
 | 
						|
 | 
						|
			commandsRunner, _ := NewCommandRunner(client)
 | 
						|
			defer func() {
 | 
						|
				if dump {
 | 
						|
					stats := commandsRunner.GetStats()
 | 
						|
					p("%s timeout config stats: Operations: %d, Errors: %d, Timeout Errors: %d",
 | 
						|
						timeoutTest.name, stats.Operations, stats.Errors, stats.TimeoutErrors)
 | 
						|
				}
 | 
						|
				commandsRunner.Stop()
 | 
						|
			}()
 | 
						|
 | 
						|
			// Start command traffic
 | 
						|
			go func() {
 | 
						|
				commandsRunner.FireCommandsUntilStop(ctx)
 | 
						|
			}()
 | 
						|
 | 
						|
			// Record start time for timeout analysis
 | 
						|
			testStartTime := time.Now()
 | 
						|
 | 
						|
			// Test failover with this timeout configuration
 | 
						|
			p("Testing failover with %s timeout configuration...", timeoutTest.name)
 | 
						|
			failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 | 
						|
				Type: "failover",
 | 
						|
				Parameters: map[string]interface{}{
 | 
						|
					"bdb_id": endpointConfig.BdbID,
 | 
						|
				},
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				ef("Failed to trigger failover action for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
 | 
						|
			// Wait for FAILING_OVER notification
 | 
						|
			match, found := logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 | 
						|
				return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "FAILING_OVER")
 | 
						|
			}, 3*time.Minute)
 | 
						|
			if !found {
 | 
						|
				ef("FAILING_OVER notification was not received for %s timeout config", timeoutTest.name)
 | 
						|
			}
 | 
						|
			failingOverData := logs2.ExtractDataFromLogMessage(match)
 | 
						|
			p("FAILING_OVER notification received for %s. %v", timeoutTest.name, failingOverData)
 | 
						|
 | 
						|
			// Wait for FAILED_OVER notification
 | 
						|
			seqIDToObserve := int64(failingOverData["seqID"].(float64))
 | 
						|
			connIDToObserve := uint64(failingOverData["connID"].(float64))
 | 
						|
			match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 | 
						|
				return notificationType(s, "FAILED_OVER") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1)
 | 
						|
			}, 3*time.Minute)
 | 
						|
			if !found {
 | 
						|
				ef("FAILED_OVER notification was not received for %s timeout config", timeoutTest.name)
 | 
						|
			}
 | 
						|
			failedOverData := logs2.ExtractDataFromLogMessage(match)
 | 
						|
			p("FAILED_OVER notification received for %s. %v", timeoutTest.name, failedOverData)
 | 
						|
 | 
						|
			// Wait for failover to complete
 | 
						|
			status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID,
 | 
						|
				WithMaxWaitTime(180*time.Second),
 | 
						|
				WithPollInterval(2*time.Second),
 | 
						|
			)
 | 
						|
			if err != nil {
 | 
						|
				ef("[FI] Failover action failed for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
			p("[FI] Failover action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
 | 
						|
 | 
						|
			// Continue traffic to observe timeout behavior
 | 
						|
			p("Continuing traffic for %v to observe timeout behavior...", timeoutTest.relaxedTimeout*2)
 | 
						|
			time.Sleep(timeoutTest.relaxedTimeout * 2)
 | 
						|
 | 
						|
			// Test migration to trigger more timeout scenarios
 | 
						|
			p("Testing migration with %s timeout configuration...", timeoutTest.name)
 | 
						|
			migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 | 
						|
				Type: "migrate",
 | 
						|
				Parameters: map[string]interface{}{
 | 
						|
					"bdb_id": endpointConfig.BdbID,
 | 
						|
				},
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				ef("Failed to trigger migrate action for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
 | 
						|
			// Wait for migration to complete
 | 
						|
			status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
 | 
						|
				WithMaxWaitTime(240*time.Second),
 | 
						|
				WithPollInterval(2*time.Second),
 | 
						|
			)
 | 
						|
			if err != nil {
 | 
						|
				ef("[FI] Migrate action failed for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
 | 
						|
			p("[FI] Migrate action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
 | 
						|
 | 
						|
			// Wait for MIGRATING notification
 | 
						|
			match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 | 
						|
				return strings.Contains(s, logs2.ProcessingNotificationMessage) && strings.Contains(s, "MIGRATING")
 | 
						|
			}, 60*time.Second)
 | 
						|
			if !found {
 | 
						|
				ef("MIGRATING notification was not received for %s timeout config", timeoutTest.name)
 | 
						|
			}
 | 
						|
			migrateData := logs2.ExtractDataFromLogMessage(match)
 | 
						|
			p("MIGRATING notification received for %s: %v", timeoutTest.name, migrateData)
 | 
						|
 | 
						|
			// do a bind action
 | 
						|
			bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
 | 
						|
				Type: "bind",
 | 
						|
				Parameters: map[string]interface{}{
 | 
						|
					"bdb_id": endpointConfig.BdbID,
 | 
						|
				},
 | 
						|
			})
 | 
						|
			if err != nil {
 | 
						|
				ef("Failed to trigger bind action for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
			status, err = faultInjector.WaitForAction(ctx, bindResp.ActionID,
 | 
						|
				WithMaxWaitTime(240*time.Second),
 | 
						|
				WithPollInterval(2*time.Second),
 | 
						|
			)
 | 
						|
			if err != nil {
 | 
						|
				ef("[FI] Bind action failed for %s: %v", timeoutTest.name, err)
 | 
						|
			}
 | 
						|
			p("[FI] Bind action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
 | 
						|
 | 
						|
			// waiting for moving notification
 | 
						|
			match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
 | 
						|
				return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING")
 | 
						|
			}, 3*time.Minute)
 | 
						|
			if !found {
 | 
						|
				ef("MOVING notification was not received for %s timeout config", timeoutTest.name)
 | 
						|
			}
 | 
						|
 | 
						|
			movingData := logs2.ExtractDataFromLogMessage(match)
 | 
						|
			p("MOVING notification received for %s. %v", timeoutTest.name, movingData)
 | 
						|
 | 
						|
			// Continue traffic for post-handoff timeout observation
 | 
						|
			p("Continuing traffic for %v to observe post-handoff timeout behavior...", 1*time.Minute)
 | 
						|
			time.Sleep(1 * time.Minute)
 | 
						|
 | 
						|
			commandsRunner.Stop()
 | 
						|
			testDuration := time.Since(testStartTime)
 | 
						|
 | 
						|
			// Analyze timeout behavior
 | 
						|
			trackerAnalysis := tracker.GetAnalysis()
 | 
						|
			logAnalysis := logCollector.GetAnalysis()
 | 
						|
			if trackerAnalysis.NotificationProcessingErrors > 0 {
 | 
						|
				e("Notification processing errors with %s timeout config: %d", timeoutTest.name, trackerAnalysis.NotificationProcessingErrors)
 | 
						|
			}
 | 
						|
 | 
						|
			// Validate timeout-specific behavior
 | 
						|
			switch timeoutTest.name {
 | 
						|
			case "Conservative":
 | 
						|
				if trackerAnalysis.UnrelaxedTimeoutCount > trackerAnalysis.RelaxedTimeoutCount {
 | 
						|
					e("Conservative config should have more relaxed than unrelaxed timeouts, got relaxed=%d, unrelaxed=%d",
 | 
						|
						trackerAnalysis.RelaxedTimeoutCount, trackerAnalysis.UnrelaxedTimeoutCount)
 | 
						|
				}
 | 
						|
			case "Aggressive":
 | 
						|
				// Aggressive timeouts should complete faster
 | 
						|
				if testDuration > 5*time.Minute {
 | 
						|
					e("Aggressive config took too long: %v", testDuration)
 | 
						|
				}
 | 
						|
				if logAnalysis.TotalHandoffRetries > logAnalysis.TotalHandoffCount {
 | 
						|
					e("Expect handoff retries since aggressive timeouts are shorter, got %d retries for %d handoffs",
 | 
						|
						logAnalysis.TotalHandoffRetries, logAnalysis.TotalHandoffCount)
 | 
						|
				}
 | 
						|
			case "HighLatency":
 | 
						|
				// High latency config should have very few unrelaxed after moving
 | 
						|
				if logAnalysis.UnrelaxedAfterMoving > 2 {
 | 
						|
					e("High latency config should have minimal unrelaxed timeouts after moving, got %d", logAnalysis.UnrelaxedAfterMoving)
 | 
						|
				}
 | 
						|
			}
 | 
						|
 | 
						|
			// Validate we received expected notifications
 | 
						|
			if trackerAnalysis.FailingOverCount == 0 {
 | 
						|
				e("Expected FAILING_OVER notifications with %s timeout config, got none", timeoutTest.name)
 | 
						|
			}
 | 
						|
			if trackerAnalysis.FailedOverCount == 0 {
 | 
						|
				e("Expected FAILED_OVER notifications with %s timeout config, got none", timeoutTest.name)
 | 
						|
			}
 | 
						|
			if trackerAnalysis.MigratingCount == 0 {
 | 
						|
				e("Expected MIGRATING notifications with %s timeout config, got none", timeoutTest.name)
 | 
						|
			}
 | 
						|
 | 
						|
			// Validate timeout counts are reasonable
 | 
						|
			if trackerAnalysis.RelaxedTimeoutCount == 0 {
 | 
						|
				e("Expected relaxed timeouts with %s config, got none", timeoutTest.name)
 | 
						|
			}
 | 
						|
 | 
						|
			if logAnalysis.SucceededHandoffCount == 0 {
 | 
						|
				e("Expected successful handoffs with %s config, got none", timeoutTest.name)
 | 
						|
			}
 | 
						|
 | 
						|
			if errorsDetected {
 | 
						|
				logCollector.DumpLogs()
 | 
						|
				trackerAnalysis.Print(t)
 | 
						|
				logCollector.Clear()
 | 
						|
				tracker.Clear()
 | 
						|
				ef("[FAIL] Errors detected with %s timeout config", timeoutTest.name)
 | 
						|
			}
 | 
						|
			p("Timeout configuration %s test completed successfully in %v", timeoutTest.name, testDuration)
 | 
						|
			p("Command runner stats:")
 | 
						|
			p("Operations: %d, Errors: %d, Timeout Errors: %d",
 | 
						|
				commandsRunner.GetStats().Operations, commandsRunner.GetStats().Errors, commandsRunner.GetStats().TimeoutErrors)
 | 
						|
			p("Relaxed timeouts: %d, Unrelaxed timeouts: %d", trackerAnalysis.RelaxedTimeoutCount, trackerAnalysis.UnrelaxedTimeoutCount)
 | 
						|
		})
 | 
						|
 | 
						|
		// Clear logs between timeout configuration tests
 | 
						|
		logCollector.ClearLogs()
 | 
						|
	}
 | 
						|
 | 
						|
	p("All timeout configurations tested successfully")
 | 
						|
}
 |