1
0
mirror of https://github.com/redis/go-redis.git synced 2025-11-04 02:33:24 +03:00
Files
go-redis/maintnotifications/e2e/scenario_timeout_configs_test.go
Hristo Temelski 1e6ee06740 test(e2e): testing framework upgrade (#3541)
* update e2e test, change script

* update script and tests

* fixed bdbid parsing

* disabled majority of tests, swapped event order

* change the config tag

* revert test order

* fix typo

* reenable all e2e tests

* change the clonfig flag key for all e2e tests

* improve logging for debug purposes of tests

* longer deadline for FI in CI

* increase waiting for notifications

* extend tests

* dont fail on flaky third client

* fi new params

* fix test build

* more time for migrating

* first wait for FI action, then assert notification

* fix test build

* fix tests

* fix tests

* change output

* global print logs for tests

* better output

* fix error format

* maybe the notification is already received

* second and third client fix

* print output if failed

* better second and third client checks

* output action data if notification is not received

* stop command runner

* database create / delete actions

* database create / delete actions used in tests

* fix import

* remove example

* remove unused var

* use different port than the one in env

* wait for action to get the response

* fix output

* fix create db config

* fix create db config

* use new database for client

* fix create db config

* db per scenario

* less logs, correct check

* Add CTRF to the scenario tests (#3545)

* add some json ctrf improvements

* fix -v

* attempt to separate the output

---------

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>

---------

Co-authored-by: Nedyalko Dyakov <nedyalko.dyakov@gmail.com>
Co-authored-by: kiryazovi-redis <ivaylo.kiryazov@redis.com>
Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
2025-10-17 17:23:10 +03:00

358 lines
13 KiB
Go

package e2e
import (
"context"
"fmt"
"os"
"strings"
"testing"
"time"
logs2 "github.com/redis/go-redis/v9/internal/maintnotifications/logs"
"github.com/redis/go-redis/v9/logging"
"github.com/redis/go-redis/v9/maintnotifications"
)
// TestTimeoutConfigurationsPushNotifications tests push notifications with different timeout configurations
func TestTimeoutConfigurationsPushNotifications(t *testing.T) {
if os.Getenv("E2E_SCENARIO_TESTS") != "true" {
t.Skip("Scenario tests require E2E_SCENARIO_TESTS=true")
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
var dump = true
var errorsDetected = false
var p = func(format string, args ...interface{}) {
printLog("TIMEOUT-CONFIGS", false, format, args...)
}
var e = func(format string, args ...interface{}) {
errorsDetected = true
printLog("TIMEOUT-CONFIGS", true, format, args...)
}
// Test different timeout configurations
timeoutConfigs := []struct {
name string
handoffTimeout time.Duration
relaxedTimeout time.Duration
postHandoffRelaxedDuration time.Duration
description string
expectedBehavior string
}{
{
name: "Conservative",
handoffTimeout: 60 * time.Second,
relaxedTimeout: 30 * time.Second,
postHandoffRelaxedDuration: 2 * time.Minute,
description: "Conservative timeouts for stable environments",
expectedBehavior: "Longer timeouts, fewer timeout errors",
},
{
name: "Aggressive",
handoffTimeout: 5 * time.Second,
relaxedTimeout: 3 * time.Second,
postHandoffRelaxedDuration: 1 * time.Second,
description: "Aggressive timeouts for fast failover",
expectedBehavior: "Shorter timeouts, faster recovery",
},
{
name: "HighLatency",
handoffTimeout: 90 * time.Second,
relaxedTimeout: 30 * time.Second,
postHandoffRelaxedDuration: 10 * time.Minute,
description: "High latency environment timeouts",
expectedBehavior: "Very long timeouts for high latency networks",
},
}
logCollector.ClearLogs()
defer func() {
logCollector.Clear()
}()
// Test each timeout configuration with its own fresh database
for _, timeoutTest := range timeoutConfigs {
t.Run(timeoutTest.name, func(t *testing.T) {
// Setup: Create fresh database and client factory for THIS timeout config test
bdbID, factory, cleanup := SetupTestDatabaseAndFactory(t, ctx, "standalone")
defer cleanup()
t.Logf("[TIMEOUT-CONFIGS-%s] Created test database with bdb_id: %d", timeoutTest.name, bdbID)
// Get endpoint config from factory (now connected to new database)
endpointConfig := factory.GetConfig()
// Create fault injector
faultInjector, err := CreateTestFaultInjector()
if err != nil {
t.Fatalf("[ERROR] Failed to create fault injector: %v", err)
}
defer func() {
if dump {
p("Pool stats:")
factory.PrintPoolStats(t)
}
}()
errorsDetected = false
var ef = func(format string, args ...interface{}) {
printLog("TIMEOUT-CONFIGS", true, format, args...)
t.FailNow()
}
p("Testing timeout configuration: %s - %s", timeoutTest.name, timeoutTest.description)
p("Expected behavior: %s", timeoutTest.expectedBehavior)
p("Handoff timeout: %v, Relaxed timeout: %v, Post-handoff duration: %v",
timeoutTest.handoffTimeout, timeoutTest.relaxedTimeout, timeoutTest.postHandoffRelaxedDuration)
minIdleConns := 4
poolSize := 10
maxConnections := 15
// Create Redis client with specific timeout configuration
client, err := factory.Create(fmt.Sprintf("timeout-test-%s", timeoutTest.name), &CreateClientOptions{
Protocol: 3, // RESP3 required for push notifications
PoolSize: poolSize,
MinIdleConns: minIdleConns,
MaxActiveConns: maxConnections,
MaintNotificationsConfig: &maintnotifications.Config{
Mode: maintnotifications.ModeEnabled,
HandoffTimeout: timeoutTest.handoffTimeout,
RelaxedTimeout: timeoutTest.relaxedTimeout,
PostHandoffRelaxedDuration: timeoutTest.postHandoffRelaxedDuration,
MaxWorkers: 20,
EndpointType: maintnotifications.EndpointTypeExternalIP,
},
ClientName: fmt.Sprintf("timeout-test-%s", timeoutTest.name),
})
if err != nil {
ef("Failed to create client for %s: %v", timeoutTest.name, err)
}
// Create timeout tracker
tracker := NewTrackingNotificationsHook()
logger := maintnotifications.NewLoggingHook(int(logging.LogLevelDebug))
setupNotificationHooks(client, tracker, logger)
defer func() {
tracker.Clear()
}()
// Verify initial connectivity
err = client.Ping(ctx).Err()
if err != nil {
ef("Failed to ping Redis with %s timeout config: %v", timeoutTest.name, err)
}
p("Client connected successfully with %s timeout configuration", timeoutTest.name)
commandsRunner, _ := NewCommandRunner(client)
defer func() {
if dump {
stats := commandsRunner.GetStats()
p("%s timeout config stats: Operations: %d, Errors: %d, Timeout Errors: %d",
timeoutTest.name, stats.Operations, stats.Errors, stats.TimeoutErrors)
}
commandsRunner.Stop()
}()
// Start command traffic
go func() {
commandsRunner.FireCommandsUntilStop(ctx)
}()
// Record start time for timeout analysis
testStartTime := time.Now()
// Test failover with this timeout configuration
p("Testing failover with %s timeout configuration...", timeoutTest.name)
failoverResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
Type: "failover",
Parameters: map[string]interface{}{
"bdb_id": endpointConfig.BdbID,
},
})
if err != nil {
ef("Failed to trigger failover action for %s: %v", timeoutTest.name, err)
}
// Wait for FAILING_OVER notification
match, found := logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "FAILING_OVER")
}, 3*time.Minute)
if !found {
ef("FAILING_OVER notification was not received for %s timeout config", timeoutTest.name)
}
failingOverData := logs2.ExtractDataFromLogMessage(match)
p("FAILING_OVER notification received for %s. %v", timeoutTest.name, failingOverData)
// Wait for FAILED_OVER notification
seqIDToObserve := int64(failingOverData["seqID"].(float64))
connIDToObserve := uint64(failingOverData["connID"].(float64))
match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
return notificationType(s, "FAILED_OVER") && connID(s, connIDToObserve) && seqID(s, seqIDToObserve+1)
}, 3*time.Minute)
if !found {
ef("FAILED_OVER notification was not received for %s timeout config", timeoutTest.name)
}
failedOverData := logs2.ExtractDataFromLogMessage(match)
p("FAILED_OVER notification received for %s. %v", timeoutTest.name, failedOverData)
// Wait for failover to complete
status, err := faultInjector.WaitForAction(ctx, failoverResp.ActionID,
WithMaxWaitTime(180*time.Second),
WithPollInterval(2*time.Second),
)
if err != nil {
ef("[FI] Failover action failed for %s: %v", timeoutTest.name, err)
}
p("[FI] Failover action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
// Continue traffic to observe timeout behavior
p("Continuing traffic for %v to observe timeout behavior...", timeoutTest.relaxedTimeout*2)
time.Sleep(timeoutTest.relaxedTimeout * 2)
// Test migration to trigger more timeout scenarios
p("Testing migration with %s timeout configuration...", timeoutTest.name)
migrateResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
Type: "migrate",
Parameters: map[string]interface{}{
"bdb_id": endpointConfig.BdbID,
},
})
if err != nil {
ef("Failed to trigger migrate action for %s: %v", timeoutTest.name, err)
}
// Wait for migration to complete
status, err = faultInjector.WaitForAction(ctx, migrateResp.ActionID,
WithMaxWaitTime(240*time.Second),
WithPollInterval(2*time.Second),
)
if err != nil {
ef("[FI] Migrate action failed for %s: %v", timeoutTest.name, err)
}
p("[FI] Migrate action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
// Wait for MIGRATING notification
match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
return strings.Contains(s, logs2.ProcessingNotificationMessage) && strings.Contains(s, "MIGRATING")
}, 60*time.Second)
if !found {
ef("MIGRATING notification was not received for %s timeout config", timeoutTest.name)
}
migrateData := logs2.ExtractDataFromLogMessage(match)
p("MIGRATING notification received for %s: %v", timeoutTest.name, migrateData)
// do a bind action
bindResp, err := faultInjector.TriggerAction(ctx, ActionRequest{
Type: "bind",
Parameters: map[string]interface{}{
"bdb_id": endpointConfig.BdbID,
},
})
if err != nil {
ef("Failed to trigger bind action for %s: %v", timeoutTest.name, err)
}
status, err = faultInjector.WaitForAction(ctx, bindResp.ActionID,
WithMaxWaitTime(240*time.Second),
WithPollInterval(2*time.Second),
)
if err != nil {
ef("[FI] Bind action failed for %s: %v", timeoutTest.name, err)
}
p("[FI] Bind action completed for %s: %s %s", timeoutTest.name, status.Status, actionOutputIfFailed(status))
// waiting for moving notification
match, found = logCollector.MatchOrWaitForLogMatchFunc(func(s string) bool {
return strings.Contains(s, logs2.ProcessingNotificationMessage) && notificationType(s, "MOVING")
}, 3*time.Minute)
if !found {
ef("MOVING notification was not received for %s timeout config", timeoutTest.name)
}
movingData := logs2.ExtractDataFromLogMessage(match)
p("MOVING notification received for %s. %v", timeoutTest.name, movingData)
// Continue traffic for post-handoff timeout observation
p("Continuing traffic for %v to observe post-handoff timeout behavior...", 1*time.Minute)
time.Sleep(1 * time.Minute)
commandsRunner.Stop()
testDuration := time.Since(testStartTime)
// Analyze timeout behavior
trackerAnalysis := tracker.GetAnalysis()
logAnalysis := logCollector.GetAnalysis()
if trackerAnalysis.NotificationProcessingErrors > 0 {
e("Notification processing errors with %s timeout config: %d", timeoutTest.name, trackerAnalysis.NotificationProcessingErrors)
}
// Validate timeout-specific behavior
switch timeoutTest.name {
case "Conservative":
if trackerAnalysis.UnrelaxedTimeoutCount > trackerAnalysis.RelaxedTimeoutCount {
e("Conservative config should have more relaxed than unrelaxed timeouts, got relaxed=%d, unrelaxed=%d",
trackerAnalysis.RelaxedTimeoutCount, trackerAnalysis.UnrelaxedTimeoutCount)
}
case "Aggressive":
// Aggressive timeouts should complete faster
if testDuration > 5*time.Minute {
e("Aggressive config took too long: %v", testDuration)
}
if logAnalysis.TotalHandoffRetries > logAnalysis.TotalHandoffCount {
e("Expect handoff retries since aggressive timeouts are shorter, got %d retries for %d handoffs",
logAnalysis.TotalHandoffRetries, logAnalysis.TotalHandoffCount)
}
case "HighLatency":
// High latency config should have very few unrelaxed after moving
if logAnalysis.UnrelaxedAfterMoving > 2 {
e("High latency config should have minimal unrelaxed timeouts after moving, got %d", logAnalysis.UnrelaxedAfterMoving)
}
}
// Validate we received expected notifications
if trackerAnalysis.FailingOverCount == 0 {
e("Expected FAILING_OVER notifications with %s timeout config, got none", timeoutTest.name)
}
if trackerAnalysis.FailedOverCount == 0 {
e("Expected FAILED_OVER notifications with %s timeout config, got none", timeoutTest.name)
}
if trackerAnalysis.MigratingCount == 0 {
e("Expected MIGRATING notifications with %s timeout config, got none", timeoutTest.name)
}
// Validate timeout counts are reasonable
if trackerAnalysis.RelaxedTimeoutCount == 0 {
e("Expected relaxed timeouts with %s config, got none", timeoutTest.name)
}
if logAnalysis.SucceededHandoffCount == 0 {
e("Expected successful handoffs with %s config, got none", timeoutTest.name)
}
if errorsDetected {
logCollector.DumpLogs()
trackerAnalysis.Print(t)
logCollector.Clear()
tracker.Clear()
ef("[FAIL] Errors detected with %s timeout config", timeoutTest.name)
}
p("Timeout configuration %s test completed successfully in %v", timeoutTest.name, testDuration)
p("Command runner stats:")
p("Operations: %d, Errors: %d, Timeout Errors: %d",
commandsRunner.GetStats().Operations, commandsRunner.GetStats().Errors, commandsRunner.GetStats().TimeoutErrors)
p("Relaxed timeouts: %d, Unrelaxed timeouts: %d", trackerAnalysis.RelaxedTimeoutCount, trackerAnalysis.UnrelaxedTimeoutCount)
})
// Clear logs between timeout configuration tests
logCollector.ClearLogs()
}
p("All timeout configurations tested successfully")
}