mirror of
https://github.com/redis/go-redis.git
synced 2025-07-31 05:04:23 +03:00
Add latency based routing to Redis Cluster client.
This commit is contained in:
committed by
Vladimir Mihailenco
parent
3972f28066
commit
487feebef1
60
ring.go
60
ring.go
@ -115,10 +115,13 @@ type Ring struct {
|
||||
opt *RingOptions
|
||||
nreplicas int
|
||||
|
||||
mx sync.RWMutex
|
||||
mu sync.RWMutex
|
||||
hash *consistenthash.Map
|
||||
shards map[string]*ringShard
|
||||
|
||||
cmdsInfo map[string]*CommandInfo
|
||||
cmdsInfoOnce *sync.Once
|
||||
|
||||
closed bool
|
||||
}
|
||||
|
||||
@ -130,6 +133,8 @@ func NewRing(opt *RingOptions) *Ring {
|
||||
|
||||
hash: consistenthash.New(nreplicas, nil),
|
||||
shards: make(map[string]*ringShard),
|
||||
|
||||
cmdsInfoOnce: new(sync.Once),
|
||||
}
|
||||
ring.commandable.process = ring.process
|
||||
for name, addr := range opt.Addrs {
|
||||
@ -141,15 +146,40 @@ func NewRing(opt *RingOptions) *Ring {
|
||||
return ring
|
||||
}
|
||||
|
||||
func (ring *Ring) cmdInfo(name string) *CommandInfo {
|
||||
ring.cmdsInfoOnce.Do(func() {
|
||||
for _, shard := range ring.shards {
|
||||
cmdsInfo, err := shard.Client.Command().Result()
|
||||
if err == nil {
|
||||
ring.cmdsInfo = cmdsInfo
|
||||
return
|
||||
}
|
||||
}
|
||||
ring.cmdsInfoOnce = &sync.Once{}
|
||||
})
|
||||
if ring.cmdsInfo == nil {
|
||||
return nil
|
||||
}
|
||||
return ring.cmdsInfo[name]
|
||||
}
|
||||
|
||||
func (ring *Ring) cmdFirstKey(cmd Cmder) string {
|
||||
cmdInfo := ring.cmdInfo(cmd.arg(0))
|
||||
if cmdInfo == nil {
|
||||
return ""
|
||||
}
|
||||
return cmd.arg(int(cmdInfo.FirstKeyPos))
|
||||
}
|
||||
|
||||
func (ring *Ring) addClient(name string, cl *Client) {
|
||||
ring.mx.Lock()
|
||||
ring.mu.Lock()
|
||||
ring.hash.Add(name)
|
||||
ring.shards[name] = &ringShard{Client: cl}
|
||||
ring.mx.Unlock()
|
||||
ring.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ring *Ring) getClient(key string) (*Client, error) {
|
||||
ring.mx.RLock()
|
||||
ring.mu.RLock()
|
||||
|
||||
if ring.closed {
|
||||
return nil, pool.ErrClosed
|
||||
@ -157,17 +187,17 @@ func (ring *Ring) getClient(key string) (*Client, error) {
|
||||
|
||||
name := ring.hash.Get(hashtag.Key(key))
|
||||
if name == "" {
|
||||
ring.mx.RUnlock()
|
||||
ring.mu.RUnlock()
|
||||
return nil, errRingShardsDown
|
||||
}
|
||||
|
||||
cl := ring.shards[name].Client
|
||||
ring.mx.RUnlock()
|
||||
ring.mu.RUnlock()
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
func (ring *Ring) process(cmd Cmder) {
|
||||
cl, err := ring.getClient(cmd.clusterKey())
|
||||
cl, err := ring.getClient(ring.cmdFirstKey(cmd))
|
||||
if err != nil {
|
||||
cmd.setErr(err)
|
||||
return
|
||||
@ -177,8 +207,8 @@ func (ring *Ring) process(cmd Cmder) {
|
||||
|
||||
// rebalance removes dead shards from the ring.
|
||||
func (ring *Ring) rebalance() {
|
||||
defer ring.mx.Unlock()
|
||||
ring.mx.Lock()
|
||||
defer ring.mu.Unlock()
|
||||
ring.mu.Lock()
|
||||
|
||||
ring.hash = consistenthash.New(ring.nreplicas, nil)
|
||||
for name, shard := range ring.shards {
|
||||
@ -195,10 +225,10 @@ func (ring *Ring) heartbeat() {
|
||||
for _ = range ticker.C {
|
||||
var rebalance bool
|
||||
|
||||
ring.mx.RLock()
|
||||
ring.mu.RLock()
|
||||
|
||||
if ring.closed {
|
||||
ring.mx.RUnlock()
|
||||
ring.mu.RUnlock()
|
||||
break
|
||||
}
|
||||
|
||||
@ -210,7 +240,7 @@ func (ring *Ring) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
ring.mx.RUnlock()
|
||||
ring.mu.RUnlock()
|
||||
|
||||
if rebalance {
|
||||
ring.rebalance()
|
||||
@ -223,8 +253,8 @@ func (ring *Ring) heartbeat() {
|
||||
// It is rare to Close a Ring, as the Ring is meant to be long-lived
|
||||
// and shared between many goroutines.
|
||||
func (ring *Ring) Close() (retErr error) {
|
||||
defer ring.mx.Unlock()
|
||||
ring.mx.Lock()
|
||||
defer ring.mu.Unlock()
|
||||
ring.mu.Lock()
|
||||
|
||||
if ring.closed {
|
||||
return nil
|
||||
@ -259,7 +289,7 @@ func (ring *Ring) pipelineExec(cmds []Cmder) error {
|
||||
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
name := ring.hash.Get(hashtag.Key(cmd.clusterKey()))
|
||||
name := ring.hash.Get(hashtag.Key(ring.cmdFirstKey(cmd)))
|
||||
if name == "" {
|
||||
cmd.setErr(errRingShardsDown)
|
||||
if retErr == nil {
|
||||
|
Reference in New Issue
Block a user