From 21bd243bf51b9afe625bfb0f6aa1e969e219cbc2 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Fri, 24 Oct 2025 15:05:54 +0300 Subject: [PATCH] improve reauth state management. fix tests --- internal/auth/streaming/pool_hook.go | 52 ++++++++++------------------ maintnotifications/handoff_worker.go | 6 ---- maintnotifications/pool_hook_test.go | 12 +++---- 3 files changed, 24 insertions(+), 46 deletions(-) diff --git a/internal/auth/streaming/pool_hook.go b/internal/auth/streaming/pool_hook.go index f9b9dd2c..a5647be0 100644 --- a/internal/auth/streaming/pool_hook.go +++ b/internal/auth/streaming/pool_hook.go @@ -179,40 +179,27 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, r.workers <- struct{}{} }() - var err error - timeout := time.After(r.reAuthTimeout) + // Create timeout context for connection acquisition + // This prevents indefinite waiting if the connection is stuck + ctx, cancel := context.WithTimeout(context.Background(), r.reAuthTimeout) + defer cancel() // Try to acquire the connection for re-authentication // We need to ensure the connection is IDLE (not IN_USE) before transitioning to UNUSABLE // This prevents re-authentication from interfering with active commands - const baseDelay = 10 * time.Microsecond - acquired := false - attempt := 0 - for !acquired { - select { - case <-timeout: - // Timeout occurred, cannot acquire connection - err = pool.ErrConnUnusableTimeout - reAuthFn(err) - return - default: - // Try to atomically transition from IDLE to UNUSABLE - // This ensures we only acquire connections that are not actively in use - stateMachine := conn.GetStateMachine() - if stateMachine != nil { - _, err := stateMachine.TryTransition([]pool.ConnState{pool.StateIdle}, pool.StateUnusable) - if err == nil { - // Successfully acquired: connection was IDLE, now UNUSABLE - acquired = true - } - } - if !acquired { - // Exponential backoff: 10, 20, 40, 80... up to 5120 microseconds - delay := baseDelay * time.Duration(1< 0 // if shouldHandoff is false and retries is 0, then we are not retrying and not do a handoff if !shouldHandoff && conn.HandoffRetries() == 0 { diff --git a/maintnotifications/pool_hook_test.go b/maintnotifications/pool_hook_test.go index c21f4221..c94bd67d 100644 --- a/maintnotifications/pool_hook_test.go +++ b/maintnotifications/pool_hook_test.go @@ -391,8 +391,8 @@ func TestConnectionHook(t *testing.T) { ctx := context.Background() acceptCon, err := processor.OnGet(ctx, conn, false) - if err != ErrConnectionMarkedForHandoff { - t.Errorf("Expected ErrConnectionMarkedForHandoff, got %v", err) + if err != ErrConnectionMarkedForHandoffWithState { + t.Errorf("Expected ErrConnectionMarkedForHandoffWithState, got %v", err) } if acceptCon { t.Error("Connection should not be accepted when marked for handoff") @@ -425,8 +425,8 @@ func TestConnectionHook(t *testing.T) { // Test OnGet with pending handoff ctx := context.Background() acceptCon, err := processor.OnGet(ctx, conn, false) - if err != ErrConnectionMarkedForHandoff { - t.Error("Should return ErrConnectionMarkedForHandoff for pending connection") + if err != ErrConnectionMarkedForHandoffWithState { + t.Errorf("Should return ErrConnectionMarkedForHandoffWithState for pending connection, got %v", err) } if acceptCon { t.Error("Should not accept connection with pending handoff") @@ -678,8 +678,8 @@ func TestConnectionHook(t *testing.T) { if err == nil { t.Error("OnGet should fail for connection marked for handoff") } - if err != ErrConnectionMarkedForHandoff { - t.Errorf("Expected ErrConnectionMarkedForHandoff, got %v", err) + if err != ErrConnectionMarkedForHandoffWithState { + t.Errorf("Expected ErrConnectionMarkedForHandoffWithState, got %v", err) } if acceptConn { t.Error("Connection should not be accepted when marked for handoff")