
Close connections to unused nodes

This commit is contained in:
Vladimir Mihailenco
2017-07-09 13:10:07 +03:00
parent 6060f097e1
commit 3ddda73a05
7 changed files with 418 additions and 230 deletions
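
Before the diff: the core of this change is a generation counter on cluster nodes. Every cluster-state reload calls NextGeneration, every node that still appears in the slot map is stamped via SetGeneration, and GC then closes any node whose generation is stale. What follows is a minimal, self-contained sketch of that idea, not the library code; the simplified types and the main function are invented for illustration.

package main

import "fmt"

// clusterNode and clusterNodes are stripped-down stand-ins for the real
// structs in cluster.go; only the generation bookkeeping is kept.
type clusterNode struct {
	addr       string
	generation uint32
}

type clusterNodes struct {
	nodes      map[string]*clusterNode
	generation uint32
}

// NextGeneration is bumped once per state reload.
func (c *clusterNodes) NextGeneration() uint32 {
	c.generation++
	return c.generation
}

// GC drops (in the real client: closes) every node that was not stamped
// with the current generation, i.e. no longer serves any slot.
func (c *clusterNodes) GC(generation uint32) {
	for addr, node := range c.nodes {
		if node.generation < generation {
			delete(c.nodes, addr)
			fmt.Println("closing unused node", addr)
		}
	}
}

func main() {
	c := &clusterNodes{nodes: map[string]*clusterNode{
		"a:6379": {addr: "a:6379"},
		"b:6379": {addr: "b:6379"},
	}}

	// Reload state: only a:6379 still appears in the slot map.
	gen := c.NextGeneration()
	c.nodes["a:6379"].generation = gen

	c.GC(gen) // b:6379 is collected
}

In the real code below, GC collects stale nodes while holding the mutex and closes their clients only after releasing it, so slow Close calls never block node lookups.
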


@@ -28,18 +28,19 @@ type ClusterOptions struct {
// Default is 16.
MaxRedirects int
// Enables read queries for a connection to a Redis Cluster slave node.
// Enables read-only commands on slave nodes.
ReadOnly bool
// Enables routing read-only queries to the closest master or slave node.
// Allows routing read-only commands to the closest master or slave node.
RouteByLatency bool
// Following options are copied from Options struct.
OnConnect func(*Conn) error
MaxRetries int
Password string
MaxRetries int
MinRetryBackoff time.Duration
MaxRetryBackoff time.Duration
Password string
DialTimeout time.Duration
ReadTimeout time.Duration
@@ -62,6 +63,19 @@ func (opt *ClusterOptions) init() {
if opt.RouteByLatency {
opt.ReadOnly = true
}
switch opt.MinRetryBackoff {
case -1:
opt.MinRetryBackoff = 0
case 0:
opt.MinRetryBackoff = 8 * time.Millisecond
}
switch opt.MaxRetryBackoff {
case -1:
opt.MaxRetryBackoff = 0
case 0:
opt.MaxRetryBackoff = 512 * time.Millisecond
}
}
func (opt *ClusterOptions) clientOptions() *Options {
@@ -70,9 +84,11 @@ func (opt *ClusterOptions) clientOptions() *Options {
return &Options{
OnConnect: opt.OnConnect,
MaxRetries: opt.MaxRetries,
Password: opt.Password,
ReadOnly: opt.ReadOnly,
MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
Password: opt.Password,
ReadOnly: opt.ReadOnly,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
@@ -91,7 +107,9 @@ func (opt *ClusterOptions) clientOptions() *Options {
type clusterNode struct {
Client *Client
Latency time.Duration
loading time.Time
loading time.Time
generation uint32
}
func newClusterNode(clOpt *ClusterOptions, addr string) *clusterNode {
@@ -122,6 +140,17 @@ func (n *clusterNode) Loading() bool {
return !n.loading.IsZero() && time.Since(n.loading) < time.Minute
}
func (n *clusterNode) Generation() uint32 {
return n.generation
}
func (n *clusterNode) SetGeneration(gen uint32) {
if gen < n.generation {
panic("gen < n.generation")
}
n.generation = gen
}
//------------------------------------------------------------------------------
type clusterNodes struct {
@@ -131,6 +160,8 @@ type clusterNodes struct {
addrs []string
nodes map[string]*clusterNode
closed bool
generation uint32
}
func newClusterNodes(opt *ClusterOptions) *clusterNodes {
@@ -161,6 +192,39 @@ func (c *clusterNodes) Close() error {
return firstErr
}
func (c *clusterNodes) NextGeneration() uint32 {
c.generation++
return c.generation
}
// GC removes unused nodes.
func (c *clusterNodes) GC(generation uint32) error {
var collected []*clusterNode
c.mu.Lock()
for i := 0; i < len(c.addrs); {
addr := c.addrs[i]
node := c.nodes[addr]
if node.Generation() >= generation {
i++
continue
}
c.addrs = append(c.addrs[:i], c.addrs[i+1:]...)
delete(c.nodes, addr)
collected = append(collected, node)
}
c.mu.Unlock()
var firstErr error
for _, node := range collected {
if err := node.Client.Close(); err != nil && firstErr == nil {
firstErr = err
}
}
return firstErr
}
func (c *clusterNodes) All() ([]*clusterNode, error) {
c.mu.RLock()
defer c.mu.RUnlock()
@@ -176,7 +240,7 @@ func (c *clusterNodes) All() ([]*clusterNode, error) {
return nodes, nil
}
func (c *clusterNodes) Get(addr string) (*clusterNode, error) {
func (c *clusterNodes) GetOrCreate(addr string) (*clusterNode, error) {
var node *clusterNode
var ok bool
@@ -223,7 +287,7 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
var nodeErr error
for i := 0; i <= c.opt.MaxRedirects; i++ {
n := rand.Intn(len(addrs))
node, err := c.Get(addrs[n])
node, err := c.GetOrCreate(addrs[n])
if err != nil {
return nil, err
}
@@ -239,30 +303,45 @@ func (c *clusterNodes) Random() (*clusterNode, error) {
//------------------------------------------------------------------------------
type clusterState struct {
nodes *clusterNodes
nodes *clusterNodes
masters []*clusterNode
slaves []*clusterNode
slots [][]*clusterNode
generation uint32
}
func newClusterState(nodes *clusterNodes, slots []ClusterSlot, origin string) (*clusterState, error) {
c := clusterState{
nodes: nodes,
nodes: nodes,
generation: nodes.NextGeneration(),
slots: make([][]*clusterNode, hashtag.SlotNumber),
}
isLoopbackOrigin := isLoopbackAddr(origin)
for _, slot := range slots {
var nodes []*clusterNode
for _, slotNode := range slot.Nodes {
for i, slotNode := range slot.Nodes {
addr := slotNode.Addr
if !isLoopbackOrigin && isLoopbackAddr(addr) {
addr = origin
}
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return nil, err
}
node.SetGeneration(c.generation)
nodes = append(nodes, node)
if i == 0 {
c.masters = appendNode(c.masters, node)
} else {
c.slaves = appendNode(c.slaves, node)
}
}
for i := slot.Start; i <= slot.End; i++ {
@@ -348,7 +427,7 @@ type ClusterClient struct {
cmdsInfoOnce internal.Once
cmdsInfo map[string]*CommandInfo
// Reports where slots reloading is in progress.
// Reports whether slots reloading is in progress.
reloading uint32
}
@@ -365,12 +444,12 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
// Add initial nodes.
for _, addr := range opt.Addrs {
_, _ = c.nodes.Get(addr)
_, _ = c.nodes.GetOrCreate(addr)
}
// Preload cluster slots.
for i := 0; i < 10; i++ {
state, err := c.reloadSlots()
state, err := c.reloadState()
if err == nil {
c._state.Store(state)
break
@@ -394,7 +473,7 @@ func (c *ClusterClient) state() *clusterState {
if v != nil {
return v.(*clusterState)
}
c.lazyReloadSlots()
c.lazyReloadState()
return nil
}
@@ -476,6 +555,10 @@ func (c *ClusterClient) Process(cmd Cmder) error {
var ask bool
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
time.Sleep(node.Client.retryBackoff(attempt))
}
if ask {
pipe := node.Client.Pipeline()
pipe.Process(NewCmd("ASKING"))
@@ -487,13 +570,14 @@ func (c *ClusterClient) Process(cmd Cmder) error {
err = node.Client.Process(cmd)
}
// If there is no (real) error - we are done.
// If there is no error - we are done.
if err == nil {
return nil
}
// If slave is loading - read from master.
if c.opt.ReadOnly && internal.IsLoadingError(err) {
// TODO: race
node.loading = time.Now()
continue
}
@@ -516,11 +600,11 @@ func (c *ClusterClient) Process(cmd Cmder) error {
if state != nil && slot >= 0 {
master, _ := state.slotMasterNode(slot)
if moved && (master == nil || master.Client.getAddr() != addr) {
c.lazyReloadSlots()
c.lazyReloadState()
}
}
node, err = c.nodes.Get(addr)
node, err = c.nodes.GetOrCreate(addr)
if err != nil {
cmd.setErr(err)
return err
@@ -535,39 +619,6 @@ func (c *ClusterClient) Process(cmd Cmder) error {
return cmd.Err()
}
// ForEachNode concurrently calls the fn on each ever known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
nodes, err := c.nodes.All()
if err != nil {
return err
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, node := range nodes {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
err := fn(node.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}(node)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
// ForEachMaster concurrently calls the fn on each master node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
@@ -577,19 +628,8 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
}
var wg sync.WaitGroup
visited := make(map[*clusterNode]struct{})
errCh := make(chan error, 1)
for _, nodes := range state.slots {
if len(nodes) == 0 {
continue
}
master := nodes[0]
if _, ok := visited[master]; ok {
continue
}
visited[master] = struct{}{}
for _, master := range state.masters {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
@@ -612,16 +652,88 @@ func (c *ClusterClient) ForEachMaster(fn func(client *Client) error) error {
}
}
// ForEachSlave concurrently calls the fn on each slave node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachSlave(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
for _, slave := range state.slaves {
wg.Add(1)
go func(node *clusterNode) {
defer wg.Done()
err := fn(node.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}(slave)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
// ForEachNode concurrently calls the fn on each known node in the cluster.
// It returns the first error if any.
func (c *ClusterClient) ForEachNode(fn func(client *Client) error) error {
state := c.state()
if state == nil {
return errNilClusterState
}
var wg sync.WaitGroup
errCh := make(chan error, 1)
worker := func(node *clusterNode) {
defer wg.Done()
err := fn(node.Client)
if err != nil {
select {
case errCh <- err:
default:
}
}
}
for _, node := range state.masters {
wg.Add(1)
go worker(node)
}
for _, node := range state.slaves {
wg.Add(1)
go worker(node)
}
wg.Wait()
select {
case err := <-errCh:
return err
default:
return nil
}
}
// PoolStats returns accumulated connection pool stats.
func (c *ClusterClient) PoolStats() *PoolStats {
var acc PoolStats
nodes, err := c.nodes.All()
if err != nil {
state := c.state()
if state == nil {
return &acc
}
for _, node := range nodes {
for _, node := range state.masters {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
@@ -629,33 +741,51 @@ func (c *ClusterClient) PoolStats() *PoolStats {
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
for _, node := range state.slaves {
s := node.Client.connPool.Stats()
acc.Requests += s.Requests
acc.Hits += s.Hits
acc.Timeouts += s.Timeouts
acc.TotalConns += s.TotalConns
acc.FreeConns += s.FreeConns
}
return &acc
}
func (c *ClusterClient) lazyReloadSlots() {
func (c *ClusterClient) lazyReloadState() {
if !atomic.CompareAndSwapUint32(&c.reloading, 0, 1) {
return
}
go func() {
for i := 0; i < 1000; i++ {
state, err := c.reloadSlots()
defer atomic.StoreUint32(&c.reloading, 0)
var state *clusterState
for {
var err error
state, err = c.reloadState()
if err == pool.ErrClosed {
break
return
}
if err == nil {
c._state.Store(state)
break
if err != nil {
time.Sleep(time.Millisecond)
continue
}
time.Sleep(time.Millisecond)
c._state.Store(state)
break
}
time.Sleep(3 * time.Second)
atomic.StoreUint32(&c.reloading, 0)
c.nodes.GC(state.generation)
}()
}
func (c *ClusterClient) reloadSlots() (*clusterState, error) {
// Not thread-safe.
func (c *ClusterClient) reloadState() (*clusterState, error) {
node, err := c.nodes.Random()
if err != nil {
return nil, err
@@ -799,9 +929,9 @@ func (c *ClusterClient) pipelineReadCmds(
func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
moved, ask, addr := internal.IsMovedError(cmd.Err())
if moved {
c.lazyReloadSlots()
c.lazyReloadState()
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@@ -809,7 +939,7 @@ func (c *ClusterClient) checkMovedErr(cmd Cmder, failedCmds map[*clusterNode][]Cmder) error {
failedCmds[node] = append(failedCmds[node], cmd)
}
if ask {
node, err := c.nodes.Get(addr)
node, err := c.nodes.GetOrCreate(addr)
if err != nil {
return err
}
@@ -1029,3 +1159,12 @@ func isLoopbackAddr(addr string) bool {
return ip.IsLoopback()
}
func appendNode(nodes []*clusterNode, node *clusterNode) []*clusterNode {
for _, n := range nodes {
if n == node {
return nodes
}
}
return append(nodes, node)
}
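
For completeness, a hedged usage sketch against the options and helpers this diff touches. Field and method names (MinRetryBackoff, MaxRetryBackoff, ReadOnly, ForEachSlave) come from the hunks above; the import path, addresses, and the Ping probe are assumptions for illustration (the package lived at github.com/go-redis/redis around the time of this commit).

package main

import (
	"time"

	"github.com/go-redis/redis"
)

func main() {
	client := redis.NewClusterClient(&redis.ClusterOptions{
		Addrs: []string{"127.0.0.1:7000", "127.0.0.1:7001"},

		// Added by this commit: backoff between redirect retries.
		// Leaving them zero picks the defaults from init()
		// (8ms / 512ms); -1 disables the backoff entirely.
		MinRetryBackoff: 8 * time.Millisecond,
		MaxRetryBackoff: 512 * time.Millisecond,

		// Read-only commands may be served by slave nodes.
		ReadOnly: true,
	})
	defer client.Close()

	// ForEachSlave, added in this diff, walks state.slaves directly
	// instead of re-deriving nodes from the slot map.
	_ = client.ForEachSlave(func(c *redis.Client) error {
		return c.Ping().Err()
	})
}

Unused nodes are then cleaned up automatically: after a successful lazyReloadState the client stores the new state and calls nodes.GC with that state's generation, closing connections to nodes that dropped out of the cluster.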