1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-18 00:20:57 +03:00
Files
go-redis/redis.go
Nedyalko Dyakov 1606de8b73 feat: implement strongly typed HandlerContext interface
Convert HandlerContext from struct to interface with strongly typed getters
for different client types. This provides better type safety and a cleaner
API for push notification handlers while maintaining flexibility.

Key Changes:

1. HandlerContext Interface Design:
   - Converted HandlerContext from struct to interface
   - Added strongly typed getters for different client types
   - GetClusterClient() returns ClusterClientInterface
   - GetSentinelClient() returns SentinelClientInterface
   - GetFailoverClient() returns FailoverClientInterface
   - GetRegularClient() returns RegularClientInterface
   - GetPubSub() returns PubSubInterface

2. Client Type Interfaces:
   - Defined ClusterClientInterface for cluster client access
   - Defined SentinelClientInterface for sentinel client access
   - Defined FailoverClientInterface for failover client access
   - Defined RegularClientInterface for regular client access
   - Defined PubSubInterface for pub/sub access
   - Each interface provides String() method for basic operations

3. Concrete Implementation:
   - Created handlerContext struct implementing HandlerContext interface
   - Added NewHandlerContext constructor function
   - Implemented type-safe getters with interface casting
   - Returns nil for incorrect client types (type safety)

4. Updated All Usage:
   - Updated Handler interface to use HandlerContext interface
   - Updated ProcessorInterface to use HandlerContext interface
   - Updated all processor implementations (Processor, VoidProcessor)
   - Updated all handler context creation sites
   - Updated test handlers and test context creation

5. Helper Methods:
   - Updated pushNotificationHandlerContext() in baseClient
   - Updated pushNotificationHandlerContext() in PubSub
   - Consistent context creation across all client types
   - Proper parameter passing for different connection types

6. Type Safety Benefits:
   - Handlers can safely cast to specific client types
   - Compile-time checking for client type access
   - Clear API for accessing different client capabilities
   - No runtime panics from incorrect type assertions

7. API Usage Example:
   ```go
   func (h *MyHandler) HandlePushNotification(
       ctx context.Context,
       handlerCtx HandlerContext,
       notification []interface{},
   ) bool {
       // Strongly typed access
       if clusterClient := handlerCtx.GetClusterClient(); clusterClient != nil {
           // Handle cluster-specific logic
       }
       if sentinelClient := handlerCtx.GetSentinelClient(); sentinelClient != nil {
           // Handle sentinel-specific logic
       }
       return true
   }
   ```

8. Backward Compatibility:
   - Interface maintains same functionality as original struct
   - All existing handler patterns continue to work
   - No breaking changes to handler implementations
   - Smooth migration path for existing code

Benefits:
- Strong type safety for client access in handlers
- Clear API with explicit client type getters
- Compile-time checking prevents runtime errors
- Flexible interface allows future extensions
- Better separation of concerns between client types
- Enhanced developer experience with IntelliSense support

This enhancement provides handlers with strongly typed access to different
Redis client types while maintaining the flexibility and context information
needed for sophisticated push notification handling, particularly important
for hitless upgrades and cluster management operations.
2025-07-04 19:53:19 +03:00

1145 lines
34 KiB
Go

package redis
import (
"context"
"errors"
"fmt"
"net"
"sync"
"sync/atomic"
"time"
"github.com/redis/go-redis/v9/auth"
"github.com/redis/go-redis/v9/internal"
"github.com/redis/go-redis/v9/internal/hscan"
"github.com/redis/go-redis/v9/internal/pool"
"github.com/redis/go-redis/v9/internal/proto"
"github.com/redis/go-redis/v9/internal/pushnotif"
)
// Scanner is the internal/hscan.Scanner interface, re-exported from this package.
type Scanner = hscan.Scanner

// Nil is the reply returned by Redis when a key does not exist.
const Nil = proto.Nil
// SetLogger replaces the package-wide logger (internal.Logger) with logger.
func SetLogger(logger internal.Logging) {
	internal.Logger = logger
}
//------------------------------------------------------------------------------
// Hook is a set of middleware constructors. Each method receives the next
// function in the chain and returns a function that wraps it.
type Hook interface {
	// DialHook wraps the function used to open network connections.
	DialHook(next DialHook) DialHook
	// ProcessHook wraps the function that executes a single command.
	ProcessHook(next ProcessHook) ProcessHook
	// ProcessPipelineHook wraps the function that executes a batch of commands.
	// hooksMixin.chain applies it to both the plain and the transactional pipeline.
	ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
}
// Function signatures that Hook implementations wrap.
type (
	// DialHook opens a network connection to addr over network.
	DialHook func(ctx context.Context, network, addr string) (net.Conn, error)
	// ProcessHook executes a single command.
	ProcessHook func(ctx context.Context, cmd Cmder) error
	// ProcessPipelineHook executes a batch of commands.
	ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error
)
// hooksMixin stores the registered hooks and the function chains built from them.
type hooksMixin struct {
	// hooksMu is write-locked by chain and clone and read-locked by dialHook.
	hooksMu *sync.RWMutex
	// slice holds the hooks in the order they were passed to AddHook.
	slice []Hook
	// initial is the innermost set of functions, as given to initHooks.
	initial hooks
	// current is initial wrapped by every hook in slice; chain rebuilds it.
	current hooks
}
// initHooks installs a fresh mutex, records hooks as the innermost functions,
// and rebuilds the chain. Hooks already present in hs.slice are kept and re-applied.
func (hs *hooksMixin) initHooks(hooks hooks) {
	// The mutex must exist before chain runs, because chain locks it.
	hs.hooksMu = new(sync.RWMutex)
	hs.initial = hooks
	hs.chain()
}
// hooks groups the four function chains a client calls through.
type hooks struct {
	// dial opens network connections.
	dial DialHook
	// process executes a single command.
	process ProcessHook
	// pipeline executes a batch of commands.
	pipeline ProcessPipelineHook
	// txPipeline executes a batch of commands as a transaction.
	txPipeline ProcessPipelineHook
}
// setDefaults replaces every nil function in h with a no-op, so the chain can
// always be called. The default dialer returns a nil connection and a nil
// error; the other defaults return a nil error.
func (h *hooks) setDefaults() {
	if h.dial == nil {
		h.dial = func(context.Context, string, string) (net.Conn, error) {
			return nil, nil
		}
	}
	if h.process == nil {
		h.process = func(context.Context, Cmder) error {
			return nil
		}
	}
	if h.pipeline == nil {
		h.pipeline = func(context.Context, []Cmder) error {
			return nil
		}
	}
	if h.txPipeline == nil {
		h.txPipeline = func(context.Context, []Cmder) error {
			return nil
		}
	}
}
// AddHook appends hook to the list of hooks and rebuilds the call chain.
//
// Hooks run around network dialing, command execution, and pipeline execution.
// They are called in the order they were added: the first hook added is the
// outermost one, so it starts first and finishes last.
// Each hook must call the next hook, unless it wants to stop the command from executing.
// For example, after adding hook-1 and then hook-2:
//
//	client.AddHook(hook-1)
//	client.AddHook(hook-2)
//
// hook-1:
//
//	func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
//		return func(ctx context.Context, cmd redis.Cmder) error {
//			print("hook-1 start")
//			next(ctx, cmd)
//			print("hook-1 end")
//			return nil
//		}
//	}
//
// hook-2:
//
//	func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
//		return func(ctx context.Context, cmd redis.Cmder) error {
//			print("hook-2 start")
//			next(ctx, cmd)
//			print("hook-2 end")
//			return nil
//		}
//	}
//
// The execution sequence is:
//
//	hook-1 start -> hook-2 start -> exec redis cmd -> hook-2 end -> hook-1 end
//
// Please note: "next(ctx, cmd)" is very important, it calls the next hook;
// if "next(ctx, cmd)" is not executed, the redis command will not be executed.
func (hs *hooksMixin) AddHook(hook Hook) {
	// NOTE(review): the append runs before chain acquires hooksMu — confirm that
	// hooks are never added concurrently with each other or with clone.
	hs.slice = append(hs.slice, hook)
	hs.chain()
}
// chain rebuilds hs.current. It starts from the initial functions (with
// defaults filled in) and wraps them with every registered hook, walking the
// hook list from last to first so that the first-added hook ends up outermost.
// A hook method that returns nil leaves the corresponding function unchanged.
func (hs *hooksMixin) chain() {
	hs.initial.setDefaults()

	hs.hooksMu.Lock()
	defer hs.hooksMu.Unlock()

	// hooks has exactly these four fields, so a struct copy resets all of them.
	hs.current = hs.initial
	cur := &hs.current

	for i := len(hs.slice); i > 0; i-- {
		hook := hs.slice[i-1]

		if dial := hook.DialHook(cur.dial); dial != nil {
			cur.dial = dial
		}
		if process := hook.ProcessHook(cur.process); process != nil {
			cur.process = process
		}
		if pipeline := hook.ProcessPipelineHook(cur.pipeline); pipeline != nil {
			cur.pipeline = pipeline
		}
		// The transactional pipeline is wrapped with the same hook method.
		if txPipeline := hook.ProcessPipelineHook(cur.txPipeline); txPipeline != nil {
			cur.txPipeline = txPipeline
		}
	}
}
// clone returns a copy of the mixin that has its own mutex. The copied hook
// slice is capped at its length, so appending to the copy allocates a new
// backing array instead of writing into the original's.
func (hs *hooksMixin) clone() hooksMixin {
	hs.hooksMu.Lock()
	defer hs.hooksMu.Unlock()

	cp := *hs
	n := len(hs.slice)
	cp.slice = hs.slice[:n:n]
	cp.hooksMu = &sync.RWMutex{}
	return cp
}
// withProcessHook wraps hook with the ProcessHook of every registered hook
// (last-added innermost) and then runs the result for cmd. Hooks that return
// nil are skipped.
func (hs *hooksMixin) withProcessHook(ctx context.Context, cmd Cmder, hook ProcessHook) error {
	for i := len(hs.slice); i > 0; i-- {
		outer := hs.slice[i-1].ProcessHook(hook)
		if outer == nil {
			continue
		}
		hook = outer
	}
	return hook(ctx, cmd)
}
// withProcessPipelineHook wraps hook with the ProcessPipelineHook of every
// registered hook (last-added innermost) and then runs the result for cmds.
// Hooks that return nil are skipped.
func (hs *hooksMixin) withProcessPipelineHook(
	ctx context.Context, cmds []Cmder, hook ProcessPipelineHook,
) error {
	for i := len(hs.slice); i > 0; i-- {
		outer := hs.slice[i-1].ProcessPipelineHook(hook)
		if outer == nil {
			continue
		}
		hook = outer
	}
	return hook(ctx, cmds)
}
// dialHook dials addr through the current dial chain.
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
	// hs.current may be rewritten by AddHook(...) while the background pool
	// population routine (MinIdleConns > 0) is dialing, so the dial function is
	// fetched under the read lock and invoked after the lock is released.
	hs.hooksMu.RLock()
	dial := hs.current.dial
	hs.hooksMu.RUnlock()

	return dial(ctx, network, addr)
}
// processHook runs cmd through the current process chain.
//
// NOTE(review): unlike dialHook, hs.current is read here without holding
// hooksMu — confirm AddHook is never called while commands are in flight.
func (hs *hooksMixin) processHook(ctx context.Context, cmd Cmder) error {
	return hs.current.process(ctx, cmd)
}
// processPipelineHook runs cmds through the current pipeline chain.
//
// NOTE(review): unlike dialHook, hs.current is read here without holding
// hooksMu — confirm AddHook is never called while pipelines are in flight.
func (hs *hooksMixin) processPipelineHook(ctx context.Context, cmds []Cmder) error {
	return hs.current.pipeline(ctx, cmds)
}
// processTxPipelineHook runs cmds through the current transactional pipeline chain.
//
// NOTE(review): unlike dialHook, hs.current is read here without holding
// hooksMu — confirm AddHook is never called while transactions are in flight.
func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) error {
	return hs.current.txPipeline(ctx, cmds)
}
//------------------------------------------------------------------------------
// baseClient holds the state shared by Client and Conn: options, connection
// pool, hooks, an optional close callback, and the push notification processor.
type baseClient struct {
	// opt is the configuration this client was created with.
	opt *Options
	// connPool supplies and recycles connections.
	connPool pool.Pooler
	hooksMixin
	onClose func() error // hook called when client is closed
	// Push notification processing
	pushProcessor PushNotificationProcessorInterface
}
// clone returns a pointer to a shallow copy of c. Pointer and interface fields
// (options, pool, hook mutex) still refer to the same underlying values.
func (c *baseClient) clone() *baseClient {
	cp := new(baseClient)
	*cp = *c
	return cp
}
// withTimeout returns a shallow copy of c whose options are a clone of the
// original options with both ReadTimeout and WriteTimeout set to timeout.
func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
	opts := c.opt.clone()
	opts.ReadTimeout, opts.WriteTimeout = timeout, timeout

	cp := c.clone()
	cp.opt = opts
	return cp
}
// String returns a description of the client in the form "Redis<addr db:N>".
func (c *baseClient) String() string {
	return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
}
// newConn asks the pool for a brand-new connection and initializes it.
// If initialization fails the connection is closed (the close error is
// deliberately ignored) and the initialization error is returned.
func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
	cn, err := c.connPool.NewConn(ctx)
	if err != nil {
		return nil, err
	}

	if initErr := c.initConn(ctx, cn); initErr != nil {
		_ = c.connPool.CloseConn(cn)
		return nil, initErr
	}
	return cn, nil
}
// getConn returns an initialized connection from the pool. When a Limiter is
// configured it is consulted first, and a failure to obtain the connection is
// reported back to it.
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
	limiter := c.opt.Limiter
	if limiter != nil {
		if err := limiter.Allow(); err != nil {
			return nil, err
		}
	}

	cn, err := c._getConn(ctx)
	if err == nil {
		return cn, nil
	}

	if limiter != nil {
		limiter.ReportResult(err)
	}
	return nil, err
}
// _getConn takes a connection from the pool and initializes it if that has not
// happened yet. A connection that fails to initialize is removed from the
// pool; the returned error is the unwrapped cause when there is one, and the
// initialization error itself otherwise.
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
	cn, err := c.connPool.Get(ctx)
	if err != nil {
		return nil, err
	}
	if cn.Inited {
		return cn, nil
	}

	initErr := c.initConn(ctx, cn)
	if initErr == nil {
		return cn, nil
	}

	c.connPool.Remove(ctx, cn, initErr)
	if cause := errors.Unwrap(initErr); cause != nil {
		return nil, cause
	}
	return nil, initErr
}
// newReAuthCredentialsListener builds a credentials listener for poolCn that
// re-authenticates the connection on new credentials and handles
// authentication errors via onAuthenticationErr.
func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener {
	reAuth := c.reAuthConnection(poolCn)
	onErr := c.onAuthenticationErr(poolCn)
	return auth.NewReAuthCredentialsListener(reAuth, onErr)
}
// reAuthConnection returns a callback that authenticates poolCn with the given
// credentials. It issues AuthACL when a username is present and Auth with the
// password alone otherwise, using a background context.
func (c *baseClient) reAuthConnection(poolCn *pool.Conn) func(credentials auth.Credentials) error {
	return func(credentials auth.Credentials) error {
		username, password := credentials.BasicAuth()
		ctx := context.Background()

		// hooksMixin are intentionally empty here
		cn := newConn(c.opt, pool.NewSingleConnPool(c.connPool, poolCn), nil)

		if username == "" {
			return cn.Auth(ctx, password).Err()
		}
		return cn.AuthACL(ctx, username, password).Err()
	}
}
// onAuthenticationErr returns a callback invoked when re-authentication of
// poolCn fails. A nil error is ignored. For a bad-connection error the
// connection is closed through the pool to force a reconnect; if that fails,
// the network connection is closed directly. The failure is always logged.
func (c *baseClient) onAuthenticationErr(poolCn *pool.Conn) func(err error) {
	return func(err error) {
		if err == nil {
			return
		}

		if isBadConn(err, false, c.opt.Addr) {
			// Close the connection to force a reconnection.
			if closeErr := c.connPool.CloseConn(poolCn); closeErr != nil {
				internal.Logger.Printf(context.Background(), "redis: failed to close connection: %v", closeErr)
				// Fall back to closing the network connection directly
				// so that no resource is leaked.
				if netErr := poolCn.Close(); netErr != nil {
					internal.Logger.Printf(context.Background(), "redis: failed to close network connection: %v", netErr)
				}
			}
		}

		internal.Logger.Printf(context.Background(), "redis: re-authentication failed: %v", err)
	}
}
// wrappedOnClose returns a close callback that runs newOnClose and then the
// onClose callback that was installed on c when wrappedOnClose was called.
// Both callbacks always run; the first error that occurred is returned.
func (c *baseClient) wrappedOnClose(newOnClose func() error) func() error {
	previous := c.onClose
	return func() error {
		firstErr := newOnClose()

		// The previous hook runs even if newOnClose failed; its error is only
		// reported when nothing failed before it.
		if previous != nil {
			if err := previous(); err != nil && firstErr == nil {
				firstErr = err
			}
		}
		return firstErr
	}
}
// initConn performs the one-time handshake on cn: it resolves credentials,
// sends HELLO (falling back to AUTH), applies the DB / read-only / client-name
// options, sets the library identity, and finally runs the OnConnect callback.
// It is a no-op when cn.Inited is already set.
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
	if cn.Inited {
		return nil
	}
	var err error
	// Inited is set up front and is not reset if a later step fails.
	cn.Inited = true
	// All handshake commands go through a Conn bound to exactly this connection.
	connPool := pool.NewSingleConnPool(c.connPool, cn)
	conn := newConn(c.opt, connPool, &c.hooksMixin)
	// Credential sources are tried in order: streaming provider, context-aware
	// provider, plain provider, then the static Username/Password options.
	username, password := "", ""
	if c.opt.StreamingCredentialsProvider != nil {
		credentials, unsubscribeFromCredentialsProvider, err := c.opt.StreamingCredentialsProvider.
			Subscribe(c.newReAuthCredentialsListener(cn))
		if err != nil {
			return fmt.Errorf("failed to subscribe to streaming credentials: %w", err)
		}
		// Unsubscribe both when the client closes and when this connection closes.
		c.onClose = c.wrappedOnClose(unsubscribeFromCredentialsProvider)
		cn.SetOnClose(unsubscribeFromCredentialsProvider)
		username, password = credentials.BasicAuth()
	} else if c.opt.CredentialsProviderContext != nil {
		username, password, err = c.opt.CredentialsProviderContext(ctx)
		if err != nil {
			return fmt.Errorf("failed to get credentials from context provider: %w", err)
		}
	} else if c.opt.CredentialsProvider != nil {
		username, password = c.opt.CredentialsProvider()
	} else if c.opt.Username != "" || c.opt.Password != "" {
		username, password = c.opt.Username, c.opt.Password
	}
	// for redis-server versions that do not support the HELLO command,
	// RESP2 will continue to be used.
	if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
		// Authentication successful with HELLO command
	} else if !isRedisError(err) {
		// Errors that are not Redis error replies are returned as they are.
		//
		// A Redis error reply, in contrast, is taken to mean that the server
		// does not support the HELLO command and is handled by the branch below.
		// The server may be a redis-server that does not support the HELLO command,
		// or it could be DragonflyDB or a third-party redis-proxy. They all respond
		// with different error string results for unsupported commands, making it
		// difficult to rely on error strings to determine all results.
		return err
	} else if password != "" {
		// Try legacy AUTH command if HELLO failed
		if username != "" {
			err = conn.AuthACL(ctx, username, password).Err()
		} else {
			err = conn.Auth(ctx, password).Err()
		}
		if err != nil {
			return fmt.Errorf("failed to authenticate: %w", err)
		}
	}
	// Apply the per-connection options in a single pipeline.
	_, err = conn.Pipelined(ctx, func(pipe Pipeliner) error {
		if c.opt.DB > 0 {
			pipe.Select(ctx, c.opt.DB)
		}
		if c.opt.readOnly {
			pipe.ReadOnly(ctx)
		}
		if c.opt.ClientName != "" {
			pipe.ClientSetName(ctx, c.opt.ClientName)
		}
		return nil
	})
	if err != nil {
		return fmt.Errorf("failed to initialize connection options: %w", err)
	}
	// The library identity is sent unless either spelling of the disable flag is set.
	if !c.opt.DisableIdentity && !c.opt.DisableIndentity {
		libName := ""
		libVer := Version()
		if c.opt.IdentitySuffix != "" {
			libName = c.opt.IdentitySuffix
		}
		p := conn.Pipeline()
		p.ClientSetInfo(ctx, WithLibraryName(libName))
		p.ClientSetInfo(ctx, WithLibraryVersion(libVer))
		// Handle network errors (e.g. timeouts) in CLIENT SETINFO to avoid
		// out of order responses later on.
		// Redis error replies are ignored here.
		if _, err = p.Exec(ctx); err != nil && !isRedisError(err) {
			return err
		}
	}
	if c.opt.OnConnect != nil {
		return c.opt.OnConnect(ctx, conn)
	}
	return nil
}
// releaseConn hands cn back after use. The outcome err is reported to the
// Limiter when one is configured. A connection whose error marks it as bad is
// removed from the pool; any other connection has its pending push
// notifications processed (failures are only logged) and is put back.
func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
	if limiter := c.opt.Limiter; limiter != nil {
		limiter.ReportResult(err)
	}

	if isBadConn(err, false, c.opt.Addr) {
		c.connPool.Remove(ctx, cn, err)
		return
	}

	// Drain pending push notifications before the connection returns to the
	// pool. Errors here must not break normal Redis operations, so they are
	// logged and the connection is released regardless.
	if pushErr := c.processPushNotifications(ctx, cn); pushErr != nil {
		internal.Logger.Printf(ctx, "push: error processing pending notifications before releasing connection: %v", pushErr)
	}
	c.connPool.Put(ctx, cn)
}
// withConn obtains a connection, runs fn with it, and releases the connection
// afterwards, passing fn's error to releaseConn. It returns the error from
// getConn or from fn.
func (c *baseClient) withConn(
	ctx context.Context, fn func(context.Context, *pool.Conn) error,
) error {
	cn, err := c.getConn(ctx)
	if err != nil {
		return err
	}
	// fnErr is declared before the defer so the deferred release sees the value
	// assigned below; if fn panics, the release still runs with a nil error.
	var fnErr error
	defer func() {
		c.releaseConn(ctx, cn, fnErr)
	}()
	fnErr = fn(ctx, cn)
	return fnErr
}
// dial opens a network connection using the Dialer from the client options.
// Client.init installs it as the innermost dial hook.
func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, error) {
	return c.opt.Dialer(ctx, network, addr)
}
// process executes cmd, making up to MaxRetries+1 attempts. It returns as soon
// as an attempt succeeds or fails with an error that should not be retried;
// when every attempt fails with a retryable error, the last error is returned.
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		again, err := c._process(ctx, cmd, attempt)
		if err != nil && again {
			lastErr = err
			continue
		}
		return err
	}
	return lastErr
}
// assertUnstableCommand reports whether cmd is one of the search commands
// whose RESP3 reply format is still unstable. For such a command it returns
// true when the UnstableResp3 option is set and panics when it is not; for
// every other command it returns false.
func (c *baseClient) assertUnstableCommand(cmd Cmder) bool {
	switch cmd.(type) {
	case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd:
		if !c.opt.UnstableResp3 {
			panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.")
		}
		return true
	default:
		return false
	}
}
// _process performs a single attempt at executing cmd: it sleeps for the retry
// backoff when attempt > 0, writes the command on a pooled connection, and
// reads the reply. It returns whether the caller may retry, and the error.
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
	if attempt > 0 {
		if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
			return false, err
		}
	}
	// retryTimeout is 1 when the failure happened while writing, or while
	// reading a command that has no read timeout of its own; it is passed to
	// shouldRetry as its second argument.
	retryTimeout := uint32(0)
	if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
		// Process any pending push notifications before executing the command
		if err := c.processPushNotifications(ctx, cn); err != nil {
			// Log the error but don't fail the command execution
			// Push notification processing errors shouldn't break normal Redis operations
			internal.Logger.Printf(ctx, "push: error processing pending notifications before command: %v", err)
		}
		if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
			return writeCmd(wr, cmd)
		}); err != nil {
			atomic.StoreUint32(&retryTimeout, 1)
			return err
		}
		readReplyFunc := cmd.readReply
		// Apply unstable RESP3 search module.
		// For those commands the raw reply reader is used when the protocol is not RESP2.
		if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) {
			readReplyFunc = cmd.readRawReply
		}
		if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
			// To be sure there are no buffered push notifications, we process them before reading the reply
			if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
				// Log the error but don't fail the command execution
				// Push notification processing errors shouldn't break normal Redis operations
				internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
			}
			return readReplyFunc(rd)
		}); err != nil {
			// A command with its own read timeout clears the flag.
			if cmd.readTimeout() == nil {
				atomic.StoreUint32(&retryTimeout, 1)
			} else {
				atomic.StoreUint32(&retryTimeout, 0)
			}
			return err
		}
		return nil
	}); err != nil {
		retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
		return retry, err
	}
	return false, nil
}
// retryBackoff returns how long to wait before the given retry attempt,
// computed by internal.RetryBackoff from MinRetryBackoff and MaxRetryBackoff.
func (c *baseClient) retryBackoff(attempt int) time.Duration {
	return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
}
// cmdTimeout returns the read timeout to use for cmd. A command without its
// own read timeout uses the ReadTimeout option. A command-specific timeout of
// zero is returned unchanged; any other value is extended by ten seconds.
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
	own := cmd.readTimeout()
	switch {
	case own == nil:
		return c.opt.ReadTimeout
	case *own == 0:
		return 0
	default:
		return *own + 10*time.Second
	}
}
// context picks the context used for connection reads and writes: the
// caller's ctx when ContextTimeoutEnabled is set, and a fresh background
// context otherwise.
func (c *baseClient) context(ctx context.Context) context.Context {
	if !c.opt.ContextTimeoutEnabled {
		return context.Background()
	}
	return ctx
}
// Close closes the client, releasing any open resources: it runs the onClose
// hook, if any, and then closes the connection pool. Both steps always run;
// the first error encountered is returned.
//
// It is rare to Close a Client, as the Client is meant to be
// long-lived and shared between many goroutines.
func (c *baseClient) Close() error {
	var hookErr error
	if c.onClose != nil {
		hookErr = c.onClose()
	}

	poolErr := c.connPool.Close()

	if hookErr != nil {
		return hookErr
	}
	return poolErr
}
// getAddr returns the server address from the client options.
func (c *baseClient) getAddr() string {
	return c.opt.Addr
}
// processPipeline executes cmds as a pipeline with retries. It returns the
// error from the pipeline run if there is one, and otherwise whatever
// cmdsFirstErr reports for cmds.
func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
	if err := c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds); err != nil {
		return err
	}
	return cmdsFirstErr(cmds)
}
// processTxPipeline executes cmds as a transactional pipeline with retries.
// It returns the error from the pipeline run if there is one, and otherwise
// whatever cmdsFirstErr reports for cmds.
func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
	if err := c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds); err != nil {
		return err
	}
	return cmdsFirstErr(cmds)
}
// pipelineProcessor runs a batch of commands on one connection. The bool
// result tells generalProcessPipeline whether a failed run may be retried.
type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
// generalProcessPipeline runs p over cmds on a pooled connection, making up to
// MaxRetries+1 attempts with a backoff sleep before each retry. It stops on
// success, when p reports the failure as not retryable, or when shouldRetry
// rejects the error. If the backoff sleep fails, that error is set on every
// command and returned.
func (c *baseClient) generalProcessPipeline(
	ctx context.Context, cmds []Cmder, p pipelineProcessor,
) error {
	var lastErr error
	for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
		if attempt > 0 {
			if sleepErr := internal.Sleep(ctx, c.retryBackoff(attempt)); sleepErr != nil {
				setCmdsErr(cmds, sleepErr)
				return sleepErr
			}
		}

		// Retries stay enabled unless p says otherwise, so that dial errors
		// returned by withConn (before p ever runs) are retried.
		retryable := true
		lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
			// Drain pending push notifications before the pipeline runs. Errors
			// here must not break normal Redis operations, so they are only logged.
			if pushErr := c.processPushNotifications(ctx, cn); pushErr != nil {
				internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", pushErr)
			}

			var runErr error
			retryable, runErr = p(ctx, cn, cmds)
			return runErr
		})

		if lastErr == nil {
			return nil
		}
		if !retryable || !shouldRetry(lastErr, true) {
			return lastErr
		}
	}
	return lastErr
}
// pipelineProcessCmds writes all of cmds to cn and then reads their replies.
// A write failure is set on every command. Both write and read failures are
// reported as retryable (true); success returns (false, nil).
func (c *baseClient) pipelineProcessCmds(
	ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
	// Drain pending push notifications first so that cluster topology changes
	// are handled immediately. Errors here must not break normal Redis
	// operations, so they are only logged.
	if pushErr := c.processPushNotifications(ctx, cn); pushErr != nil {
		internal.Logger.Printf(ctx, "push: error processing pending notifications before pipeline: %v", pushErr)
	}

	writeErr := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmds(wr, cmds)
	})
	if writeErr != nil {
		setCmdsErr(cmds, writeErr)
		return true, writeErr
	}

	readErr := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
		// Read every reply.
		return c.pipelineReadCmds(ctx, cn, rd, cmds)
	})
	if readErr != nil {
		return true, readErr
	}
	return false, nil
}
// pipelineReadCmds reads one reply per command from rd, in order, and stores
// each command's error on the command itself.
//
// Before every reply it processes buffered push notifications; failures there
// are only logged. A reply error that is not a Redis error aborts the read:
// the same error is set on all remaining commands and returned. Otherwise the
// error of the first command is returned, so that callers can retry on errors
// like "LOADING redis is loading the dataset in memory".
//
// An empty cmds slice has nothing to read and yields nil.
func (c *baseClient) pipelineReadCmds(ctx context.Context, cn *pool.Conn, rd *proto.Reader, cmds []Cmder) error {
	// Guard the cmds[0] access below: indexing an empty slice would panic.
	if len(cmds) == 0 {
		return nil
	}

	for i, cmd := range cmds {
		// To be sure there are no buffered push notifications, we process them before reading the reply
		if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
			// Log the error but don't fail the command execution
			// Push notification processing errors shouldn't break normal Redis operations
			internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
		}

		err := cmd.readReply(rd)
		cmd.SetErr(err)
		if err != nil && !isRedisError(err) {
			// The connection state is unknown now; fail the rest of the batch.
			setCmdsErr(cmds[i+1:], err)
			return err
		}
	}

	// Retry errors like "LOADING redis is loading the dataset in memory".
	return cmds[0].Err()
}
// txPipelineProcessCmds writes a MULTI/EXEC-wrapped batch to cn and reads the
// queued acknowledgements followed by the replies of the wrapped commands.
// A write failure is set on every command and reported as retryable; a read
// failure is reported as not retryable.
//
// NOTE(review): assumes cmds[0] is a *StatusCmd and len(cmds) >= 2, as
// produced by wrapMultiExec in Client.TxPipeline — confirm for other callers.
func (c *baseClient) txPipelineProcessCmds(
	ctx context.Context, cn *pool.Conn, cmds []Cmder,
) (bool, error) {
	// Process any pending push notifications before executing the transaction pipeline
	// This ensures that cluster topology changes are handled immediately
	if err := c.processPushNotifications(ctx, cn); err != nil {
		// Log the error but don't fail the transaction execution
		// Push notification processing errors shouldn't break normal Redis operations
		internal.Logger.Printf(ctx, "push: error processing pending notifications before transaction: %v", err)
	}
	if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
		return writeCmds(wr, cmds)
	}); err != nil {
		setCmdsErr(cmds, err)
		return true, err
	}
	if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
		// To be sure there are no buffered push notifications, we process them before reading the reply
		if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
			// Log the error but don't fail the command execution
			// Push notification processing errors shouldn't break normal Redis operations
			internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
		}
		// The first command's StatusCmd is reused to parse the queued acknowledgements.
		statusCmd := cmds[0].(*StatusCmd)
		// Trim multi and exec.
		trimmedCmds := cmds[1 : len(cmds)-1]
		if err := c.txPipelineReadQueued(ctx, cn, rd, statusCmd, trimmedCmds); err != nil {
			setCmdsErr(cmds, err)
			return err
		}
		// Read replies.
		return c.pipelineReadCmds(ctx, cn, rd, trimmedCmds)
	}); err != nil {
		return false, err
	}
	return false, nil
}
// txPipelineReadQueued consumes the acknowledgement part of a transaction
// reply: the +OK reply, one +QUEUED reply per command in cmds, and the header
// line of the result array. Buffered push notifications are processed before
// each read; failures there are only logged.
//
// It returns the error from reading the +OK reply, any error from a +QUEUED
// reply that is not a Redis error, TxFailedErr when the header line reads as
// Nil, any other read error, or a format error when the header line is not an
// array marker.
func (c *baseClient) txPipelineReadQueued(ctx context.Context, cn *pool.Conn, rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
	// drainPush makes sure no buffered push notification precedes the next
	// reply. Errors here must not break normal Redis operations, so they are
	// logged and otherwise ignored.
	drainPush := func() {
		if err := c.processPendingPushNotificationWithReader(ctx, cn, rd); err != nil {
			internal.Logger.Printf(ctx, "push: error processing pending notifications before reading reply: %v", err)
		}
	}

	// Parse +OK.
	drainPush()
	if err := statusCmd.readReply(rd); err != nil {
		return err
	}

	// Parse +QUEUED, once per command.
	for range cmds {
		drainPush()
		if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
			return err
		}
	}

	// Parse number of replies.
	drainPush()
	line, err := rd.ReadLine()
	if err == Nil {
		return TxFailedErr
	}
	if err != nil {
		return err
	}

	if line[0] != proto.RespArray {
		return fmt.Errorf("redis: expected '*', but got line %q", line)
	}
	return nil
}
//------------------------------------------------------------------------------
// Client is a Redis client representing a pool of zero or more underlying connections.
// It's safe for concurrent use by multiple goroutines.
//
// Client creates and frees connections automatically; it also maintains a free pool
// of idle connections. You can control the pool size with the PoolSize option.
type Client struct {
	// baseClient carries the options, connection pool, hooks, and push processor.
	*baseClient
	// cmdable is bound to Client.Process by init.
	cmdable
}
// NewClient returns a client to the Redis Server specified by Options.
// It panics when opt is nil. opt is initialized in place, and its
// PushNotificationProcessor field is set to the processor the client uses.
func NewClient(opt *Options) *Client {
	if opt == nil {
		panic("redis: NewClient nil options")
	}
	opt.init()

	// Push notifications are always enabled for RESP3 (cannot be disabled)
	c := &Client{
		baseClient: &baseClient{opt: opt},
	}
	c.init()

	// The shared helper returns a void processor for RESP2 connections, where
	// push notifications are not available.
	processor := initializePushProcessor(opt)
	c.pushProcessor = processor

	// The connection pool reads the processor from the options, so store it
	// there before the pool is created.
	opt.PushNotificationProcessor = processor
	c.connPool = newConnPool(opt, c.dialHook)

	return c
}
// init binds cmdable to c.Process and installs the baseClient methods as the
// innermost functions of the hook chain.
func (c *Client) init() {
	c.cmdable = c.Process
	c.initHooks(hooks{
		dial:       c.baseClient.dial,
		process:    c.baseClient.process,
		pipeline:   c.baseClient.processPipeline,
		txPipeline: c.baseClient.processTxPipeline,
	})
}
// WithTimeout returns a new Client that uses timeout for both reads and
// writes. The new client is built on a copy of c's baseClient and is
// re-initialized so that its command and hook bindings point at the copy.
func (c *Client) WithTimeout(timeout time.Duration) *Client {
	// Client has only two fields; cmdable is assigned by init below.
	cp := &Client{baseClient: c.baseClient.withTimeout(timeout)}
	cp.init()
	return cp
}
// Conn returns a Conn backed by a sticky connection pool layered over the
// client's pool. The client's hooks are passed to newConn.
func (c *Client) Conn() *Conn {
	return newConn(c.opt, pool.NewStickyConnPool(c.connPool), &c.hooksMixin)
}
// Process runs cmd through the hook chain, stores the resulting error on cmd,
// and returns that error.
func (c *Client) Process(ctx context.Context, cmd Cmder) error {
	err := c.processHook(ctx, cmd)
	cmd.SetErr(err)
	return err
}
// Options returns read-only Options that were used to create the client.
// The returned pointer is the client's own Options value, not a copy.
func (c *Client) Options() *Options {
	return c.opt
}
// initializePushProcessor chooses the push notification processor for a
// client. It is a shared helper that avoids duplication across NewClient,
// NewFailoverClient, and NewSentinelClient.
//
// A processor supplied in opt always wins. Otherwise RESP3 connections get a
// default processor, and all other protocols get a void processor because
// push notifications are not available there.
func initializePushProcessor(opt *Options) PushNotificationProcessorInterface {
	switch {
	case opt.PushNotificationProcessor != nil:
		return opt.PushNotificationProcessor
	case opt.Protocol == 3:
		return NewPushNotificationProcessor()
	default:
		return NewVoidPushNotificationProcessor()
	}
}
// RegisterPushNotificationHandler registers a handler for a specific push notification name
// by delegating to the client's push notification processor.
// Returns an error if a handler is already registered for this push notification name.
// If protected is true, the handler cannot be unregistered.
func (c *Client) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
	return c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
}
// GetPushNotificationProcessor returns the push notification processor
// that was selected for this client in NewClient.
func (c *Client) GetPushNotificationProcessor() PushNotificationProcessorInterface {
	return c.pushProcessor
}
// GetPushNotificationHandler returns the handler for a specific push notification name
// by delegating to the client's push notification processor.
// Returns nil if no handler is registered for the given name.
func (c *Client) GetPushNotificationHandler(pushNotificationName string) PushNotificationHandler {
	return c.pushProcessor.GetHandler(pushNotificationName)
}
// PoolStats is pool.Stats exposed as a type of this package.
type PoolStats pool.Stats
// PoolStats returns the statistics reported by the client's connection pool,
// converted to the exported PoolStats type.
func (c *Client) PoolStats() *PoolStats {
	return (*PoolStats)(c.connPool.Stats())
}
// Pipelined creates a new pipeline and passes fn to that pipeline's Pipelined
// method, returning its result.
func (c *Client) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.Pipeline().Pipelined(ctx, fn)
}
// Pipeline returns a new pipeline whose queued commands are executed through
// the client's pipeline hook chain.
func (c *Client) Pipeline() Pipeliner {
	pipe := &Pipeline{exec: pipelineExecer(c.processPipelineHook)}
	pipe.init()
	return pipe
}
// TxPipelined creates a new transactional pipeline and passes fn to that
// pipeline's Pipelined method, returning its result.
func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	return c.TxPipeline().Pipelined(ctx, fn)
}
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC
// and executes them through the client's transactional pipeline hook chain.
func (c *Client) TxPipeline() Pipeliner {
	execTx := func(ctx context.Context, cmds []Cmder) error {
		return c.processTxPipelineHook(ctx, wrapMultiExec(ctx, cmds))
	}

	pipe := &Pipeline{exec: execTx}
	pipe.init()
	return pipe
}
// pubSub builds a PubSub that shares the client's options and push processor,
// obtains connections through c.newConn, and closes them through the
// client's connection pool.
func (c *Client) pubSub() *PubSub {
	pubsub := &PubSub{
		opt: c.opt,
		// The channels argument is not used when creating the connection.
		newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
			return c.newConn(ctx)
		},
		closeConn:     c.connPool.CloseConn,
		pushProcessor: c.pushProcessor,
	}
	pubsub.init()
	return pubsub
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create an empty subscription; in that case no
// subscribe call is made. An error from the subscribe call is discarded.
//
// Note that this method does not wait on a response from Redis, so the
// subscription may not be active immediately. To force the connection to wait,
// you may call the Receive() method on the returned *PubSub like so:
//
//	sub := client.Subscribe(ctx, queryResp)
//	iface, err := sub.Receive()
//	if err != nil {
//		// handle error
//	}
//
//	// Should be *Subscription, but others are possible if other actions have been
//	// taken on sub since it was created.
//	switch iface.(type) {
//	case *Subscription:
//		// subscribe succeeded
//	case *Message:
//		// received first message
//	case *Pong:
//		// pong received
//	default:
//		// handle error
//	}
//
//	ch := sub.Channel()
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub {
	pubsub := c.pubSub()
	if len(channels) == 0 {
		return pubsub
	}
	_ = pubsub.Subscribe(ctx, channels...)
	return pubsub
}
// PSubscribe creates a PubSub and, when at least one pattern is given,
// subscribes it to those patterns. With no patterns the returned PubSub
// carries an empty subscription. The error from the underlying subscribe
// call is discarded.
func (c *Client) PSubscribe(ctx context.Context, channels ...string) *PubSub {
	ps := c.pubSub()
	if len(channels) == 0 {
		return ps
	}
	// The subscribe error is deliberately dropped, matching Subscribe.
	_ = ps.PSubscribe(ctx, channels...)
	return ps
}
// SSubscribe creates a PubSub and, when at least one shard channel is given,
// subscribes it to those shard channels. With no channels the returned PubSub
// carries an empty subscription. The error from the underlying subscribe
// call is discarded.
func (c *Client) SSubscribe(ctx context.Context, channels ...string) *PubSub {
	ps := c.pubSub()
	if len(channels) == 0 {
		return ps
	}
	// The subscribe error is deliberately dropped, matching Subscribe.
	_ = ps.SSubscribe(ctx, channels...)
	return ps
}
//------------------------------------------------------------------------------
// Conn represents a single Redis connection rather than a pool of connections.
// Prefer running commands from Client unless there is a specific need
// for a continuous single Redis connection.
//
// Instances are built by newConn, which fills in the embedded fields below.
type Conn struct {
// baseClient carries the options and connection pool supplied to newConn.
baseClient
// cmdable is set by newConn to this Conn's Process method.
cmdable
// statefulCmdable is likewise set by newConn to this Conn's Process method.
statefulCmdable
}
// newConn builds a Conn over the given options and connection pool.
// The returned Conn is not thread-safe and should not be shared between
// goroutines. When parentHooks is non-nil a clone of it is stored, so callers
// do not need to clone it before passing it in.
func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn {
	conn := &Conn{
		baseClient: baseClient{
			opt:      opt,
			connPool: connPool,
		},
	}
	if parentHooks != nil {
		conn.hooksMixin = parentHooks.clone()
	}

	// The push notification processor is chosen from the options by the shared
	// helper. NOTE(review): the original comment says RESP2 connections get a
	// void processor; that logic lives in initializePushProcessor — confirm there.
	conn.pushProcessor = initializePushProcessor(opt)

	// Both command sets dispatch through the Conn's Process method.
	conn.cmdable = conn.Process
	conn.statefulCmdable = conn.Process

	// Install the base client's own implementations as the initial hooks.
	initial := hooks{
		dial:       conn.baseClient.dial,
		process:    conn.baseClient.process,
		pipeline:   conn.baseClient.processPipeline,
		txPipeline: conn.baseClient.processTxPipeline,
	}
	conn.initHooks(initial)

	return conn
}
// Process runs cmd through the connection's process hook, records the
// resulting error on cmd via SetErr (nil included), and returns that error.
func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
	hookErr := c.processHook(ctx, cmd)
	cmd.SetErr(hookErr)
	return hookErr
}
// RegisterPushNotificationHandler registers handler for the push notification
// named pushNotificationName by delegating to the connection's push processor.
// An error is returned if a handler is already registered for that name.
// When protected is true, the handler cannot be unregistered.
func (c *Conn) RegisterPushNotificationHandler(pushNotificationName string, handler PushNotificationHandler, protected bool) error {
	err := c.pushProcessor.RegisterHandler(pushNotificationName, handler, protected)
	return err
}
// GetPushNotificationProcessor exposes the processor this connection uses for
// push notifications. The connection's pushProcessor field is handed back as-is.
func (c *Conn) GetPushNotificationProcessor() PushNotificationProcessorInterface {
	processor := c.pushProcessor
	return processor
}
// Pipelined creates a fresh pipeline via Pipeline and forwards ctx and fn to
// that pipeline's Pipelined method, returning its commands and error unchanged.
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	pipe := c.Pipeline()
	return pipe.Pipelined(ctx, fn)
}
// Pipeline returns a new, initialized Pipeline whose executor is the
// connection's processPipelineHook.
func (c *Conn) Pipeline() Pipeliner {
	pipe := &Pipeline{exec: c.processPipelineHook}
	pipe.init()
	return pipe
}
// TxPipelined creates a fresh transactional pipeline via TxPipeline and
// forwards ctx and fn to its Pipelined method, returning the result unchanged.
func (c *Conn) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
	txPipe := c.TxPipeline()
	return txPipe.Pipelined(ctx, fn)
}
// TxPipeline acts like Pipeline, but the queued commands are first passed
// through wrapMultiExec (MULTI/EXEC wrapping) and then executed via the
// connection's processTxPipelineHook.
func (c *Conn) TxPipeline() Pipeliner {
	runTx := func(ctx context.Context, cmds []Cmder) error {
		return c.processTxPipelineHook(ctx, wrapMultiExec(ctx, cmds))
	}

	pipe := &Pipeline{exec: runTx}
	pipe.init()
	return pipe
}
// processPushNotifications processes the push notifications pending on cn by
// handing the connection's reader to the push processor. It does nothing and
// returns nil unless the client is configured for RESP3 (Protocol == 3) and
// has a push processor. It is meant to be called by the client before the
// connection is used with WithReader for command execution.
func (c *baseClient) processPushNotifications(ctx context.Context, cn *pool.Conn) error {
	pushEnabled := c.opt.Protocol == 3 && c.pushProcessor != nil
	if !pushEnabled {
		return nil
	}

	// The handler context is built inside the callback, once the reader is
	// available, and passed to the processor together with that reader.
	drain := func(rd *proto.Reader) error {
		handlerCtx := c.pushNotificationHandlerContext(cn)
		return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
	}

	// NOTE(review): the 0 argument is presumably a read timeout — confirm
	// against pool.Conn.WithReader.
	return cn.WithReader(ctx, 0, drain)
}
// processPendingPushNotificationWithReader processes the push notifications
// pending on cn using a reader the caller already holds. It is meant to be
// called from inside WithReader before the reply is read. It does nothing and
// returns nil unless the client is configured for RESP3 (Protocol == 3) and
// has a push processor.
func (c *baseClient) processPendingPushNotificationWithReader(ctx context.Context, cn *pool.Conn, rd *proto.Reader) error {
	pushEnabled := c.opt.Protocol == 3 && c.pushProcessor != nil
	if !pushEnabled {
		return nil
	}

	// Hand the processor a handler context for this client and connection.
	handlerCtx := c.pushNotificationHandlerContext(cn)
	return c.pushProcessor.ProcessPendingNotifications(ctx, handlerCtx, rd)
}
// pushNotificationHandlerContext builds the handler context used while
// processing push notifications on cn. It passes the base client, its
// connection pool and cn to pushnotif.NewHandlerContext.
// NOTE(review): the nil and false arguments are presumably "no PubSub" and
// "not blocking" — confirm against pushnotif.NewHandlerContext.
func (c *baseClient) pushNotificationHandlerContext(cn *pool.Conn) pushnotif.HandlerContext {
	handlerCtx := pushnotif.NewHandlerContext(c, c.connPool, nil, cn, false)
	return handlerCtx
}