mirror of
https://github.com/redis/go-redis.git
synced 2025-07-31 05:04:23 +03:00
Select random node when there are no keys.
This commit is contained in:
101
ring.go
101
ring.go
@ -3,6 +3,8 @@ package redis
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@ -40,9 +42,6 @@ type RingOptions struct {
|
||||
PoolTimeout time.Duration
|
||||
IdleTimeout time.Duration
|
||||
IdleCheckFrequency time.Duration
|
||||
|
||||
// RouteByEvalKeys flag to enable eval and evalsha key position parsing for sharding
|
||||
RouteByEvalKeys bool
|
||||
}
|
||||
|
||||
func (opt *RingOptions) init() {
|
||||
@ -131,12 +130,10 @@ type Ring struct {
|
||||
hash *consistenthash.Map
|
||||
shards map[string]*ringShard
|
||||
|
||||
cmdsInfo map[string]*CommandInfo
|
||||
cmdsInfoOnce *sync.Once
|
||||
cmdsInfo map[string]*CommandInfo
|
||||
|
||||
closed bool
|
||||
|
||||
routeByEvalKeys bool
|
||||
}
|
||||
|
||||
var _ Cmdable = (*Ring)(nil)
|
||||
@ -159,7 +156,6 @@ func NewRing(opt *RingOptions) *Ring {
|
||||
clopt.Addr = addr
|
||||
ring.addClient(name, NewClient(clopt))
|
||||
}
|
||||
ring.routeByEvalKeys = opt.RouteByEvalKeys
|
||||
go ring.heartbeat()
|
||||
return ring
|
||||
}
|
||||
@ -227,30 +223,6 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||
return c.cmdsInfo[name]
|
||||
}
|
||||
|
||||
func (c *Ring) getEvalFirstKey(cmd Cmder) string {
|
||||
if c.routeByEvalKeys && cmd.arg(2) != "0" {
|
||||
return cmd.arg(3)
|
||||
} else {
|
||||
return cmd.arg(0)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Ring) cmdFirstKey(cmd Cmder) string {
|
||||
switch cmd.arg(0) {
|
||||
case "eval":
|
||||
return c.getEvalFirstKey(cmd)
|
||||
case "evalsha":
|
||||
return c.getEvalFirstKey(cmd)
|
||||
}
|
||||
|
||||
cmdInfo := c.cmdInfo(cmd.arg(0))
|
||||
if cmdInfo == nil {
|
||||
internal.Logf("info for cmd=%s not found", cmd.arg(0))
|
||||
return ""
|
||||
}
|
||||
return cmd.arg(int(cmdInfo.FirstKeyPos))
|
||||
}
|
||||
|
||||
func (c *Ring) addClient(name string, cl *Client) {
|
||||
c.mu.Lock()
|
||||
c.hash.Add(name)
|
||||
@ -258,14 +230,17 @@ func (c *Ring) addClient(name string, cl *Client) {
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *Ring) getClient(key string) (*Client, error) {
|
||||
func (c *Ring) shardByKey(key string) (*Client, error) {
|
||||
key = hashtag.Key(key)
|
||||
|
||||
c.mu.RLock()
|
||||
|
||||
if c.closed {
|
||||
c.mu.RUnlock()
|
||||
return nil, pool.ErrClosed
|
||||
}
|
||||
|
||||
name := c.hash.Get(hashtag.Key(key))
|
||||
name := c.hash.Get(key)
|
||||
if name == "" {
|
||||
c.mu.RUnlock()
|
||||
return nil, errRingShardsDown
|
||||
@ -276,8 +251,32 @@ func (c *Ring) getClient(key string) (*Client, error) {
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
func (c *Ring) randomShard() (*Client, error) {
|
||||
return c.shardByKey(strconv.Itoa(rand.Int()))
|
||||
}
|
||||
|
||||
func (c *Ring) shardByName(name string) (*Client, error) {
|
||||
if name == "" {
|
||||
return c.randomShard()
|
||||
}
|
||||
|
||||
c.mu.RLock()
|
||||
cl := c.shards[name].Client
|
||||
c.mu.RUnlock()
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
func (c *Ring) cmdShard(cmd Cmder) (*Client, error) {
|
||||
cmdInfo := c.cmdInfo(cmd.arg(0))
|
||||
firstKey := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
|
||||
if firstKey == "" {
|
||||
return c.randomShard()
|
||||
}
|
||||
return c.shardByKey(firstKey)
|
||||
}
|
||||
|
||||
func (c *Ring) Process(cmd Cmder) error {
|
||||
cl, err := c.getClient(c.cmdFirstKey(cmd))
|
||||
cl, err := c.cmdShard(cmd)
|
||||
if err != nil {
|
||||
cmd.setErr(err)
|
||||
return err
|
||||
@ -285,17 +284,18 @@ func (c *Ring) Process(cmd Cmder) error {
|
||||
return cl.baseClient.Process(cmd)
|
||||
}
|
||||
|
||||
// rebalance removes dead shards from the c.
|
||||
// rebalance removes dead shards from the Ring.
|
||||
func (c *Ring) rebalance() {
|
||||
defer c.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
|
||||
c.hash = consistenthash.New(c.nreplicas, nil)
|
||||
hash := consistenthash.New(c.nreplicas, nil)
|
||||
for name, shard := range c.shards {
|
||||
if shard.IsUp() {
|
||||
c.hash.Add(name)
|
||||
hash.Add(name)
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.Lock()
|
||||
c.hash = hash
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// heartbeat monitors state of each shard in the ring.
|
||||
@ -370,13 +370,10 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
|
||||
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
name := c.hash.Get(hashtag.Key(c.cmdFirstKey(cmd)))
|
||||
if name == "" {
|
||||
cmd.setErr(errRingShardsDown)
|
||||
if retErr == nil {
|
||||
retErr = errRingShardsDown
|
||||
}
|
||||
continue
|
||||
cmdInfo := c.cmdInfo(cmd.arg(0))
|
||||
name := cmd.arg(cmdFirstKeyPos(cmd, cmdInfo))
|
||||
if name != "" {
|
||||
name = c.hash.Get(hashtag.Key(name))
|
||||
}
|
||||
cmdsMap[name] = append(cmdsMap[name], cmd)
|
||||
}
|
||||
@ -385,7 +382,15 @@ func (c *Ring) pipelineExec(cmds []Cmder) error {
|
||||
failedCmdsMap := make(map[string][]Cmder)
|
||||
|
||||
for name, cmds := range cmdsMap {
|
||||
client := c.shards[name].Client
|
||||
client, err := c.shardByName(name)
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
if retErr == nil {
|
||||
retErr = err
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
cn, _, err := client.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
|
Reference in New Issue
Block a user