mirror of
https://github.com/redis/go-redis.git
synced 2025-09-10 07:11:50 +03:00
[CAE-1072] Hitless Upgrades (#3447)
* feat(hitless): Introduce handlers for hitless upgrades This commit includes all the work on hitless upgrades with the addition of: - Pubsub Pool - Examples - Refactor of push - Refactor of pool (using atomics for most things) - Introducing of hooks in pool --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
This commit is contained in:
234
redis.go
234
redis.go
@@ -10,6 +10,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/redis/go-redis/v9/auth"
|
||||
"github.com/redis/go-redis/v9/hitless"
|
||||
"github.com/redis/go-redis/v9/internal"
|
||||
"github.com/redis/go-redis/v9/internal/hscan"
|
||||
"github.com/redis/go-redis/v9/internal/pool"
|
||||
@@ -204,19 +205,35 @@ func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) e
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
type baseClient struct {
|
||||
opt *Options
|
||||
connPool pool.Pooler
|
||||
opt *Options
|
||||
optLock sync.RWMutex
|
||||
connPool pool.Pooler
|
||||
pubSubPool *pool.PubSubPool
|
||||
hooksMixin
|
||||
|
||||
onClose func() error // hook called when client is closed
|
||||
|
||||
// Push notification processing
|
||||
pushProcessor push.NotificationProcessor
|
||||
|
||||
// Hitless upgrade manager
|
||||
hitlessManager *hitless.HitlessManager
|
||||
hitlessManagerLock sync.RWMutex
|
||||
}
|
||||
|
||||
func (c *baseClient) clone() *baseClient {
|
||||
clone := *c
|
||||
return &clone
|
||||
c.hitlessManagerLock.RLock()
|
||||
hitlessManager := c.hitlessManager
|
||||
c.hitlessManagerLock.RUnlock()
|
||||
|
||||
clone := &baseClient{
|
||||
opt: c.opt,
|
||||
connPool: c.connPool,
|
||||
onClose: c.onClose,
|
||||
pushProcessor: c.pushProcessor,
|
||||
hitlessManager: hitlessManager,
|
||||
}
|
||||
return clone
|
||||
}
|
||||
|
||||
func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
|
||||
@@ -234,21 +251,6 @@ func (c *baseClient) String() string {
|
||||
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
|
||||
}
|
||||
|
||||
func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
|
||||
cn, err := c.connPool.NewConn(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = c.initConn(ctx, cn)
|
||||
if err != nil {
|
||||
_ = c.connPool.CloseConn(cn)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
|
||||
if c.opt.Limiter != nil {
|
||||
err := c.opt.Limiter.Allow()
|
||||
@@ -274,7 +276,7 @@ func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if cn.Inited {
|
||||
if cn.IsInited() {
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
@@ -356,12 +358,10 @@ func (c *baseClient) wrappedOnClose(newOnClose func() error) func() error {
|
||||
}
|
||||
|
||||
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||
if cn.Inited {
|
||||
if !cn.Inited.CompareAndSwap(false, true) {
|
||||
return nil
|
||||
}
|
||||
|
||||
var err error
|
||||
cn.Inited = true
|
||||
connPool := pool.NewSingleConnPool(c.connPool, cn)
|
||||
conn := newConn(c.opt, connPool, &c.hooksMixin)
|
||||
|
||||
@@ -430,6 +430,51 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||
return fmt.Errorf("failed to initialize connection options: %w", err)
|
||||
}
|
||||
|
||||
// Enable maintenance notifications if hitless upgrades are configured
|
||||
c.optLock.RLock()
|
||||
hitlessEnabled := c.opt.HitlessUpgradeConfig != nil && c.opt.HitlessUpgradeConfig.Mode != hitless.MaintNotificationsDisabled
|
||||
protocol := c.opt.Protocol
|
||||
endpointType := c.opt.HitlessUpgradeConfig.EndpointType
|
||||
c.optLock.RUnlock()
|
||||
var hitlessHandshakeErr error
|
||||
if hitlessEnabled && protocol == 3 {
|
||||
hitlessHandshakeErr = conn.ClientMaintNotifications(
|
||||
ctx,
|
||||
true,
|
||||
endpointType.String(),
|
||||
).Err()
|
||||
if hitlessHandshakeErr != nil {
|
||||
if !isRedisError(hitlessHandshakeErr) {
|
||||
// if not redis error, fail the connection
|
||||
return hitlessHandshakeErr
|
||||
}
|
||||
c.optLock.Lock()
|
||||
// handshake failed - check and modify config atomically
|
||||
switch c.opt.HitlessUpgradeConfig.Mode {
|
||||
case hitless.MaintNotificationsEnabled:
|
||||
// enabled mode, fail the connection
|
||||
c.optLock.Unlock()
|
||||
return fmt.Errorf("failed to enable maintenance notifications: %w", hitlessHandshakeErr)
|
||||
default: // will handle auto and any other
|
||||
internal.Logger.Printf(ctx, "hitless: auto mode fallback: hitless upgrades disabled due to handshake error: %v", hitlessHandshakeErr)
|
||||
c.opt.HitlessUpgradeConfig.Mode = hitless.MaintNotificationsDisabled
|
||||
c.optLock.Unlock()
|
||||
// auto mode, disable hitless upgrades and continue
|
||||
if err := c.disableHitlessUpgrades(); err != nil {
|
||||
// Log error but continue - auto mode should be resilient
|
||||
internal.Logger.Printf(ctx, "hitless: failed to disable hitless upgrades in auto mode: %v", err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// handshake was executed successfully
|
||||
// to make sure that the handshake will be executed on other connections as well if it was successfully
|
||||
// executed on this connection, we will force the handshake to be executed on all connections
|
||||
c.optLock.Lock()
|
||||
c.opt.HitlessUpgradeConfig.Mode = hitless.MaintNotificationsEnabled
|
||||
c.optLock.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
if !c.opt.DisableIdentity && !c.opt.DisableIndentity {
|
||||
libName := ""
|
||||
libVer := Version()
|
||||
@@ -446,6 +491,12 @@ func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
||||
}
|
||||
}
|
||||
|
||||
cn.SetUsable(true)
|
||||
cn.Inited.Store(true)
|
||||
|
||||
// Set the connection initialization function for potential reconnections
|
||||
cn.SetInitConnFunc(c.createInitConnFunc())
|
||||
|
||||
if c.opt.OnConnect != nil {
|
||||
return c.opt.OnConnect(ctx, conn)
|
||||
}
|
||||
@@ -512,6 +563,8 @@ func (c *baseClient) assertUnstableCommand(cmd Cmder) bool {
|
||||
if c.opt.UnstableResp3 {
|
||||
return true
|
||||
} else {
|
||||
// TODO: find the best way to remove the panic and return error here
|
||||
// The client should not panic when executing a command, only when initializing.
|
||||
panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.")
|
||||
}
|
||||
default:
|
||||
@@ -593,19 +646,76 @@ func (c *baseClient) context(ctx context.Context) context.Context {
|
||||
return context.Background()
|
||||
}
|
||||
|
||||
// createInitConnFunc creates a connection initialization function that can be used for reconnections.
|
||||
func (c *baseClient) createInitConnFunc() func(context.Context, *pool.Conn) error {
|
||||
return func(ctx context.Context, cn *pool.Conn) error {
|
||||
return c.initConn(ctx, cn)
|
||||
}
|
||||
}
|
||||
|
||||
// enableHitlessUpgrades initializes the hitless upgrade manager and pool hook.
|
||||
// This function is called during client initialization.
|
||||
// will register push notification handlers for all hitless upgrade events.
|
||||
// will start background workers for handoff processing in the pool hook.
|
||||
func (c *baseClient) enableHitlessUpgrades() error {
|
||||
// Create client adapter
|
||||
clientAdapterInstance := newClientAdapter(c)
|
||||
|
||||
// Create hitless manager directly
|
||||
manager, err := hitless.NewHitlessManager(clientAdapterInstance, c.connPool, c.opt.HitlessUpgradeConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Set the manager reference and initialize pool hook
|
||||
c.hitlessManagerLock.Lock()
|
||||
c.hitlessManager = manager
|
||||
c.hitlessManagerLock.Unlock()
|
||||
|
||||
// Initialize pool hook (safe to call without lock since manager is now set)
|
||||
manager.InitPoolHook(c.dialHook)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *baseClient) disableHitlessUpgrades() error {
|
||||
c.hitlessManagerLock.Lock()
|
||||
defer c.hitlessManagerLock.Unlock()
|
||||
|
||||
// Close the hitless manager
|
||||
if c.hitlessManager != nil {
|
||||
// Closing the manager will also shutdown the pool hook
|
||||
// and remove it from the pool
|
||||
c.hitlessManager.Close()
|
||||
c.hitlessManager = nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the client, releasing any open resources.
|
||||
//
|
||||
// It is rare to Close a Client, as the Client is meant to be
|
||||
// long-lived and shared between many goroutines.
|
||||
func (c *baseClient) Close() error {
|
||||
var firstErr error
|
||||
|
||||
// Close hitless manager first
|
||||
if err := c.disableHitlessUpgrades(); err != nil {
|
||||
firstErr = err
|
||||
}
|
||||
|
||||
if c.onClose != nil {
|
||||
if err := c.onClose(); err != nil {
|
||||
if err := c.onClose(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
if err := c.connPool.Close(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
if c.connPool != nil {
|
||||
if err := c.connPool.Close(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
if c.pubSubPool != nil {
|
||||
if err := c.pubSubPool.Close(); err != nil && firstErr == nil {
|
||||
firstErr = err
|
||||
}
|
||||
}
|
||||
return firstErr
|
||||
}
|
||||
@@ -796,6 +906,8 @@ func NewClient(opt *Options) *Client {
|
||||
if opt == nil {
|
||||
panic("redis: NewClient nil options")
|
||||
}
|
||||
// clone to not share options with the caller
|
||||
opt = opt.clone()
|
||||
opt.init()
|
||||
|
||||
// Push notifications are always enabled for RESP3 (cannot be disabled)
|
||||
@@ -810,11 +922,40 @@ func NewClient(opt *Options) *Client {
|
||||
// Initialize push notification processor using shared helper
|
||||
// Use void processor for RESP2 connections (push notifications not available)
|
||||
c.pushProcessor = initializePushProcessor(opt)
|
||||
// set opt push processor for child clients
|
||||
c.opt.PushNotificationProcessor = c.pushProcessor
|
||||
|
||||
// Update options with the initialized push processor for connection pool
|
||||
opt.PushNotificationProcessor = c.pushProcessor
|
||||
// Create connection pools
|
||||
var err error
|
||||
c.connPool, err = newConnPool(opt, c.dialHook)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("redis: failed to create connection pool: %w", err))
|
||||
}
|
||||
c.pubSubPool, err = newPubSubPool(opt, c.dialHook)
|
||||
if err != nil {
|
||||
panic(fmt.Errorf("redis: failed to create pubsub pool: %w", err))
|
||||
}
|
||||
|
||||
c.connPool = newConnPool(opt, c.dialHook)
|
||||
// Initialize hitless upgrades first if enabled and protocol is RESP3
|
||||
if opt.HitlessUpgradeConfig != nil && opt.HitlessUpgradeConfig.Mode != hitless.MaintNotificationsDisabled && opt.Protocol == 3 {
|
||||
err := c.enableHitlessUpgrades()
|
||||
if err != nil {
|
||||
internal.Logger.Printf(context.Background(), "hitless: failed to initialize hitless upgrades: %v", err)
|
||||
if opt.HitlessUpgradeConfig.Mode == hitless.MaintNotificationsEnabled {
|
||||
/*
|
||||
Design decision: panic here to fail fast if hitless upgrades cannot be enabled when explicitly requested.
|
||||
We choose to panic instead of returning an error to avoid breaking the existing client API, which does not expect
|
||||
an error from NewClient. This ensures that misconfiguration or critical initialization failures are surfaced
|
||||
immediately, rather than allowing the client to continue in a partially initialized or inconsistent state.
|
||||
Clients relying on hitless upgrades should be aware that initialization errors will cause a panic, and should
|
||||
handle this accordingly (e.g., via recover or by validating configuration before calling NewClient).
|
||||
This approach is only used when HitlessUpgradeConfig.Mode is MaintNotificationsEnabled, indicating that hitless
|
||||
upgrades are required for correct operation. In other modes, initialization failures are logged but do not panic.
|
||||
*/
|
||||
panic(fmt.Errorf("failed to enable hitless upgrades: %w", err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return &c
|
||||
}
|
||||
@@ -851,6 +992,14 @@ func (c *Client) Options() *Options {
|
||||
return c.opt
|
||||
}
|
||||
|
||||
// GetHitlessManager returns the hitless manager instance for monitoring and control.
|
||||
// Returns nil if hitless upgrades are not enabled.
|
||||
func (c *Client) GetHitlessManager() *hitless.HitlessManager {
|
||||
c.hitlessManagerLock.RLock()
|
||||
defer c.hitlessManagerLock.RUnlock()
|
||||
return c.hitlessManager
|
||||
}
|
||||
|
||||
// initializePushProcessor initializes the push notification processor for any client type.
|
||||
// This is a shared helper to avoid duplication across NewClient, NewFailoverClient, and NewSentinelClient.
|
||||
func initializePushProcessor(opt *Options) push.NotificationProcessor {
|
||||
@@ -887,6 +1036,7 @@ type PoolStats pool.Stats
|
||||
// PoolStats returns connection pool stats.
|
||||
func (c *Client) PoolStats() *PoolStats {
|
||||
stats := c.connPool.Stats()
|
||||
stats.PubSubStats = *(c.pubSubPool.Stats())
|
||||
return (*PoolStats)(stats)
|
||||
}
|
||||
|
||||
@@ -921,11 +1071,27 @@ func (c *Client) TxPipeline() Pipeliner {
|
||||
func (c *Client) pubSub() *PubSub {
|
||||
pubsub := &PubSub{
|
||||
opt: c.opt,
|
||||
|
||||
newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
|
||||
return c.newConn(ctx)
|
||||
newConn: func(ctx context.Context, addr string, channels []string) (*pool.Conn, error) {
|
||||
cn, err := c.pubSubPool.NewConn(ctx, c.opt.Network, addr, channels)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// will return nil if already initialized
|
||||
err = c.initConn(ctx, cn)
|
||||
if err != nil {
|
||||
_ = cn.Close()
|
||||
return nil, err
|
||||
}
|
||||
// Track connection in PubSubPool
|
||||
c.pubSubPool.TrackConn(cn)
|
||||
return cn, nil
|
||||
},
|
||||
closeConn: func(cn *pool.Conn) error {
|
||||
// Untrack connection from PubSubPool
|
||||
c.pubSubPool.UntrackConn(cn)
|
||||
_ = cn.Close()
|
||||
return nil
|
||||
},
|
||||
closeConn: c.connPool.CloseConn,
|
||||
pushProcessor: c.pushProcessor,
|
||||
}
|
||||
pubsub.init()
|
||||
@@ -1113,6 +1279,6 @@ func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) push.Notifica
|
||||
return push.NotificationHandlerContext{
|
||||
Client: c,
|
||||
ConnPool: c.connPool,
|
||||
Conn: cn,
|
||||
Conn: cn, // Wrap in adapter for easier interface access
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user