diff --git a/cluster.go b/cluster.go index b68c60f1..fdf0f840 100644 --- a/cluster.go +++ b/cluster.go @@ -12,7 +12,7 @@ type ClusterClient struct { commandable addrs []string - slots []string + slots [][]string slotsMx sync.RWMutex // protects slots & addrs cache clients map[string]*Client @@ -90,13 +90,12 @@ func (c *ClusterClient) process(cmd Cmder) { c.reloadIfDue() slot := hashSlot(cmd.clusterKey()) - c.slotsMx.RLock() - masterAddr := c.slots[slot] + addrs := c.slots[slot] c.slotsMx.RUnlock() - if masterAddr != "" { - client = c.getClient(masterAddr) + if len(addrs) > 0 { + client = c.getClient(addrs[0]) // First address is master. } else { var err error client, err = c.randomClient() @@ -170,20 +169,24 @@ func (c *ClusterClient) resetClients() (err error) { func (c *ClusterClient) setSlots(slots []ClusterSlotInfo) { c.slotsMx.Lock() - c.addrs = c.addrs[:0] - c.slots = make([]string, hashSlots) + c.slots = make([][]string, hashSlots) c.resetClients() seen := make(map[string]struct{}) + for _, addr := range c.addrs { + seen[addr] = struct{}{} + } + for _, info := range slots { - masterAddr := info.Addrs[0] for slot := info.Start; slot <= info.End; slot++ { - c.slots[slot] = masterAddr + c.slots[slot] = info.Addrs } - if _, ok := seen[masterAddr]; !ok { - c.addrs = append(c.addrs, masterAddr) - seen[masterAddr] = struct{}{} + for _, addr := range info.Addrs { + if _, ok := seen[addr]; !ok { + c.addrs = append(c.addrs, addr) + seen[addr] = struct{}{} + } } } @@ -218,7 +221,7 @@ func (c *ClusterClient) scheduleReload() { // reaper closes idle connections to the cluster. func (c *ClusterClient) reaper(ticker *time.Ticker) { for _ = range ticker.C { - for _, client := range c.conns { + for _, client := range c.clients { pool := client.connPool // pool.First removes idle connections from the pool for us. So // just put returned connection back.