mirror of
https://github.com/redis/go-redis.git
synced 2025-07-18 00:20:57 +03:00
- Add PushNotificationProcessor field to pool.Conn for connection-level processing - Modify connection pool Put() and isHealthyConn() to handle push notifications - Process pending push notifications before discarding connections - Pass push notification processor to connections during creation - Update connection pool options to include push notification processor - Add comprehensive test for connection health check integration This prevents connections with buffered push notification data from being incorrectly discarded by the connection health check, ensuring push notifications are properly processed and connections are reused.
1030 lines
27 KiB
Go
1030 lines
27 KiB
Go
package redis
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/redis/go-redis/v9/auth"
|
|
"github.com/redis/go-redis/v9/internal"
|
|
"github.com/redis/go-redis/v9/internal/hscan"
|
|
"github.com/redis/go-redis/v9/internal/pool"
|
|
"github.com/redis/go-redis/v9/internal/proto"
|
|
)
|
|
|
|
// Scanner internal/hscan.Scanner exposed interface.
|
|
type Scanner = hscan.Scanner
|
|
|
|
// Nil reply returned by Redis when key does not exist.
|
|
const Nil = proto.Nil
|
|
|
|
// SetLogger set custom log
|
|
func SetLogger(logger internal.Logging) {
|
|
internal.Logger = logger
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
type Hook interface {
|
|
DialHook(next DialHook) DialHook
|
|
ProcessHook(next ProcessHook) ProcessHook
|
|
ProcessPipelineHook(next ProcessPipelineHook) ProcessPipelineHook
|
|
}
|
|
|
|
type (
|
|
DialHook func(ctx context.Context, network, addr string) (net.Conn, error)
|
|
ProcessHook func(ctx context.Context, cmd Cmder) error
|
|
ProcessPipelineHook func(ctx context.Context, cmds []Cmder) error
|
|
)
|
|
|
|
type hooksMixin struct {
|
|
hooksMu *sync.RWMutex
|
|
|
|
slice []Hook
|
|
initial hooks
|
|
current hooks
|
|
}
|
|
|
|
func (hs *hooksMixin) initHooks(hooks hooks) {
|
|
hs.hooksMu = new(sync.RWMutex)
|
|
hs.initial = hooks
|
|
hs.chain()
|
|
}
|
|
|
|
type hooks struct {
|
|
dial DialHook
|
|
process ProcessHook
|
|
pipeline ProcessPipelineHook
|
|
txPipeline ProcessPipelineHook
|
|
}
|
|
|
|
func (h *hooks) setDefaults() {
|
|
if h.dial == nil {
|
|
h.dial = func(ctx context.Context, network, addr string) (net.Conn, error) { return nil, nil }
|
|
}
|
|
if h.process == nil {
|
|
h.process = func(ctx context.Context, cmd Cmder) error { return nil }
|
|
}
|
|
if h.pipeline == nil {
|
|
h.pipeline = func(ctx context.Context, cmds []Cmder) error { return nil }
|
|
}
|
|
if h.txPipeline == nil {
|
|
h.txPipeline = func(ctx context.Context, cmds []Cmder) error { return nil }
|
|
}
|
|
}
|
|
|
|
// AddHook is to add a hook to the queue.
|
|
// Hook is a function executed during network connection, command execution, and pipeline,
|
|
// it is a first-in-first-out stack queue (FIFO).
|
|
// You need to execute the next hook in each hook, unless you want to terminate the execution of the command.
|
|
// For example, you added hook-1, hook-2:
|
|
//
|
|
// client.AddHook(hook-1, hook-2)
|
|
//
|
|
// hook-1:
|
|
//
|
|
// func (Hook1) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
|
// return func(ctx context.Context, cmd Cmder) error {
|
|
// print("hook-1 start")
|
|
// next(ctx, cmd)
|
|
// print("hook-1 end")
|
|
// return nil
|
|
// }
|
|
// }
|
|
//
|
|
// hook-2:
|
|
//
|
|
// func (Hook2) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
|
|
// return func(ctx context.Context, cmd redis.Cmder) error {
|
|
// print("hook-2 start")
|
|
// next(ctx, cmd)
|
|
// print("hook-2 end")
|
|
// return nil
|
|
// }
|
|
// }
|
|
//
|
|
// The execution sequence is:
|
|
//
|
|
// hook-1 start -> hook-2 start -> exec redis cmd -> hook-2 end -> hook-1 end
|
|
//
|
|
// Please note: "next(ctx, cmd)" is very important, it will call the next hook,
|
|
// if "next(ctx, cmd)" is not executed, the redis command will not be executed.
|
|
func (hs *hooksMixin) AddHook(hook Hook) {
|
|
hs.slice = append(hs.slice, hook)
|
|
hs.chain()
|
|
}
|
|
|
|
func (hs *hooksMixin) chain() {
|
|
hs.initial.setDefaults()
|
|
|
|
hs.hooksMu.Lock()
|
|
defer hs.hooksMu.Unlock()
|
|
|
|
hs.current.dial = hs.initial.dial
|
|
hs.current.process = hs.initial.process
|
|
hs.current.pipeline = hs.initial.pipeline
|
|
hs.current.txPipeline = hs.initial.txPipeline
|
|
|
|
for i := len(hs.slice) - 1; i >= 0; i-- {
|
|
if wrapped := hs.slice[i].DialHook(hs.current.dial); wrapped != nil {
|
|
hs.current.dial = wrapped
|
|
}
|
|
if wrapped := hs.slice[i].ProcessHook(hs.current.process); wrapped != nil {
|
|
hs.current.process = wrapped
|
|
}
|
|
if wrapped := hs.slice[i].ProcessPipelineHook(hs.current.pipeline); wrapped != nil {
|
|
hs.current.pipeline = wrapped
|
|
}
|
|
if wrapped := hs.slice[i].ProcessPipelineHook(hs.current.txPipeline); wrapped != nil {
|
|
hs.current.txPipeline = wrapped
|
|
}
|
|
}
|
|
}
|
|
|
|
func (hs *hooksMixin) clone() hooksMixin {
|
|
hs.hooksMu.Lock()
|
|
defer hs.hooksMu.Unlock()
|
|
|
|
clone := *hs
|
|
l := len(clone.slice)
|
|
clone.slice = clone.slice[:l:l]
|
|
clone.hooksMu = new(sync.RWMutex)
|
|
return clone
|
|
}
|
|
|
|
func (hs *hooksMixin) withProcessHook(ctx context.Context, cmd Cmder, hook ProcessHook) error {
|
|
for i := len(hs.slice) - 1; i >= 0; i-- {
|
|
if wrapped := hs.slice[i].ProcessHook(hook); wrapped != nil {
|
|
hook = wrapped
|
|
}
|
|
}
|
|
return hook(ctx, cmd)
|
|
}
|
|
|
|
func (hs *hooksMixin) withProcessPipelineHook(
|
|
ctx context.Context, cmds []Cmder, hook ProcessPipelineHook,
|
|
) error {
|
|
for i := len(hs.slice) - 1; i >= 0; i-- {
|
|
if wrapped := hs.slice[i].ProcessPipelineHook(hook); wrapped != nil {
|
|
hook = wrapped
|
|
}
|
|
}
|
|
return hook(ctx, cmds)
|
|
}
|
|
|
|
func (hs *hooksMixin) dialHook(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
// Access to hs.current is guarded by a read-only lock since it may be mutated by AddHook(...)
|
|
// while this dialer is concurrently accessed by the background connection pool population
|
|
// routine when MinIdleConns > 0.
|
|
hs.hooksMu.RLock()
|
|
current := hs.current
|
|
hs.hooksMu.RUnlock()
|
|
|
|
return current.dial(ctx, network, addr)
|
|
}
|
|
|
|
func (hs *hooksMixin) processHook(ctx context.Context, cmd Cmder) error {
|
|
return hs.current.process(ctx, cmd)
|
|
}
|
|
|
|
func (hs *hooksMixin) processPipelineHook(ctx context.Context, cmds []Cmder) error {
|
|
return hs.current.pipeline(ctx, cmds)
|
|
}
|
|
|
|
func (hs *hooksMixin) processTxPipelineHook(ctx context.Context, cmds []Cmder) error {
|
|
return hs.current.txPipeline(ctx, cmds)
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
type baseClient struct {
|
|
opt *Options
|
|
connPool pool.Pooler
|
|
hooksMixin
|
|
|
|
onClose func() error // hook called when client is closed
|
|
|
|
// Push notification processing
|
|
pushProcessor *PushNotificationProcessor
|
|
}
|
|
|
|
func (c *baseClient) clone() *baseClient {
|
|
clone := *c
|
|
return &clone
|
|
}
|
|
|
|
func (c *baseClient) withTimeout(timeout time.Duration) *baseClient {
|
|
opt := c.opt.clone()
|
|
opt.ReadTimeout = timeout
|
|
opt.WriteTimeout = timeout
|
|
|
|
clone := c.clone()
|
|
clone.opt = opt
|
|
|
|
return clone
|
|
}
|
|
|
|
func (c *baseClient) String() string {
|
|
return fmt.Sprintf("Redis<%s db:%d>", c.getAddr(), c.opt.DB)
|
|
}
|
|
|
|
func (c *baseClient) newConn(ctx context.Context) (*pool.Conn, error) {
|
|
cn, err := c.connPool.NewConn(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = c.initConn(ctx, cn)
|
|
if err != nil {
|
|
_ = c.connPool.CloseConn(cn)
|
|
return nil, err
|
|
}
|
|
|
|
return cn, nil
|
|
}
|
|
|
|
func (c *baseClient) getConn(ctx context.Context) (*pool.Conn, error) {
|
|
if c.opt.Limiter != nil {
|
|
err := c.opt.Limiter.Allow()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
cn, err := c._getConn(ctx)
|
|
if err != nil {
|
|
if c.opt.Limiter != nil {
|
|
c.opt.Limiter.ReportResult(err)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return cn, nil
|
|
}
|
|
|
|
func (c *baseClient) _getConn(ctx context.Context) (*pool.Conn, error) {
|
|
cn, err := c.connPool.Get(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if cn.Inited {
|
|
return cn, nil
|
|
}
|
|
|
|
if err := c.initConn(ctx, cn); err != nil {
|
|
c.connPool.Remove(ctx, cn, err)
|
|
if err := errors.Unwrap(err); err != nil {
|
|
return nil, err
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return cn, nil
|
|
}
|
|
|
|
func (c *baseClient) newReAuthCredentialsListener(poolCn *pool.Conn) auth.CredentialsListener {
|
|
return auth.NewReAuthCredentialsListener(
|
|
c.reAuthConnection(poolCn),
|
|
c.onAuthenticationErr(poolCn),
|
|
)
|
|
}
|
|
|
|
func (c *baseClient) reAuthConnection(poolCn *pool.Conn) func(credentials auth.Credentials) error {
|
|
return func(credentials auth.Credentials) error {
|
|
var err error
|
|
username, password := credentials.BasicAuth()
|
|
ctx := context.Background()
|
|
connPool := pool.NewSingleConnPool(c.connPool, poolCn)
|
|
// hooksMixin are intentionally empty here
|
|
cn := newConn(c.opt, connPool, nil)
|
|
|
|
if username != "" {
|
|
err = cn.AuthACL(ctx, username, password).Err()
|
|
} else {
|
|
err = cn.Auth(ctx, password).Err()
|
|
}
|
|
return err
|
|
}
|
|
}
|
|
func (c *baseClient) onAuthenticationErr(poolCn *pool.Conn) func(err error) {
|
|
return func(err error) {
|
|
if err != nil {
|
|
if isBadConn(err, false, c.opt.Addr) {
|
|
// Close the connection to force a reconnection.
|
|
err := c.connPool.CloseConn(poolCn)
|
|
if err != nil {
|
|
internal.Logger.Printf(context.Background(), "redis: failed to close connection: %v", err)
|
|
// try to close the network connection directly
|
|
// so that no resource is leaked
|
|
err := poolCn.Close()
|
|
if err != nil {
|
|
internal.Logger.Printf(context.Background(), "redis: failed to close network connection: %v", err)
|
|
}
|
|
}
|
|
}
|
|
internal.Logger.Printf(context.Background(), "redis: re-authentication failed: %v", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *baseClient) wrappedOnClose(newOnClose func() error) func() error {
|
|
onClose := c.onClose
|
|
return func() error {
|
|
var firstErr error
|
|
err := newOnClose()
|
|
// Even if we have an error we would like to execute the onClose hook
|
|
// if it exists. We will return the first error that occurred.
|
|
// This is to keep error handling consistent with the rest of the code.
|
|
if err != nil {
|
|
firstErr = err
|
|
}
|
|
if onClose != nil {
|
|
err = onClose()
|
|
if err != nil && firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
return firstErr
|
|
}
|
|
}
|
|
|
|
func (c *baseClient) initConn(ctx context.Context, cn *pool.Conn) error {
|
|
if cn.Inited {
|
|
return nil
|
|
}
|
|
|
|
var err error
|
|
cn.Inited = true
|
|
connPool := pool.NewSingleConnPool(c.connPool, cn)
|
|
conn := newConn(c.opt, connPool, &c.hooksMixin)
|
|
|
|
username, password := "", ""
|
|
if c.opt.StreamingCredentialsProvider != nil {
|
|
credentials, unsubscribeFromCredentialsProvider, err := c.opt.StreamingCredentialsProvider.
|
|
Subscribe(c.newReAuthCredentialsListener(cn))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to subscribe to streaming credentials: %w", err)
|
|
}
|
|
c.onClose = c.wrappedOnClose(unsubscribeFromCredentialsProvider)
|
|
cn.SetOnClose(unsubscribeFromCredentialsProvider)
|
|
username, password = credentials.BasicAuth()
|
|
} else if c.opt.CredentialsProviderContext != nil {
|
|
username, password, err = c.opt.CredentialsProviderContext(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get credentials from context provider: %w", err)
|
|
}
|
|
} else if c.opt.CredentialsProvider != nil {
|
|
username, password = c.opt.CredentialsProvider()
|
|
} else if c.opt.Username != "" || c.opt.Password != "" {
|
|
username, password = c.opt.Username, c.opt.Password
|
|
}
|
|
|
|
// for redis-server versions that do not support the HELLO command,
|
|
// RESP2 will continue to be used.
|
|
if err = conn.Hello(ctx, c.opt.Protocol, username, password, c.opt.ClientName).Err(); err == nil {
|
|
// Authentication successful with HELLO command
|
|
} else if !isRedisError(err) {
|
|
// When the server responds with the RESP protocol and the result is not a normal
|
|
// execution result of the HELLO command, we consider it to be an indication that
|
|
// the server does not support the HELLO command.
|
|
// The server may be a redis-server that does not support the HELLO command,
|
|
// or it could be DragonflyDB or a third-party redis-proxy. They all respond
|
|
// with different error string results for unsupported commands, making it
|
|
// difficult to rely on error strings to determine all results.
|
|
return err
|
|
} else if password != "" {
|
|
// Try legacy AUTH command if HELLO failed
|
|
if username != "" {
|
|
err = conn.AuthACL(ctx, username, password).Err()
|
|
} else {
|
|
err = conn.Auth(ctx, password).Err()
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("failed to authenticate: %w", err)
|
|
}
|
|
}
|
|
|
|
_, err = conn.Pipelined(ctx, func(pipe Pipeliner) error {
|
|
if c.opt.DB > 0 {
|
|
pipe.Select(ctx, c.opt.DB)
|
|
}
|
|
|
|
if c.opt.readOnly {
|
|
pipe.ReadOnly(ctx)
|
|
}
|
|
|
|
if c.opt.ClientName != "" {
|
|
pipe.ClientSetName(ctx, c.opt.ClientName)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("failed to initialize connection options: %w", err)
|
|
}
|
|
|
|
if !c.opt.DisableIdentity && !c.opt.DisableIndentity {
|
|
libName := ""
|
|
libVer := Version()
|
|
if c.opt.IdentitySuffix != "" {
|
|
libName = c.opt.IdentitySuffix
|
|
}
|
|
p := conn.Pipeline()
|
|
p.ClientSetInfo(ctx, WithLibraryName(libName))
|
|
p.ClientSetInfo(ctx, WithLibraryVersion(libVer))
|
|
// Handle network errors (e.g. timeouts) in CLIENT SETINFO to avoid
|
|
// out of order responses later on.
|
|
if _, err = p.Exec(ctx); err != nil && !isRedisError(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if c.opt.OnConnect != nil {
|
|
return c.opt.OnConnect(ctx, conn)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *baseClient) releaseConn(ctx context.Context, cn *pool.Conn, err error) {
|
|
if c.opt.Limiter != nil {
|
|
c.opt.Limiter.ReportResult(err)
|
|
}
|
|
|
|
if isBadConn(err, false, c.opt.Addr) {
|
|
c.connPool.Remove(ctx, cn, err)
|
|
} else {
|
|
c.connPool.Put(ctx, cn)
|
|
}
|
|
}
|
|
|
|
func (c *baseClient) withConn(
|
|
ctx context.Context, fn func(context.Context, *pool.Conn) error,
|
|
) error {
|
|
cn, err := c.getConn(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
var fnErr error
|
|
defer func() {
|
|
c.releaseConn(ctx, cn, fnErr)
|
|
}()
|
|
|
|
fnErr = fn(ctx, cn)
|
|
|
|
return fnErr
|
|
}
|
|
|
|
func (c *baseClient) dial(ctx context.Context, network, addr string) (net.Conn, error) {
|
|
return c.opt.Dialer(ctx, network, addr)
|
|
}
|
|
|
|
func (c *baseClient) process(ctx context.Context, cmd Cmder) error {
|
|
var lastErr error
|
|
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
|
attempt := attempt
|
|
|
|
retry, err := c._process(ctx, cmd, attempt)
|
|
if err == nil || !retry {
|
|
return err
|
|
}
|
|
|
|
lastErr = err
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
func (c *baseClient) assertUnstableCommand(cmd Cmder) bool {
|
|
switch cmd.(type) {
|
|
case *AggregateCmd, *FTInfoCmd, *FTSpellCheckCmd, *FTSearchCmd, *FTSynDumpCmd:
|
|
if c.opt.UnstableResp3 {
|
|
return true
|
|
} else {
|
|
panic("RESP3 responses for this command are disabled because they may still change. Please set the flag UnstableResp3 . See the [README](https://github.com/redis/go-redis/blob/master/README.md) and the release notes for guidance.")
|
|
}
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *baseClient) _process(ctx context.Context, cmd Cmder, attempt int) (bool, error) {
|
|
if attempt > 0 {
|
|
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
|
|
return false, err
|
|
}
|
|
}
|
|
|
|
retryTimeout := uint32(0)
|
|
if err := c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
|
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
|
return writeCmd(wr, cmd)
|
|
}); err != nil {
|
|
atomic.StoreUint32(&retryTimeout, 1)
|
|
return err
|
|
}
|
|
readReplyFunc := cmd.readReply
|
|
// Apply unstable RESP3 search module.
|
|
if c.opt.Protocol != 2 && c.assertUnstableCommand(cmd) {
|
|
readReplyFunc = cmd.readRawReply
|
|
}
|
|
if err := cn.WithReader(c.context(ctx), c.cmdTimeout(cmd), func(rd *proto.Reader) error {
|
|
// Check for push notifications before reading the command reply
|
|
if c.opt.Protocol == 3 && c.pushProcessor != nil && c.pushProcessor.IsEnabled() {
|
|
if err := c.pushProcessor.ProcessPendingNotifications(ctx, rd); err != nil {
|
|
internal.Logger.Printf(ctx, "push: error processing push notifications: %v", err)
|
|
}
|
|
}
|
|
return readReplyFunc(rd)
|
|
}); err != nil {
|
|
if cmd.readTimeout() == nil {
|
|
atomic.StoreUint32(&retryTimeout, 1)
|
|
} else {
|
|
atomic.StoreUint32(&retryTimeout, 0)
|
|
}
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
retry := shouldRetry(err, atomic.LoadUint32(&retryTimeout) == 1)
|
|
return retry, err
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (c *baseClient) retryBackoff(attempt int) time.Duration {
|
|
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
|
|
}
|
|
|
|
func (c *baseClient) cmdTimeout(cmd Cmder) time.Duration {
|
|
if timeout := cmd.readTimeout(); timeout != nil {
|
|
t := *timeout
|
|
if t == 0 {
|
|
return 0
|
|
}
|
|
return t + 10*time.Second
|
|
}
|
|
return c.opt.ReadTimeout
|
|
}
|
|
|
|
// context returns the context for the current connection.
|
|
// If the context timeout is enabled, it returns the original context.
|
|
// Otherwise, it returns a new background context.
|
|
func (c *baseClient) context(ctx context.Context) context.Context {
|
|
if c.opt.ContextTimeoutEnabled {
|
|
return ctx
|
|
}
|
|
return context.Background()
|
|
}
|
|
|
|
// Close closes the client, releasing any open resources.
|
|
//
|
|
// It is rare to Close a Client, as the Client is meant to be
|
|
// long-lived and shared between many goroutines.
|
|
func (c *baseClient) Close() error {
|
|
var firstErr error
|
|
if c.onClose != nil {
|
|
if err := c.onClose(); err != nil {
|
|
firstErr = err
|
|
}
|
|
}
|
|
if err := c.connPool.Close(); err != nil && firstErr == nil {
|
|
firstErr = err
|
|
}
|
|
return firstErr
|
|
}
|
|
|
|
func (c *baseClient) getAddr() string {
|
|
return c.opt.Addr
|
|
}
|
|
|
|
func (c *baseClient) processPipeline(ctx context.Context, cmds []Cmder) error {
|
|
if err := c.generalProcessPipeline(ctx, cmds, c.pipelineProcessCmds); err != nil {
|
|
return err
|
|
}
|
|
return cmdsFirstErr(cmds)
|
|
}
|
|
|
|
func (c *baseClient) processTxPipeline(ctx context.Context, cmds []Cmder) error {
|
|
if err := c.generalProcessPipeline(ctx, cmds, c.txPipelineProcessCmds); err != nil {
|
|
return err
|
|
}
|
|
return cmdsFirstErr(cmds)
|
|
}
|
|
|
|
type pipelineProcessor func(context.Context, *pool.Conn, []Cmder) (bool, error)
|
|
|
|
func (c *baseClient) generalProcessPipeline(
|
|
ctx context.Context, cmds []Cmder, p pipelineProcessor,
|
|
) error {
|
|
var lastErr error
|
|
for attempt := 0; attempt <= c.opt.MaxRetries; attempt++ {
|
|
if attempt > 0 {
|
|
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
|
|
setCmdsErr(cmds, err)
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Enable retries by default to retry dial errors returned by withConn.
|
|
canRetry := true
|
|
lastErr = c.withConn(ctx, func(ctx context.Context, cn *pool.Conn) error {
|
|
var err error
|
|
canRetry, err = p(ctx, cn, cmds)
|
|
return err
|
|
})
|
|
if lastErr == nil || !canRetry || !shouldRetry(lastErr, true) {
|
|
return lastErr
|
|
}
|
|
}
|
|
return lastErr
|
|
}
|
|
|
|
func (c *baseClient) pipelineProcessCmds(
|
|
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
|
) (bool, error) {
|
|
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
|
return writeCmds(wr, cmds)
|
|
}); err != nil {
|
|
setCmdsErr(cmds, err)
|
|
return true, err
|
|
}
|
|
|
|
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
|
return pipelineReadCmds(rd, cmds)
|
|
}); err != nil {
|
|
return true, err
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func pipelineReadCmds(rd *proto.Reader, cmds []Cmder) error {
|
|
for i, cmd := range cmds {
|
|
err := cmd.readReply(rd)
|
|
cmd.SetErr(err)
|
|
if err != nil && !isRedisError(err) {
|
|
setCmdsErr(cmds[i+1:], err)
|
|
return err
|
|
}
|
|
}
|
|
// Retry errors like "LOADING redis is loading the dataset in memory".
|
|
return cmds[0].Err()
|
|
}
|
|
|
|
func (c *baseClient) txPipelineProcessCmds(
|
|
ctx context.Context, cn *pool.Conn, cmds []Cmder,
|
|
) (bool, error) {
|
|
if err := cn.WithWriter(c.context(ctx), c.opt.WriteTimeout, func(wr *proto.Writer) error {
|
|
return writeCmds(wr, cmds)
|
|
}); err != nil {
|
|
setCmdsErr(cmds, err)
|
|
return true, err
|
|
}
|
|
|
|
if err := cn.WithReader(c.context(ctx), c.opt.ReadTimeout, func(rd *proto.Reader) error {
|
|
statusCmd := cmds[0].(*StatusCmd)
|
|
// Trim multi and exec.
|
|
trimmedCmds := cmds[1 : len(cmds)-1]
|
|
|
|
if err := txPipelineReadQueued(rd, statusCmd, trimmedCmds); err != nil {
|
|
setCmdsErr(cmds, err)
|
|
return err
|
|
}
|
|
|
|
return pipelineReadCmds(rd, trimmedCmds)
|
|
}); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func txPipelineReadQueued(rd *proto.Reader, statusCmd *StatusCmd, cmds []Cmder) error {
|
|
// Parse +OK.
|
|
if err := statusCmd.readReply(rd); err != nil {
|
|
return err
|
|
}
|
|
|
|
// Parse +QUEUED.
|
|
for range cmds {
|
|
if err := statusCmd.readReply(rd); err != nil && !isRedisError(err) {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// Parse number of replies.
|
|
line, err := rd.ReadLine()
|
|
if err != nil {
|
|
if err == Nil {
|
|
err = TxFailedErr
|
|
}
|
|
return err
|
|
}
|
|
|
|
if line[0] != proto.RespArray {
|
|
return fmt.Errorf("redis: expected '*', but got line %q", line)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// Client is a Redis client representing a pool of zero or more underlying connections.
|
|
// It's safe for concurrent use by multiple goroutines.
|
|
//
|
|
// Client creates and frees connections automatically; it also maintains a free pool
|
|
// of idle connections. You can control the pool size with Config.PoolSize option.
|
|
type Client struct {
|
|
*baseClient
|
|
cmdable
|
|
}
|
|
|
|
// NewClient returns a client to the Redis Server specified by Options.
|
|
func NewClient(opt *Options) *Client {
|
|
if opt == nil {
|
|
panic("redis: NewClient nil options")
|
|
}
|
|
opt.init()
|
|
|
|
// Enable push notifications by default for RESP3
|
|
// Only override if no custom processor is provided
|
|
if opt.Protocol == 3 && opt.PushNotificationProcessor == nil {
|
|
opt.PushNotifications = true
|
|
}
|
|
|
|
c := Client{
|
|
baseClient: &baseClient{
|
|
opt: opt,
|
|
},
|
|
}
|
|
c.init()
|
|
|
|
// Initialize push notification processor
|
|
c.initializePushProcessor()
|
|
|
|
// Update options with the initialized push processor for connection pool
|
|
if c.pushProcessor != nil {
|
|
opt.PushNotificationProcessor = c.pushProcessor
|
|
}
|
|
|
|
c.connPool = newConnPool(opt, c.dialHook)
|
|
|
|
return &c
|
|
}
|
|
|
|
func (c *Client) init() {
|
|
c.cmdable = c.Process
|
|
c.initHooks(hooks{
|
|
dial: c.baseClient.dial,
|
|
process: c.baseClient.process,
|
|
pipeline: c.baseClient.processPipeline,
|
|
txPipeline: c.baseClient.processTxPipeline,
|
|
})
|
|
}
|
|
|
|
func (c *Client) WithTimeout(timeout time.Duration) *Client {
|
|
clone := *c
|
|
clone.baseClient = c.baseClient.withTimeout(timeout)
|
|
clone.init()
|
|
return &clone
|
|
}
|
|
|
|
func (c *Client) Conn() *Conn {
|
|
return newConn(c.opt, pool.NewStickyConnPool(c.connPool), &c.hooksMixin)
|
|
}
|
|
|
|
func (c *Client) Process(ctx context.Context, cmd Cmder) error {
|
|
err := c.processHook(ctx, cmd)
|
|
cmd.SetErr(err)
|
|
return err
|
|
}
|
|
|
|
// Options returns read-only Options that were used to create the client.
|
|
func (c *Client) Options() *Options {
|
|
return c.opt
|
|
}
|
|
|
|
// initializePushProcessor initializes the push notification processor.
|
|
func (c *Client) initializePushProcessor() {
|
|
// Always use custom processor if provided
|
|
if c.opt.PushNotificationProcessor != nil {
|
|
c.pushProcessor = c.opt.PushNotificationProcessor
|
|
} else if c.opt.PushNotifications {
|
|
// Create default processor only if push notifications are enabled
|
|
c.pushProcessor = NewPushNotificationProcessor(true)
|
|
}
|
|
}
|
|
|
|
// RegisterPushNotificationHandler registers a handler for a specific push notification command.
|
|
// Returns an error if a handler is already registered for this command.
|
|
func (c *Client) RegisterPushNotificationHandler(command string, handler PushNotificationHandler) error {
|
|
if c.pushProcessor != nil {
|
|
return c.pushProcessor.RegisterHandler(command, handler)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// RegisterPushNotificationHandlerFunc registers a function as a handler for a specific push notification command.
|
|
// Returns an error if a handler is already registered for this command.
|
|
func (c *Client) RegisterPushNotificationHandlerFunc(command string, handlerFunc func(ctx context.Context, notification []interface{}) bool) error {
|
|
if c.pushProcessor != nil {
|
|
return c.pushProcessor.RegisterHandlerFunc(command, handlerFunc)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetPushNotificationProcessor returns the push notification processor.
|
|
func (c *Client) GetPushNotificationProcessor() *PushNotificationProcessor {
|
|
return c.pushProcessor
|
|
}
|
|
|
|
type PoolStats pool.Stats
|
|
|
|
// PoolStats returns connection pool stats.
|
|
func (c *Client) PoolStats() *PoolStats {
|
|
stats := c.connPool.Stats()
|
|
return (*PoolStats)(stats)
|
|
}
|
|
|
|
func (c *Client) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
|
return c.Pipeline().Pipelined(ctx, fn)
|
|
}
|
|
|
|
func (c *Client) Pipeline() Pipeliner {
|
|
pipe := Pipeline{
|
|
exec: pipelineExecer(c.processPipelineHook),
|
|
}
|
|
pipe.init()
|
|
return &pipe
|
|
}
|
|
|
|
func (c *Client) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
|
return c.TxPipeline().Pipelined(ctx, fn)
|
|
}
|
|
|
|
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
|
func (c *Client) TxPipeline() Pipeliner {
|
|
pipe := Pipeline{
|
|
exec: func(ctx context.Context, cmds []Cmder) error {
|
|
cmds = wrapMultiExec(ctx, cmds)
|
|
return c.processTxPipelineHook(ctx, cmds)
|
|
},
|
|
}
|
|
pipe.init()
|
|
return &pipe
|
|
}
|
|
|
|
func (c *Client) pubSub() *PubSub {
|
|
pubsub := &PubSub{
|
|
opt: c.opt,
|
|
|
|
newConn: func(ctx context.Context, channels []string) (*pool.Conn, error) {
|
|
return c.newConn(ctx)
|
|
},
|
|
closeConn: c.connPool.CloseConn,
|
|
}
|
|
pubsub.init()
|
|
|
|
// Set the push notification processor if available
|
|
if c.pushProcessor != nil {
|
|
pubsub.SetPushNotificationProcessor(c.pushProcessor)
|
|
}
|
|
|
|
return pubsub
|
|
}
|
|
|
|
// Subscribe subscribes the client to the specified channels.
|
|
// Channels can be omitted to create empty subscription.
|
|
// Note that this method does not wait on a response from Redis, so the
|
|
// subscription may not be active immediately. To force the connection to wait,
|
|
// you may call the Receive() method on the returned *PubSub like so:
|
|
//
|
|
// sub := client.Subscribe(queryResp)
|
|
// iface, err := sub.Receive()
|
|
// if err != nil {
|
|
// // handle error
|
|
// }
|
|
//
|
|
// // Should be *Subscription, but others are possible if other actions have been
|
|
// // taken on sub since it was created.
|
|
// switch iface.(type) {
|
|
// case *Subscription:
|
|
// // subscribe succeeded
|
|
// case *Message:
|
|
// // received first message
|
|
// case *Pong:
|
|
// // pong received
|
|
// default:
|
|
// // handle error
|
|
// }
|
|
//
|
|
// ch := sub.Channel()
|
|
func (c *Client) Subscribe(ctx context.Context, channels ...string) *PubSub {
|
|
pubsub := c.pubSub()
|
|
if len(channels) > 0 {
|
|
_ = pubsub.Subscribe(ctx, channels...)
|
|
}
|
|
return pubsub
|
|
}
|
|
|
|
// PSubscribe subscribes the client to the given patterns.
|
|
// Patterns can be omitted to create empty subscription.
|
|
func (c *Client) PSubscribe(ctx context.Context, channels ...string) *PubSub {
|
|
pubsub := c.pubSub()
|
|
if len(channels) > 0 {
|
|
_ = pubsub.PSubscribe(ctx, channels...)
|
|
}
|
|
return pubsub
|
|
}
|
|
|
|
// SSubscribe Subscribes the client to the specified shard channels.
|
|
// Channels can be omitted to create empty subscription.
|
|
func (c *Client) SSubscribe(ctx context.Context, channels ...string) *PubSub {
|
|
pubsub := c.pubSub()
|
|
if len(channels) > 0 {
|
|
_ = pubsub.SSubscribe(ctx, channels...)
|
|
}
|
|
return pubsub
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// Conn represents a single Redis connection rather than a pool of connections.
|
|
// Prefer running commands from Client unless there is a specific need
|
|
// for a continuous single Redis connection.
|
|
type Conn struct {
|
|
baseClient
|
|
cmdable
|
|
statefulCmdable
|
|
}
|
|
|
|
// newConn is a helper func to create a new Conn instance.
|
|
// the Conn instance is not thread-safe and should not be shared between goroutines.
|
|
// the parentHooks will be cloned, no need to clone before passing it.
|
|
func newConn(opt *Options, connPool pool.Pooler, parentHooks *hooksMixin) *Conn {
|
|
c := Conn{
|
|
baseClient: baseClient{
|
|
opt: opt,
|
|
connPool: connPool,
|
|
},
|
|
}
|
|
|
|
if parentHooks != nil {
|
|
c.hooksMixin = parentHooks.clone()
|
|
}
|
|
|
|
c.cmdable = c.Process
|
|
c.statefulCmdable = c.Process
|
|
c.initHooks(hooks{
|
|
dial: c.baseClient.dial,
|
|
process: c.baseClient.process,
|
|
pipeline: c.baseClient.processPipeline,
|
|
txPipeline: c.baseClient.processTxPipeline,
|
|
})
|
|
|
|
return &c
|
|
}
|
|
|
|
func (c *Conn) Process(ctx context.Context, cmd Cmder) error {
|
|
err := c.processHook(ctx, cmd)
|
|
cmd.SetErr(err)
|
|
return err
|
|
}
|
|
|
|
func (c *Conn) Pipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
|
return c.Pipeline().Pipelined(ctx, fn)
|
|
}
|
|
|
|
func (c *Conn) Pipeline() Pipeliner {
|
|
pipe := Pipeline{
|
|
exec: c.processPipelineHook,
|
|
}
|
|
pipe.init()
|
|
return &pipe
|
|
}
|
|
|
|
func (c *Conn) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([]Cmder, error) {
|
|
return c.TxPipeline().Pipelined(ctx, fn)
|
|
}
|
|
|
|
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
|
func (c *Conn) TxPipeline() Pipeliner {
|
|
pipe := Pipeline{
|
|
exec: func(ctx context.Context, cmds []Cmder) error {
|
|
cmds = wrapMultiExec(ctx, cmds)
|
|
return c.processTxPipelineHook(ctx, cmds)
|
|
},
|
|
}
|
|
pipe.init()
|
|
return &pipe
|
|
}
|