1
0
mirror of https://github.com/redis/go-redis.git synced 2025-08-08 23:42:06 +03:00

cluster: optimize newClusterState

This commit is contained in:
Vladimir Mihailenco
2018-07-22 10:50:26 +03:00
parent 493945402e
commit 2559f32464
6 changed files with 109 additions and 61 deletions

View File

@@ -8,7 +8,7 @@ import (
"math"
"math/rand"
"net"
"strings"
"sort"
"sync"
"sync/atomic"
"time"
@@ -387,12 +387,31 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
//------------------------------------------------------------------------------
type clusterSlot struct {
start, end int
nodes []*clusterNode
}
type clusterSlotSlice []*clusterSlot
func (p clusterSlotSlice) Len() int {
return len(p)
}
func (p clusterSlotSlice) Less(i, j int) bool {
return p[i].start < p[j].start
}
func (p clusterSlotSlice) Swap(i, j int) {
p[i], p[j] = p[j], p[i]
}
type clusterState struct {
nodes *clusterNodes
Masters []*clusterNode
Slaves []*clusterNode
slots [][]*clusterNode
slots []*clusterSlot
generation uint32
createdAt time.Time
@@ -404,7 +423,7 @@ func newClusterState(
c := clusterState{
nodes: nodes,
slots: make([][]*clusterNode, hashtag.SlotNumber),
slots: make([]*clusterSlot, 0, len(slots)),
generation: nodes.NextGeneration(),
createdAt: time.Now(),
@@ -434,11 +453,15 @@ func newClusterState(
}
}
for i := slot.Start; i <= slot.End; i++ {
c.slots[i] = nodes
}
c.slots = append(c.slots, &clusterSlot{
start: slot.Start,
end: slot.End,
nodes: nodes,
})
}
sort.Sort(clusterSlotSlice(c.slots))
time.AfterFunc(time.Minute, func() {
nodes.GC(c.generation)
})
@@ -506,8 +529,15 @@ func (c *clusterState) slotRandomNode(slot int) *clusterNode {
}
func (c *clusterState) slotNodes(slot int) []*clusterNode {
if slot >= 0 && slot < len(c.slots) {
return c.slots[slot]
i := sort.Search(len(c.slots), func(i int) bool {
return c.slots[i].end >= slot
})
if i >= len(c.slots) {
return nil
}
x := c.slots[i]
if slot >= x.start && slot <= x.end {
return x.nodes
}
return nil
}
@@ -516,26 +546,7 @@ func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil {
return true
}
if len(c.Masters) > len(c.Slaves) {
return false
}
for _, master := range c.Masters {
s := master.Client.Info("replication").Val()
if !strings.Contains(s, "role:master") {
return false
}
}
for _, slave := range c.Slaves {
s := slave.Client.Info("replication").Val()
if !strings.Contains(s, "role:slave") {
return false
}
}
return true
return len(c.Masters) <= len(c.Slaves)
}
//------------------------------------------------------------------------------
@@ -563,7 +574,7 @@ func (c *clusterStateHolder) Reload() (*clusterState, error) {
return nil, err
}
if !state.IsConsistent() {
c.LazyReload()
time.AfterFunc(time.Second, c.LazyReload)
}
return state, nil
}
@@ -843,6 +854,7 @@ func (c *ClusterClient) Watch(fn func(*Tx) error, keys ...string) error {
}
if internal.IsRetryableError(err, true) {
c.state.LazyReload()
continue
}
@@ -929,12 +941,14 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
}
if internal.IsRetryableError(err, true) {
// Firstly retry the same node.
c.state.LazyReload()
// First retry the same node.
if attempt == 0 {
continue
}
// Secondly try random node.
// Second try random node.
node, err = c.nodes.Random()
if err != nil {
break