mirror of
https://github.com/redis/go-redis.git
synced 2025-08-07 12:42:55 +03:00
Ensure to use pointer methods where appropriate. Tidy up godoc.
This commit is contained in:
110
ring.go
110
ring.go
@@ -102,7 +102,7 @@ func (shard *ringShard) Vote(up bool) bool {
|
||||
// concurrent use by multiple goroutines.
|
||||
//
|
||||
// Ring monitors the state of each shard and removes dead shards from
|
||||
// the ring. When shard comes online it is added back to the ring. This
|
||||
// the c. When shard comes online it is added back to the c. This
|
||||
// gives you maximum availability and partition tolerance, but no
|
||||
// consistency between different shards or even clients. Each client
|
||||
// uses shards that are available to the client and does not do any
|
||||
@@ -149,58 +149,58 @@ func NewRing(opt *RingOptions) *Ring {
|
||||
return ring
|
||||
}
|
||||
|
||||
func (ring *Ring) cmdInfo(name string) *CommandInfo {
|
||||
ring.cmdsInfoOnce.Do(func() {
|
||||
for _, shard := range ring.shards {
|
||||
func (c *Ring) cmdInfo(name string) *CommandInfo {
|
||||
c.cmdsInfoOnce.Do(func() {
|
||||
for _, shard := range c.shards {
|
||||
cmdsInfo, err := shard.Client.Command().Result()
|
||||
if err == nil {
|
||||
ring.cmdsInfo = cmdsInfo
|
||||
c.cmdsInfo = cmdsInfo
|
||||
return
|
||||
}
|
||||
}
|
||||
ring.cmdsInfoOnce = &sync.Once{}
|
||||
c.cmdsInfoOnce = &sync.Once{}
|
||||
})
|
||||
if ring.cmdsInfo == nil {
|
||||
if c.cmdsInfo == nil {
|
||||
return nil
|
||||
}
|
||||
return ring.cmdsInfo[name]
|
||||
return c.cmdsInfo[name]
|
||||
}
|
||||
|
||||
func (ring *Ring) cmdFirstKey(cmd Cmder) string {
|
||||
cmdInfo := ring.cmdInfo(cmd.arg(0))
|
||||
func (c *Ring) cmdFirstKey(cmd Cmder) string {
|
||||
cmdInfo := c.cmdInfo(cmd.arg(0))
|
||||
if cmdInfo == nil {
|
||||
return ""
|
||||
}
|
||||
return cmd.arg(int(cmdInfo.FirstKeyPos))
|
||||
}
|
||||
|
||||
func (ring *Ring) addClient(name string, cl *Client) {
|
||||
ring.mu.Lock()
|
||||
ring.hash.Add(name)
|
||||
ring.shards[name] = &ringShard{Client: cl}
|
||||
ring.mu.Unlock()
|
||||
func (c *Ring) addClient(name string, cl *Client) {
|
||||
c.mu.Lock()
|
||||
c.hash.Add(name)
|
||||
c.shards[name] = &ringShard{Client: cl}
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
func (ring *Ring) getClient(key string) (*Client, error) {
|
||||
ring.mu.RLock()
|
||||
func (c *Ring) getClient(key string) (*Client, error) {
|
||||
c.mu.RLock()
|
||||
|
||||
if ring.closed {
|
||||
if c.closed {
|
||||
return nil, pool.ErrClosed
|
||||
}
|
||||
|
||||
name := ring.hash.Get(hashtag.Key(key))
|
||||
name := c.hash.Get(hashtag.Key(key))
|
||||
if name == "" {
|
||||
ring.mu.RUnlock()
|
||||
c.mu.RUnlock()
|
||||
return nil, errRingShardsDown
|
||||
}
|
||||
|
||||
cl := ring.shards[name].Client
|
||||
ring.mu.RUnlock()
|
||||
cl := c.shards[name].Client
|
||||
c.mu.RUnlock()
|
||||
return cl, nil
|
||||
}
|
||||
|
||||
func (ring *Ring) Process(cmd Cmder) error {
|
||||
cl, err := ring.getClient(ring.cmdFirstKey(cmd))
|
||||
func (c *Ring) Process(cmd Cmder) error {
|
||||
cl, err := c.getClient(c.cmdFirstKey(cmd))
|
||||
if err != nil {
|
||||
cmd.setErr(err)
|
||||
return err
|
||||
@@ -208,34 +208,34 @@ func (ring *Ring) Process(cmd Cmder) error {
|
||||
return cl.baseClient.Process(cmd)
|
||||
}
|
||||
|
||||
// rebalance removes dead shards from the ring.
|
||||
func (ring *Ring) rebalance() {
|
||||
defer ring.mu.Unlock()
|
||||
ring.mu.Lock()
|
||||
// rebalance removes dead shards from the c.
|
||||
func (c *Ring) rebalance() {
|
||||
defer c.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
|
||||
ring.hash = consistenthash.New(ring.nreplicas, nil)
|
||||
for name, shard := range ring.shards {
|
||||
c.hash = consistenthash.New(c.nreplicas, nil)
|
||||
for name, shard := range c.shards {
|
||||
if shard.IsUp() {
|
||||
ring.hash.Add(name)
|
||||
c.hash.Add(name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// heartbeat monitors state of each shard in the ring.
|
||||
func (ring *Ring) heartbeat() {
|
||||
// heartbeat monitors state of each shard in the c.
|
||||
func (c *Ring) heartbeat() {
|
||||
ticker := time.NewTicker(100 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
for _ = range ticker.C {
|
||||
var rebalance bool
|
||||
|
||||
ring.mu.RLock()
|
||||
c.mu.RLock()
|
||||
|
||||
if ring.closed {
|
||||
ring.mu.RUnlock()
|
||||
if c.closed {
|
||||
c.mu.RUnlock()
|
||||
break
|
||||
}
|
||||
|
||||
for _, shard := range ring.shards {
|
||||
for _, shard := range c.shards {
|
||||
err := shard.Client.Ping().Err()
|
||||
if shard.Vote(err == nil || err == pool.ErrPoolTimeout) {
|
||||
internal.Logf("ring shard state changed: %s", shard)
|
||||
@@ -243,10 +243,10 @@ func (ring *Ring) heartbeat() {
|
||||
}
|
||||
}
|
||||
|
||||
ring.mu.RUnlock()
|
||||
c.mu.RUnlock()
|
||||
|
||||
if rebalance {
|
||||
ring.rebalance()
|
||||
c.rebalance()
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -255,45 +255,45 @@ func (ring *Ring) heartbeat() {
|
||||
//
|
||||
// It is rare to Close a Ring, as the Ring is meant to be long-lived
|
||||
// and shared between many goroutines.
|
||||
func (ring *Ring) Close() (retErr error) {
|
||||
defer ring.mu.Unlock()
|
||||
ring.mu.Lock()
|
||||
func (c *Ring) Close() (retErr error) {
|
||||
defer c.mu.Unlock()
|
||||
c.mu.Lock()
|
||||
|
||||
if ring.closed {
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
ring.closed = true
|
||||
c.closed = true
|
||||
|
||||
for _, shard := range ring.shards {
|
||||
for _, shard := range c.shards {
|
||||
if err := shard.Client.Close(); err != nil {
|
||||
retErr = err
|
||||
}
|
||||
}
|
||||
ring.hash = nil
|
||||
ring.shards = nil
|
||||
c.hash = nil
|
||||
c.shards = nil
|
||||
|
||||
return retErr
|
||||
}
|
||||
|
||||
func (ring *Ring) Pipeline() *Pipeline {
|
||||
func (c *Ring) Pipeline() *Pipeline {
|
||||
pipe := Pipeline{
|
||||
exec: ring.pipelineExec,
|
||||
exec: c.pipelineExec,
|
||||
}
|
||||
pipe.cmdable.process = pipe.Process
|
||||
pipe.statefulCmdable.process = pipe.Process
|
||||
return &pipe
|
||||
}
|
||||
|
||||
func (ring *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
|
||||
return ring.Pipeline().pipelined(fn)
|
||||
func (c *Ring) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
|
||||
return c.Pipeline().pipelined(fn)
|
||||
}
|
||||
|
||||
func (ring *Ring) pipelineExec(cmds []Cmder) error {
|
||||
func (c *Ring) pipelineExec(cmds []Cmder) error {
|
||||
var retErr error
|
||||
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
name := ring.hash.Get(hashtag.Key(ring.cmdFirstKey(cmd)))
|
||||
name := c.hash.Get(hashtag.Key(c.cmdFirstKey(cmd)))
|
||||
if name == "" {
|
||||
cmd.setErr(errRingShardsDown)
|
||||
if retErr == nil {
|
||||
@@ -304,11 +304,11 @@ func (ring *Ring) pipelineExec(cmds []Cmder) error {
|
||||
cmdsMap[name] = append(cmdsMap[name], cmd)
|
||||
}
|
||||
|
||||
for i := 0; i <= ring.opt.MaxRetries; i++ {
|
||||
for i := 0; i <= c.opt.MaxRetries; i++ {
|
||||
failedCmdsMap := make(map[string][]Cmder)
|
||||
|
||||
for name, cmds := range cmdsMap {
|
||||
client := ring.shards[name].Client
|
||||
client := c.shards[name].Client
|
||||
cn, err := client.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
|
Reference in New Issue
Block a user