mirror of
https://github.com/redis/go-redis.git
synced 2025-07-31 05:04:23 +03:00
Enable reaper on ClusterClient and add tests.
This commit is contained in:
55
ring.go
55
ring.go
@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gopkg.in/redis.v4/internal"
|
||||
@ -65,7 +66,7 @@ func (opt *RingOptions) clientOptions() *Options {
|
||||
|
||||
type ringShard struct {
|
||||
Client *Client
|
||||
down int
|
||||
down int32
|
||||
}
|
||||
|
||||
func (shard *ringShard) String() string {
|
||||
@ -80,7 +81,7 @@ func (shard *ringShard) String() string {
|
||||
|
||||
func (shard *ringShard) IsDown() bool {
|
||||
const threshold = 3
|
||||
return shard.down >= threshold
|
||||
return atomic.LoadInt32(&shard.down) >= threshold
|
||||
}
|
||||
|
||||
func (shard *ringShard) IsUp() bool {
|
||||
@ -91,7 +92,7 @@ func (shard *ringShard) IsUp() bool {
|
||||
func (shard *ringShard) Vote(up bool) bool {
|
||||
if up {
|
||||
changed := shard.IsDown()
|
||||
shard.down = 0
|
||||
atomic.StoreInt32(&shard.down, 0)
|
||||
return changed
|
||||
}
|
||||
|
||||
@ -99,7 +100,7 @@ func (shard *ringShard) Vote(up bool) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
shard.down++
|
||||
atomic.AddInt32(&shard.down, 1)
|
||||
return shard.IsDown()
|
||||
}
|
||||
|
||||
@ -157,6 +158,52 @@ func NewRing(opt *RingOptions) *Ring {
|
||||
return ring
|
||||
}
|
||||
|
||||
// PoolStats returns accumulated connection pool stats.
|
||||
func (c *Ring) PoolStats() *PoolStats {
|
||||
var acc PoolStats
|
||||
for _, shard := range c.shards {
|
||||
s := shard.Client.connPool.Stats()
|
||||
acc.Requests += s.Requests
|
||||
acc.Hits += s.Hits
|
||||
acc.Timeouts += s.Timeouts
|
||||
acc.TotalConns += s.TotalConns
|
||||
acc.FreeConns += s.FreeConns
|
||||
}
|
||||
return &acc
|
||||
}
|
||||
|
||||
// ForEachShard concurrently calls the fn on each live shard in the ring.
|
||||
// It returns the first error if any.
|
||||
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
|
||||
var wg sync.WaitGroup
|
||||
errCh := make(chan error, 1)
|
||||
for _, shard := range c.shards {
|
||||
if shard.IsDown() {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(shard *ringShard) {
|
||||
defer wg.Done()
|
||||
err := fn(shard.Client)
|
||||
if err != nil {
|
||||
select {
|
||||
case errCh <- err:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}(shard)
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
select {
|
||||
case err := <-errCh:
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||
c.cmdsInfoOnce.Do(func() {
|
||||
for _, shard := range c.shards {
|
||||
|
Reference in New Issue
Block a user