mirror of
https://github.com/redis/go-redis.git
synced 2025-07-28 06:42:00 +03:00
Store addresses of replicas again.
This commit is contained in:
33
cluster.go
33
cluster.go
@ -12,7 +12,7 @@ type ClusterClient struct {
|
|||||||
commandable
|
commandable
|
||||||
|
|
||||||
addrs []string
|
addrs []string
|
||||||
slots []string
|
slots [][]string
|
||||||
slotsMx sync.RWMutex // protects slots & addrs cache
|
slotsMx sync.RWMutex // protects slots & addrs cache
|
||||||
|
|
||||||
clients map[string]*Client
|
clients map[string]*Client
|
||||||
@ -90,13 +90,12 @@ func (c *ClusterClient) process(cmd Cmder) {
|
|||||||
c.reloadIfDue()
|
c.reloadIfDue()
|
||||||
|
|
||||||
slot := hashSlot(cmd.clusterKey())
|
slot := hashSlot(cmd.clusterKey())
|
||||||
|
|
||||||
c.slotsMx.RLock()
|
c.slotsMx.RLock()
|
||||||
masterAddr := c.slots[slot]
|
addrs := c.slots[slot]
|
||||||
c.slotsMx.RUnlock()
|
c.slotsMx.RUnlock()
|
||||||
|
|
||||||
if masterAddr != "" {
|
if len(addrs) > 0 {
|
||||||
client = c.getClient(masterAddr)
|
client = c.getClient(addrs[0]) // First address is master.
|
||||||
} else {
|
} else {
|
||||||
var err error
|
var err error
|
||||||
client, err = c.randomClient()
|
client, err = c.randomClient()
|
||||||
@ -170,20 +169,24 @@ func (c *ClusterClient) resetClients() (err error) {
|
|||||||
func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) {
|
||||||
c.slotsMx.Lock()
|
c.slotsMx.Lock()
|
||||||
|
|
||||||
c.addrs = c.addrs[:0]
|
c.slots = make([][]string, hashSlots)
|
||||||
c.slots = make([]string, hashSlots)
|
|
||||||
c.resetClients()
|
c.resetClients()
|
||||||
|
|
||||||
seen := make(map[string]struct{})
|
seen := make(map[string]struct{})
|
||||||
for _, info := range slots {
|
for _, addr := range c.addrs {
|
||||||
masterAddr := info.Addrs[0]
|
seen[addr] = struct{}{}
|
||||||
for slot := info.Start; slot <= info.End; slot++ {
|
|
||||||
c.slots[slot] = masterAddr
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, ok := seen[masterAddr]; !ok {
|
for _, info := range slots {
|
||||||
c.addrs = append(c.addrs, masterAddr)
|
for slot := info.Start; slot <= info.End; slot++ {
|
||||||
seen[masterAddr] = struct{}{}
|
c.slots[slot] = info.Addrs
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, addr := range info.Addrs {
|
||||||
|
if _, ok := seen[addr]; !ok {
|
||||||
|
c.addrs = append(c.addrs, addr)
|
||||||
|
seen[addr] = struct{}{}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -218,7 +221,7 @@ func (c *ClusterClient) scheduleReload() {
|
|||||||
// reaper closes idle connections to the cluster.
|
// reaper closes idle connections to the cluster.
|
||||||
func (c *ClusterClient) reaper(ticker *time.Ticker) {
|
func (c *ClusterClient) reaper(ticker *time.Ticker) {
|
||||||
for _ = range ticker.C {
|
for _ = range ticker.C {
|
||||||
for _, client := range c.conns {
|
for _, client := range c.clients {
|
||||||
pool := client.connPool
|
pool := client.connPool
|
||||||
// pool.First removes idle connections from the pool for us. So
|
// pool.First removes idle connections from the pool for us. So
|
||||||
// just put returned connection back.
|
// just put returned connection back.
|
||||||
|
Reference in New Issue
Block a user