1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-28 06:42:00 +03:00

Merge branch 'master' into ndyakov/token-based-auth

This commit is contained in:
Nedyalko Dyakov
2025-04-22 12:05:06 +03:00
committed by GitHub
11 changed files with 682 additions and 333 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"strings"
"sync"
@ -566,29 +567,50 @@ func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
}
}
var (
masterAddr string
wg sync.WaitGroup
once sync.Once
errCh = make(chan error, len(c.sentinelAddrs))
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
for i, sentinelAddr := range c.sentinelAddrs {
sentinel := NewSentinelClient(c.opt.sentinelOptions(sentinelAddr))
masterAddr, err := sentinel.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
_ = sentinel.Close()
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return "", err
wg.Add(1)
go func(i int, addr string) {
defer wg.Done()
sentinelCli := NewSentinelClient(c.opt.sentinelOptions(addr))
addrVal, err := sentinelCli.GetMasterAddrByName(ctx, c.opt.MasterName).Result()
if err != nil {
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName addr=%s, master=%q failed: %s",
addr, c.opt.MasterName, err)
_ = sentinelCli.Close()
errCh <- err
return
}
internal.Logger.Printf(ctx, "sentinel: GetMasterAddrByName master=%q failed: %s",
c.opt.MasterName, err)
continue
}
// Push working sentinel to the top.
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinel)
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
return addr, nil
once.Do(func() {
masterAddr = net.JoinHostPort(addrVal[0], addrVal[1])
// Push working sentinel to the top
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
c.setSentinel(ctx, sentinelCli)
internal.Logger.Printf(ctx, "sentinel: selected addr=%s masterAddr=%s", addr, masterAddr)
cancel()
})
}(i, sentinelAddr)
}
return "", errors.New("redis: all sentinels specified in configuration are unreachable")
wg.Wait()
close(errCh)
if masterAddr != "" {
return masterAddr, nil
}
errs := make([]error, 0, len(errCh))
for err := range errCh {
errs = append(errs, err)
}
return "", fmt.Errorf("redis: all sentinels specified in configuration are unreachable: %w", errors.Join(errs...))
}
func (c *sentinelFailover) replicaAddrs(ctx context.Context, useDisconnected bool) ([]string, error) {
@ -815,6 +837,22 @@ func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
}
opt := failoverOpt.clusterOptions()
if failoverOpt.DB != 0 {
onConnect := opt.OnConnect
opt.OnConnect = func(ctx context.Context, cn *Conn) error {
if err := cn.Select(ctx, failoverOpt.DB).Err(); err != nil {
return err
}
if onConnect != nil {
return onConnect(ctx, cn)
}
return nil
}
}
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
masterAddr, err := failover.MasterAddr(ctx)
if err != nil {