mirror of
https://github.com/redis/go-redis.git
synced 2025-06-09 16:01:18 +03:00
Ensure context isn't exhausted via concurrent query as opposed to sentinel query (#3334)
This commit is contained in:
parent
e2149b06f7
commit
a4aea258fc
67
sentinel.go
67
sentinel.go
@ -566,29 +566,60 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
masterAddr string
|
||||||
|
wg sync.WaitGroup
|
||||||
|
once sync.Once
|
||||||
|
errCh = make(chan error, len(c.sentinelAddrs))
|
||||||
|
)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
for i, sentinelAddr := range c.sentinelAddrs {
|
for i, sentinelAddr := range c.sentinelAddrs {
|
||||||
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
|
wg.Add(1)
|
||||||
|
go func(i int, addr string) {
|
||||||
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
|
defer wg.Done()
|
||||||
if err != nil {
|
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
|
||||||
_ = sentinel.Close()
|
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
|
||||||
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
if err != nil {
|
||||||
return "", err
|
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
|
||||||
|
// Report immediately and return
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
|
||||||
|
addr, c.opt.MasterName, err)
|
||||||
|
_ = sentinelCli.Close()
|
||||||
|
return
|
||||||
}
|
}
|
||||||
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
|
|
||||||
c.opt.MasterName, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// Push working sentinel to the top.
|
once.Do(func() {
|
||||||
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
|
||||||
c.setSentinel(ctx, sentinel)
|
// Push working sentinel to the top
|
||||||
|
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
||||||
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
|
c.setSentinel(ctx, sentinelCli)
|
||||||
return addr, nil
|
internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
|
||||||
|
cancel()
|
||||||
|
})
|
||||||
|
}(i, sentinelAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
wg.Wait()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
if masterAddr != "" {
|
||||||
|
return masterAddr, nil
|
||||||
|
}
|
||||||
|
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
|
||||||
|
case err := <-errCh:
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
|
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
|
||||||
|
@ -3,6 +3,7 @@ package redis_test
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"net"
|
"net"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/bsm/ginkgo/v2"
|
. "github.com/bsm/ginkgo/v2"
|
||||||
. "github.com/bsm/gomega"
|
. "github.com/bsm/gomega"
|
||||||
@ -32,6 +33,24 @@ var _ = Describe("Sentinel PROTO 2", func() {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
var _ = Describe("Sentinel resolution", func() {
|
||||||
|
It("should resolve master without context exhaustion", func() {
|
||||||
|
shortCtx, cancel := context.WithTimeout(ctx, 500*time.Millisecond)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
client := redis.NewFailoverClient(&redis.FailoverOptions{
|
||||||
|
MasterName: sentinelName,
|
||||||
|
SentinelAddrs: sentinelAddrs,
|
||||||
|
MaxRetries: -1,
|
||||||
|
})
|
||||||
|
|
||||||
|
err := client.Ping(shortCtx).Err()
|
||||||
|
Expect(err).NotTo(HaveOccurred(), "expected master to resolve without context exhaustion")
|
||||||
|
|
||||||
|
_ = client.Close()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
var _ = Describe("Sentinel", func() {
|
var _ = Describe("Sentinel", func() {
|
||||||
var client *redis.Client
|
var client *redis.Client
|
||||||
var master *redis.Client
|
var master *redis.Client
|
||||||
|
Loading…
x
Reference in New Issue
Block a user