diff --git a/internal/auth/streaming/manager.go b/internal/auth/streaming/manager.go index 375bf994..36c3238c 100644 --- a/internal/auth/streaming/manager.go +++ b/internal/auth/streaming/manager.go @@ -39,10 +39,11 @@ func (m *Manager) Listener( connID := poolCn.GetID() // if we reconnect the underlying network connection, the streaming credentials listener will continue to work // so we can get the old listener from the cache and use it. - // subscribing the same (an already subscribed) listener for a StreamingCredentialsProvider SHOULD be a no-op listener, ok := m.credentialsListeners.Get(connID) if !ok || listener == nil { + // Create new listener for this connection + // Note: Callbacks (reAuth, onErr) are captured once and reused for the connection's lifetime newCredListener := &ConnReAuthCredentialsListener{ conn: poolCn, reAuth: reAuth, diff --git a/internal/auth/streaming/pool_hook.go b/internal/auth/streaming/pool_hook.go index 50ce0a0c..69c35ea8 100644 --- a/internal/auth/streaming/pool_hook.go +++ b/internal/auth/streaming/pool_hook.go @@ -79,6 +79,8 @@ func (r *ReAuthPoolHook) OnPut(_ context.Context, conn *pool.Conn) (bool, bool, r.shouldReAuthLock.RUnlock() if ok { + // Acquire both locks to atomically move from shouldReAuth to scheduledReAuth + // This prevents race conditions where OnGet might miss the transition r.shouldReAuthLock.Lock() r.scheduledLock.Lock() r.scheduledReAuth[connID] = true diff --git a/internal/pool/pool_single.go b/internal/pool/pool_single.go index 395fb49e..d9618f88 100644 --- a/internal/pool/pool_single.go +++ b/internal/pool/pool_single.go @@ -5,6 +5,9 @@ import ( "time" ) +// SingleConnPool is a pool that always returns the same connection. +// Note: This pool is not thread-safe. +// It is intended to be used by clients that need a single connection. type SingleConnPool struct { pool Pooler cn *Conn @@ -13,6 +16,12 @@ type SingleConnPool struct { var _ Pooler = (*SingleConnPool)(nil) +// NewSingleConnPool creates a new single connection pool. +// The pool will always return the same connection. +// The pool will not: +// - Close the connection +// - Reconnect the connection +// - Track the connection in any way func NewSingleConnPool(pool Pooler, cn *Conn) *SingleConnPool { return &SingleConnPool{ pool: pool,