mirror of
https://github.com/redis/go-redis.git
synced 2025-11-02 15:33:16 +03:00
* fix(pool): wip, pool reauth should not interfere with handoff * fix credListeners map * fix race in tests * better conn usable timeout * add design decision comment * few small improvements * update marked as queued * add Used to clarify the state of the conn * rename test * fix(test): fix flaky test * lock inside the listeners collection * address pr comments * Update internal/auth/cred_listeners.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update internal/pool/buffer_size_test.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * wip refactor entraid * fix maintnotif pool hook * fix mocks * fix nil listener * sync and async reauth based on conn lifecycle * be able to reject connection OnGet * pass hooks so the tests can observe reauth * give some time for the background to execute commands * fix tests * only async reauth * Update internal/pool/pool.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update internal/auth/streaming/pool_hook.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Update internal/pool/conn.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * chore(redisotel): use metric.WithAttributeSet to avoid copy (#3552) In order to improve performance replace `WithAttributes` with `WithAttributeSet`. This avoids the slice allocation and copy that is done in `WithAttributes`. For more information see https://github.com/open-telemetry/opentelemetry-go/blob/v1.38.0/metric/instrument.go#L357-L376 * chore(docs): explain why MaxRetries is disabled for ClusterClient (#3551) Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com> * exponential backoff * address pr comments * address pr comments * remove rlock * add some comments * add comments --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Warnar Boekkooi <wboekkooi@impossiblecloud.com> Co-authored-by: Justin <justindsouza80@gmail.com>
138 lines
4.9 KiB
Go
138 lines
4.9 KiB
Go
package streaming
|
|
|
|
import (
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9/auth"
|
|
"github.com/redis/go-redis/v9/internal/pool"
|
|
)
|
|
|
|
// Manager coordinates streaming credentials and re-authentication for a connection pool.
|
|
//
|
|
// The manager is responsible for:
|
|
// - Creating and managing per-connection credentials listeners
|
|
// - Providing the pool hook for re-authentication
|
|
// - Coordinating between credentials updates and pool operations
|
|
//
|
|
// When credentials change via a StreamingCredentialsProvider:
|
|
// 1. The credentials listener (ConnReAuthCredentialsListener) receives the update
|
|
// 2. It calls MarkForReAuth on the manager
|
|
// 3. The manager delegates to the pool hook
|
|
// 4. The pool hook schedules background re-authentication
|
|
//
|
|
// The manager maintains a registry of credentials listeners indexed by connection ID,
|
|
// allowing listener reuse when connections are reinitialized (e.g., after handoff).
|
|
type Manager struct {
|
|
// credentialsListeners maps connection ID to credentials listener
|
|
credentialsListeners *CredentialsListeners
|
|
|
|
// pool is the connection pool being managed
|
|
pool pool.Pooler
|
|
|
|
// poolHookRef is the re-authentication pool hook
|
|
poolHookRef *ReAuthPoolHook
|
|
}
|
|
|
|
// NewManager creates a new streaming credentials manager.
|
|
//
|
|
// Parameters:
|
|
// - pl: The connection pool to manage
|
|
// - reAuthTimeout: Maximum time to wait for acquiring a connection for re-authentication
|
|
//
|
|
// The manager creates a ReAuthPoolHook sized to match the pool size, ensuring that
|
|
// re-auth operations don't exhaust the connection pool.
|
|
func NewManager(pl pool.Pooler, reAuthTimeout time.Duration) *Manager {
|
|
m := &Manager{
|
|
pool: pl,
|
|
poolHookRef: NewReAuthPoolHook(pl.Size(), reAuthTimeout),
|
|
credentialsListeners: NewCredentialsListeners(),
|
|
}
|
|
m.poolHookRef.manager = m
|
|
return m
|
|
}
|
|
|
|
// PoolHook returns the pool hook for re-authentication.
|
|
//
|
|
// This hook should be registered with the connection pool to enable
|
|
// automatic re-authentication when credentials change.
|
|
func (m *Manager) PoolHook() pool.PoolHook {
|
|
return m.poolHookRef
|
|
}
|
|
|
|
// Listener returns or creates a credentials listener for a connection.
|
|
//
|
|
// This method is called during connection initialization to set up the
|
|
// credentials listener. If a listener already exists for the connection ID
|
|
// (e.g., after a handoff), it is reused.
|
|
//
|
|
// Parameters:
|
|
// - poolCn: The connection to create/get a listener for
|
|
// - reAuth: Function to re-authenticate the connection with new credentials
|
|
// - onErr: Function to call when re-authentication fails
|
|
//
|
|
// Returns:
|
|
// - auth.CredentialsListener: The listener to subscribe to the credentials provider
|
|
// - error: Non-nil if poolCn is nil
|
|
//
|
|
// Note: The reAuth and onErr callbacks are captured once when the listener is
|
|
// created and reused for the connection's lifetime. They should not change.
|
|
//
|
|
// Thread-safe: Can be called concurrently during connection initialization.
|
|
func (m *Manager) Listener(
|
|
poolCn *pool.Conn,
|
|
reAuth func(*pool.Conn, auth.Credentials) error,
|
|
onErr func(*pool.Conn, error),
|
|
) (auth.CredentialsListener, error) {
|
|
if poolCn == nil {
|
|
return nil, errors.New("poolCn cannot be nil")
|
|
}
|
|
connID := poolCn.GetID()
|
|
// if we reconnect the underlying network connection, the streaming credentials listener will continue to work
|
|
// so we can get the old listener from the cache and use it.
|
|
// subscribing the same (an already subscribed) listener for a StreamingCredentialsProvider SHOULD be a no-op
|
|
listener, ok := m.credentialsListeners.Get(connID)
|
|
if !ok || listener == nil {
|
|
// Create new listener for this connection
|
|
// Note: Callbacks (reAuth, onErr) are captured once and reused for the connection's lifetime
|
|
newCredListener := &ConnReAuthCredentialsListener{
|
|
conn: poolCn,
|
|
reAuth: reAuth,
|
|
onErr: onErr,
|
|
manager: m,
|
|
}
|
|
|
|
m.credentialsListeners.Add(connID, newCredListener)
|
|
listener = newCredListener
|
|
}
|
|
return listener, nil
|
|
}
|
|
|
|
// MarkForReAuth marks a connection for re-authentication.
|
|
//
|
|
// This method is called by the credentials listener when new credentials are
|
|
// received. It delegates to the pool hook to schedule background re-authentication.
|
|
//
|
|
// Parameters:
|
|
// - poolCn: The connection to re-authenticate
|
|
// - reAuthFn: Function to call for re-authentication, receives error if acquisition fails
|
|
//
|
|
// Thread-safe: Called by credentials listeners when credentials change.
|
|
func (m *Manager) MarkForReAuth(poolCn *pool.Conn, reAuthFn func(error)) {
|
|
connID := poolCn.GetID()
|
|
m.poolHookRef.MarkForReAuth(connID, reAuthFn)
|
|
}
|
|
|
|
// RemoveListener removes the credentials listener for a connection.
|
|
//
|
|
// This method is called by the pool hook's OnRemove to clean up listeners
|
|
// when connections are removed from the pool.
|
|
//
|
|
// Parameters:
|
|
// - connID: The connection ID whose listener should be removed
|
|
//
|
|
// Thread-safe: Called during connection removal.
|
|
func (m *Manager) RemoveListener(connID uint64) {
|
|
m.credentialsListeners.Remove(connID)
|
|
}
|