mirror of
https://github.com/redis/go-redis.git
synced 2025-07-28 06:42:00 +03:00
Merge branch 'master' into add-cluster-parse-urls
This commit is contained in:
149
cluster.go
149
cluster.go
@ -14,11 +14,11 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/go-redis/redis/v8/internal"
|
||||
"github.com/go-redis/redis/v8/internal/hashtag"
|
||||
"github.com/go-redis/redis/v8/internal/pool"
|
||||
"github.com/go-redis/redis/v8/internal/proto"
|
||||
"github.com/go-redis/redis/v8/internal/rand"
|
||||
"github.com/go-redis/redis/v9/internal"
|
||||
"github.com/go-redis/redis/v9/internal/hashtag"
|
||||
"github.com/go-redis/redis/v9/internal/pool"
|
||||
"github.com/go-redis/redis/v9/internal/proto"
|
||||
"github.com/go-redis/redis/v9/internal/rand"
|
||||
)
|
||||
|
||||
var errClusterNoNodes = fmt.Errorf("redis: cluster has no nodes")
|
||||
@ -74,12 +74,12 @@ type ClusterOptions struct {
|
||||
PoolFIFO bool
|
||||
|
||||
// PoolSize applies per cluster node and not for the whole cluster.
|
||||
PoolSize int
|
||||
MinIdleConns int
|
||||
MaxConnAge time.Duration
|
||||
PoolTimeout time.Duration
|
||||
IdleTimeout time.Duration
|
||||
IdleCheckFrequency time.Duration
|
||||
PoolSize int
|
||||
PoolTimeout time.Duration
|
||||
MinIdleConns int
|
||||
MaxIdleConns int
|
||||
ConnMaxIdleTime time.Duration
|
||||
ConnMaxLifetime time.Duration
|
||||
|
||||
TLSConfig *tls.Config
|
||||
}
|
||||
@ -252,8 +252,6 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) clientOptions() *Options {
|
||||
const disableIdleCheck = -1
|
||||
|
||||
return &Options{
|
||||
Dialer: opt.Dialer,
|
||||
OnConnect: opt.OnConnect,
|
||||
@ -269,13 +267,13 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||
ReadTimeout: opt.ReadTimeout,
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
|
||||
PoolFIFO: opt.PoolFIFO,
|
||||
PoolSize: opt.PoolSize,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxConnAge: opt.MaxConnAge,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
IdleTimeout: opt.IdleTimeout,
|
||||
IdleCheckFrequency: disableIdleCheck,
|
||||
PoolFIFO: opt.PoolFIFO,
|
||||
PoolSize: opt.PoolSize,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxIdleConns: opt.MaxIdleConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
|
||||
TLSConfig: opt.TLSConfig,
|
||||
// If ClusterSlots is populated, then we probably have an artificial
|
||||
@ -324,15 +322,26 @@ func (n *clusterNode) updateLatency() {
|
||||
const numProbe = 10
|
||||
var dur uint64
|
||||
|
||||
successes := 0
|
||||
for i := 0; i < numProbe; i++ {
|
||||
time.Sleep(time.Duration(10+rand.Intn(10)) * time.Millisecond)
|
||||
|
||||
start := time.Now()
|
||||
n.Client.Ping(context.TODO())
|
||||
dur += uint64(time.Since(start) / time.Microsecond)
|
||||
err := n.Client.Ping(context.TODO()).Err()
|
||||
if err == nil {
|
||||
dur += uint64(time.Since(start) / time.Microsecond)
|
||||
successes++
|
||||
}
|
||||
}
|
||||
|
||||
latency := float64(dur) / float64(numProbe)
|
||||
var latency float64
|
||||
if successes == 0 {
|
||||
// If none of the pings worked, set latency to some arbitrarily high value so this node gets
|
||||
// least priority.
|
||||
latency = float64((1 * time.Minute) / time.Microsecond)
|
||||
} else {
|
||||
latency = float64(dur) / float64(successes)
|
||||
}
|
||||
atomic.StoreUint32(&n.latency, uint32(latency+0.5))
|
||||
}
|
||||
|
||||
@ -817,7 +826,6 @@ type ClusterClient struct {
|
||||
*clusterClient
|
||||
cmdable
|
||||
hooks
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// NewClusterClient returns a Redis Cluster client as described in
|
||||
@ -830,34 +838,14 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
|
||||
opt: opt,
|
||||
nodes: newClusterNodes(opt),
|
||||
},
|
||||
ctx: context.Background(),
|
||||
}
|
||||
c.state = newClusterStateHolder(c.loadState)
|
||||
c.cmdsInfoCache = newCmdsInfoCache(c.cmdsInfo)
|
||||
c.cmdable = c.Process
|
||||
|
||||
if opt.IdleCheckFrequency > 0 {
|
||||
go c.reaper(opt.IdleCheckFrequency)
|
||||
}
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *ClusterClient) Context() context.Context {
|
||||
return c.ctx
|
||||
}
|
||||
|
||||
func (c *ClusterClient) WithContext(ctx context.Context) *ClusterClient {
|
||||
if ctx == nil {
|
||||
panic("nil context")
|
||||
}
|
||||
clone := *c
|
||||
clone.cmdable = clone.Process
|
||||
clone.hooks.lock()
|
||||
clone.ctx = ctx
|
||||
return &clone
|
||||
}
|
||||
|
||||
// Options returns read-only Options that were used to create the client.
|
||||
func (c *ClusterClient) Options() *ClusterOptions {
|
||||
return c.opt
|
||||
@ -889,8 +877,8 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
|
||||
}
|
||||
|
||||
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
||||
cmdInfo := c.cmdInfo(cmd.Name())
|
||||
slot := c.cmdSlot(cmd)
|
||||
cmdInfo := c.cmdInfo(ctx, cmd.Name())
|
||||
slot := c.cmdSlot(ctx, cmd)
|
||||
|
||||
var node *clusterNode
|
||||
var ask bool
|
||||
@ -915,7 +903,6 @@ func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
||||
_ = pipe.Process(ctx, NewCmd(ctx, "asking"))
|
||||
_ = pipe.Process(ctx, cmd)
|
||||
_, lastErr = pipe.Exec(ctx)
|
||||
_ = pipe.Close()
|
||||
ask = false
|
||||
} else {
|
||||
lastErr = node.Client.Process(ctx, cmd)
|
||||
@ -1176,29 +1163,8 @@ func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
|
||||
return nil, firstErr
|
||||
}
|
||||
|
||||
// reaper closes idle connections to the cluster.
|
||||
func (c *ClusterClient) reaper(idleCheckFrequency time.Duration) {
|
||||
ticker := time.NewTicker(idleCheckFrequency)
|
||||
defer ticker.Stop()
|
||||
|
||||
for range ticker.C {
|
||||
nodes, err := c.nodes.All()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
|
||||
for _, node := range nodes {
|
||||
_, err := node.Client.connPool.(*pool.ConnPool).ReapStaleConns()
|
||||
if err != nil {
|
||||
internal.Logger.Printf(c.Context(), "ReapStaleConns failed: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ClusterClient) Pipeline() Pipeliner {
|
||||
pipe := Pipeline{
|
||||
ctx: c.ctx,
|
||||
exec: c.processPipeline,
|
||||
}
|
||||
pipe.init()
|
||||
@ -1267,9 +1233,9 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
|
||||
return err
|
||||
}
|
||||
|
||||
if c.opt.ReadOnly && c.cmdsAreReadOnly(cmds) {
|
||||
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
|
||||
for _, cmd := range cmds {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(ctx, cmd)
|
||||
node, err := c.slotReadOnlyNode(state, slot)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1280,7 +1246,7 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
|
||||
}
|
||||
|
||||
for _, cmd := range cmds {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(ctx, cmd)
|
||||
node, err := state.slotMasterNode(slot)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1290,9 +1256,9 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ClusterClient) cmdsAreReadOnly(cmds []Cmder) bool {
|
||||
func (c *ClusterClient) cmdsAreReadOnly(ctx context.Context, cmds []Cmder) bool {
|
||||
for _, cmd := range cmds {
|
||||
cmdInfo := c.cmdInfo(cmd.Name())
|
||||
cmdInfo := c.cmdInfo(ctx, cmd.Name())
|
||||
if cmdInfo == nil || !cmdInfo.ReadOnly {
|
||||
return false
|
||||
}
|
||||
@ -1338,7 +1304,7 @@ func (c *ClusterClient) pipelineReadCmds(
|
||||
continue
|
||||
}
|
||||
|
||||
if c.opt.ReadOnly && isLoadingError(err) {
|
||||
if c.opt.ReadOnly && (isLoadingError(err) || !isRedisError(err)) {
|
||||
node.MarkAsFailing()
|
||||
return err
|
||||
}
|
||||
@ -1380,7 +1346,6 @@ func (c *ClusterClient) checkMovedErr(
|
||||
// TxPipeline acts like Pipeline, but wraps queued commands with MULTI/EXEC.
|
||||
func (c *ClusterClient) TxPipeline() Pipeliner {
|
||||
pipe := Pipeline{
|
||||
ctx: c.ctx,
|
||||
exec: c.processTxPipeline,
|
||||
}
|
||||
pipe.init()
|
||||
@ -1405,7 +1370,7 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
|
||||
return err
|
||||
}
|
||||
|
||||
cmdsMap := c.mapCmdsBySlot(cmds)
|
||||
cmdsMap := c.mapCmdsBySlot(ctx, cmds)
|
||||
for slot, cmds := range cmdsMap {
|
||||
node, err := state.slotMasterNode(slot)
|
||||
if err != nil {
|
||||
@ -1456,10 +1421,10 @@ func (c *ClusterClient) _processTxPipeline(ctx context.Context, cmds []Cmder) er
|
||||
return cmdsFirstErr(cmds)
|
||||
}
|
||||
|
||||
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
|
||||
func (c *ClusterClient) mapCmdsBySlot(ctx context.Context, cmds []Cmder) map[int][]Cmder {
|
||||
cmdsMap := make(map[int][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(ctx, cmd)
|
||||
cmdsMap[slot] = append(cmdsMap[slot], cmd)
|
||||
}
|
||||
return cmdsMap
|
||||
@ -1526,12 +1491,7 @@ func (c *ClusterClient) txPipelineReadQueued(
|
||||
return err
|
||||
}
|
||||
|
||||
switch line[0] {
|
||||
case proto.ErrorReply:
|
||||
return proto.ParseErrorReply(line)
|
||||
case proto.ArrayReply:
|
||||
// ok
|
||||
default:
|
||||
if line[0] != proto.RespArray {
|
||||
return fmt.Errorf("redis: expected '*', but got line %q", line)
|
||||
}
|
||||
|
||||
@ -1688,6 +1648,16 @@ func (c *ClusterClient) PSubscribe(ctx context.Context, channels ...string) *Pub
|
||||
return pubsub
|
||||
}
|
||||
|
||||
// SSubscribe Subscribes the client to the specified shard channels.
|
||||
func (c *ClusterClient) SSubscribe(ctx context.Context, channels ...string) *PubSub {
|
||||
pubsub := c.pubSub()
|
||||
if len(channels) > 0 {
|
||||
_ = pubsub.SSubscribe(ctx, channels...)
|
||||
}
|
||||
return pubsub
|
||||
}
|
||||
|
||||
|
||||
func (c *ClusterClient) retryBackoff(attempt int) time.Duration {
|
||||
return internal.RetryBackoff(attempt, c.opt.MinRetryBackoff, c.opt.MaxRetryBackoff)
|
||||
}
|
||||
@ -1734,26 +1704,27 @@ func (c *ClusterClient) cmdsInfo(ctx context.Context) (map[string]*CommandInfo,
|
||||
return nil, firstErr
|
||||
}
|
||||
|
||||
func (c *ClusterClient) cmdInfo(name string) *CommandInfo {
|
||||
cmdsInfo, err := c.cmdsInfoCache.Get(c.ctx)
|
||||
func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
|
||||
cmdsInfo, err := c.cmdsInfoCache.Get(ctx)
|
||||
if err != nil {
|
||||
internal.Logger.Printf(context.TODO(), "getting command info: %s", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
info := cmdsInfo[name]
|
||||
if info == nil {
|
||||
internal.Logger.Printf(c.Context(), "info for cmd=%s not found", name)
|
||||
internal.Logger.Printf(context.TODO(), "info for cmd=%s not found", name)
|
||||
}
|
||||
return info
|
||||
}
|
||||
|
||||
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
|
||||
func (c *ClusterClient) cmdSlot(ctx context.Context, cmd Cmder) int {
|
||||
args := cmd.Args()
|
||||
if args[0] == "cluster" && args[1] == "getkeysinslot" {
|
||||
return args[2].(int)
|
||||
}
|
||||
|
||||
cmdInfo := c.cmdInfo(cmd.Name())
|
||||
cmdInfo := c.cmdInfo(ctx, cmd.Name())
|
||||
return cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user