mirror of
https://github.com/redis/go-redis.git
synced 2025-07-29 17:41:15 +03:00
Replace PoolStats.Requests with PoolStats.Misses
This commit is contained in:
52
ring.go
52
ring.go
@ -145,9 +145,10 @@ type Ring struct {
|
||||
opt *RingOptions
|
||||
nreplicas int
|
||||
|
||||
mu sync.RWMutex
|
||||
hash *consistenthash.Map
|
||||
shards map[string]*ringShard
|
||||
mu sync.RWMutex
|
||||
hash *consistenthash.Map
|
||||
shards map[string]*ringShard
|
||||
shardsList []*ringShard
|
||||
|
||||
cmdsInfoOnce internal.Once
|
||||
cmdsInfo map[string]*CommandInfo
|
||||
@ -169,12 +170,21 @@ func NewRing(opt *RingOptions) *Ring {
|
||||
for name, addr := range opt.Addrs {
|
||||
clopt := opt.clientOptions()
|
||||
clopt.Addr = addr
|
||||
ring.addClient(name, NewClient(clopt))
|
||||
ring.addShard(name, NewClient(clopt))
|
||||
}
|
||||
go ring.heartbeat()
|
||||
return ring
|
||||
}
|
||||
|
||||
func (c *Ring) addShard(name string, cl *Client) {
|
||||
shard := &ringShard{Client: cl}
|
||||
c.mu.Lock()
|
||||
c.hash.Add(name)
|
||||
c.shards[name] = shard
|
||||
c.shardsList = append(c.shardsList, shard)
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Options returns read-only Options that were used to create the client.
|
||||
func (c *Ring) Options() *RingOptions {
|
||||
return c.opt
|
||||
@ -186,11 +196,15 @@ func (c *Ring) retryBackoff(attempt int) time.Duration {
|
||||
|
||||
// PoolStats returns accumulated connection pool stats.
|
||||
func (c *Ring) PoolStats() *PoolStats {
|
||||
c.mu.RLock()
|
||||
shards := c.shardsList
|
||||
c.mu.RUnlock()
|
||||
|
||||
var acc PoolStats
|
||||
for _, shard := range c.shards {
|
||||
for _, shard := range shards {
|
||||
s := shard.Client.connPool.Stats()
|
||||
acc.Requests += s.Requests
|
||||
acc.Hits += s.Hits
|
||||
acc.Misses += s.Misses
|
||||
acc.Timeouts += s.Timeouts
|
||||
acc.TotalConns += s.TotalConns
|
||||
acc.FreeConns += s.FreeConns
|
||||
@ -229,9 +243,13 @@ func (c *Ring) PSubscribe(channels ...string) *PubSub {
|
||||
// ForEachShard concurrently calls the fn on each live shard in the ring.
|
||||
// It returns the first error if any.
|
||||
func (c *Ring) ForEachShard(fn func(client *Client) error) error {
|
||||
c.mu.RLock()
|
||||
shards := c.shardsList
|
||||
c.mu.RUnlock()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
errCh := make(chan error, 1)
|
||||
for _, shard := range c.shards {
|
||||
for _, shard := range shards {
|
||||
if shard.IsDown() {
|
||||
continue
|
||||
}
|
||||
@ -261,10 +279,11 @@ func (c *Ring) ForEachShard(fn func(client *Client) error) error {
|
||||
func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||
err := c.cmdsInfoOnce.Do(func() error {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
shards := c.shardsList
|
||||
c.mu.RUnlock()
|
||||
|
||||
var firstErr error
|
||||
for _, shard := range c.shards {
|
||||
for _, shard := range shards {
|
||||
cmdsInfo, err := shard.Client.Command().Result()
|
||||
if err == nil {
|
||||
c.cmdsInfo = cmdsInfo
|
||||
@ -286,13 +305,6 @@ func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||
return info
|
||||
}
|
||||
|
||||
func (c *Ring) addClient(name string, cl *Client) {
|
||||
c.mu.Lock()
|
||||
c.hash.Add(name)
|
||||
c.shards[name] = &ringShard{Client: cl}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (c *Ring) shardByKey(key string) (*ringShard, error) {
|
||||
key = hashtag.Key(key)
|
||||
|
||||
@ -372,7 +384,10 @@ func (c *Ring) heartbeat() {
|
||||
break
|
||||
}
|
||||
|
||||
for _, shard := range c.shards {
|
||||
shards := c.shardsList
|
||||
c.mu.RUnlock()
|
||||
|
||||
for _, shard := range shards {
|
||||
err := shard.Client.Ping().Err()
|
||||
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
|
||||
internal.Logf("ring shard state changed: %s", shard)
|
||||
@ -380,8 +395,6 @@ func (c *Ring) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
c.mu.RUnlock()
|
||||
|
||||
if rebalance {
|
||||
c.rebalance()
|
||||
}
|
||||
@ -409,6 +422,7 @@ func (c *Ring) Close() error {
|
||||
}
|
||||
c.hash = nil
|
||||
c.shards = nil
|
||||
c.shardsList = nil
|
||||
|
||||
return firstErr
|
||||
}
|
||||
|
Reference in New Issue
Block a user