1
0
mirror of https://github.com/redis/go-redis.git synced 2025-08-07 12:42:55 +03:00

cluster: add manual setup

This commit is contained in:
Vladimir Mihailenco
2018-06-29 10:45:05 +03:00
parent d409b91714
commit 1f59be5cc0
4 changed files with 146 additions and 25 deletions

View File

@@ -30,7 +30,7 @@ type ClusterOptions struct {
// The maximum number of retries before giving up. Command is retried
// on network errors and MOVED/ASK redirects.
// Default is 8.
// Default is 8 retries.
MaxRedirects int
// Enables read-only commands on slave nodes.
@@ -39,8 +39,14 @@ type ClusterOptions struct {
// It automatically enables ReadOnly.
RouteByLatency bool
// Allows routing read-only commands to the random master or slave node.
// It automatically enables ReadOnly.
RouteRandomly bool
// Optional function that is used to load cluster slots information.
// It is useful to manually create cluster of standalone Redis servers
// or load-balance read/write operations between master and slaves.
ClusterSlots func() ([]ClusterSlot, error)
// Following options are copied from Options struct.
OnConnect func(*Conn) error
@@ -70,7 +76,7 @@ func (opt *ClusterOptions) init() {
opt.MaxRedirects = 8
}
if opt.RouteByLatency {
if opt.RouteByLatency || opt.RouteRandomly {
opt.ReadOnly = true
}
@@ -160,10 +166,6 @@ func (n *clusterNode) Close() error {
return n.Client.Close()
}
func (n *clusterNode) Test() error {
return n.Client.ClusterInfo().Err()
}
func (n *clusterNode) updateLatency() {
const probes = 10
@@ -330,7 +332,7 @@ func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
v, err := c.nodeCreateGroup.Do(addr, func() (interface{}, error) {
node := newClusterNode(c.opt, addr)
return node, node.Test()
return node, nil
})
c.mu.Lock()
@@ -509,6 +511,10 @@ func (c *clusterState) slotNodes(slot int) []*clusterNode {
}
func (c *clusterState) IsConsistent() bool {
if c.nodes.opt.ClusterSlots != nil {
return true
}
if len(c.Masters) > len(c.Slaves) {
return false
}
@@ -614,6 +620,14 @@ func (c *clusterStateHolder) Get() (*clusterState, error) {
return nil, errors.New("redis: cluster has no state")
}
func (c *clusterStateHolder) ReloadOrGet() (*clusterState, error) {
state, err := c.Reload()
if err == nil {
return state, nil
}
return c.Get()
}
//------------------------------------------------------------------------------
// ClusterClient is a Redis Cluster client representing a pool of zero
@@ -662,6 +676,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
return c
}
// ReloadState loads cluster slots information to update cluster topography.
func (c *ClusterClient) ReloadState() error {
_, err := c.state.Reload()
return err
}
func (c *ClusterClient) init() {
c.cmdable.setProcessor(c.Process)
}
@@ -946,12 +966,9 @@ func (c *ClusterClient) defaultProcess(cmd Cmder) error {
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
state, err := c.state.Reload()
state, err := c.state.ReloadOrGet()
if err != nil {
state, err = c.state.Get()
if err != nil {
return err
}
return err
}
var wg sync.WaitGroup
@@ -982,12 +999,9 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state, err := c.state.Reload()
state, err := c.state.ReloadOrGet()
if err != nil {
state, err = c.state.Get()
if err != nil {
return err
}
return err
}
var wg sync.WaitGroup
@@ -1018,12 +1032,9 @@ func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
state, err := c.state.Reload()
state, err := c.state.ReloadOrGet()
if err != nil {
state, err = c.state.Get()
if err != nil {
return err
}
return err
}
var wg sync.WaitGroup
@@ -1092,6 +1103,14 @@ func (c *ClusterClient) PoolStats() *PoolStats {
}
func (c *ClusterClient) loadState() (*clusterState, error) {
if c.opt.ClusterSlots != nil {
slots, err := c.opt.ClusterSlots()
if err != nil {
return nil, err
}
return newClusterState(c.nodes, slots, "")
}
addrs, err := c.nodes.Addrs()
if err != nil {
return nil, err