mirror of
https://github.com/redis/go-redis.git
synced 2025-04-17 20:17:02 +03:00
Currently, the pubsub connection is aggressively disconnected when we close the upstream client which causes it to log something like: redis: 2018/11/09 09:10:07 pubsub.go:151: redis: discarding bad PubSub connection: read tcp 127.0.0.1:61025->127.0.0.1:26378: use of closed network connection This change simply cleans up the connection on close. We create new redis servers and connections for each set of independent tests and the logs were getting spammy.
372 lines
7.9 KiB
Go
372 lines
7.9 KiB
Go
package redis
|
|
|
|
import (
|
|
"crypto/tls"
|
|
"errors"
|
|
"net"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/go-redis/redis/internal"
|
|
"github.com/go-redis/redis/internal/pool"
|
|
)
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
// FailoverOptions are used to configure a failover client and should
|
|
// be passed to NewFailoverClient.
|
|
type FailoverOptions struct {
|
|
// The master name.
|
|
MasterName string
|
|
// A seed list of host:port addresses of sentinel nodes.
|
|
SentinelAddrs []string
|
|
|
|
// Following options are copied from Options struct.
|
|
|
|
OnConnect func(*Conn) error
|
|
|
|
Password string
|
|
DB int
|
|
|
|
MaxRetries int
|
|
MinRetryBackoff time.Duration
|
|
MaxRetryBackoff time.Duration
|
|
|
|
DialTimeout time.Duration
|
|
ReadTimeout time.Duration
|
|
WriteTimeout time.Duration
|
|
|
|
PoolSize int
|
|
MinIdleConns int
|
|
MaxConnAge time.Duration
|
|
PoolTimeout time.Duration
|
|
IdleTimeout time.Duration
|
|
IdleCheckFrequency time.Duration
|
|
|
|
TLSConfig *tls.Config
|
|
}
|
|
|
|
func (opt *FailoverOptions) options() *Options {
|
|
return &Options{
|
|
Addr: "FailoverClient",
|
|
|
|
OnConnect: opt.OnConnect,
|
|
|
|
DB: opt.DB,
|
|
Password: opt.Password,
|
|
|
|
MaxRetries: opt.MaxRetries,
|
|
|
|
DialTimeout: opt.DialTimeout,
|
|
ReadTimeout: opt.ReadTimeout,
|
|
WriteTimeout: opt.WriteTimeout,
|
|
|
|
PoolSize: opt.PoolSize,
|
|
PoolTimeout: opt.PoolTimeout,
|
|
IdleTimeout: opt.IdleTimeout,
|
|
IdleCheckFrequency: opt.IdleCheckFrequency,
|
|
|
|
TLSConfig: opt.TLSConfig,
|
|
}
|
|
}
|
|
|
|
// NewFailoverClient returns a Redis client that uses Redis Sentinel
|
|
// for automatic failover. It's safe for concurrent use by multiple
|
|
// goroutines.
|
|
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
|
|
opt := failoverOpt.options()
|
|
opt.init()
|
|
|
|
failover := &sentinelFailover{
|
|
masterName: failoverOpt.MasterName,
|
|
sentinelAddrs: failoverOpt.SentinelAddrs,
|
|
|
|
opt: opt,
|
|
}
|
|
|
|
c := Client{
|
|
baseClient: baseClient{
|
|
opt: opt,
|
|
connPool: failover.Pool(),
|
|
|
|
onClose: func() error {
|
|
return failover.Close()
|
|
},
|
|
},
|
|
}
|
|
c.baseClient.init()
|
|
c.cmdable.setProcessor(c.Process)
|
|
|
|
return &c
|
|
}
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
type SentinelClient struct {
|
|
baseClient
|
|
}
|
|
|
|
func NewSentinelClient(opt *Options) *SentinelClient {
|
|
opt.init()
|
|
c := &SentinelClient{
|
|
baseClient: baseClient{
|
|
opt: opt,
|
|
connPool: newConnPool(opt),
|
|
},
|
|
}
|
|
c.baseClient.init()
|
|
return c
|
|
}
|
|
|
|
func (c *SentinelClient) pubSub() *PubSub {
|
|
pubsub := &PubSub{
|
|
opt: c.opt,
|
|
|
|
newConn: func(channels []string) (*pool.Conn, error) {
|
|
return c.newConn()
|
|
},
|
|
closeConn: c.connPool.CloseConn,
|
|
}
|
|
pubsub.init()
|
|
return pubsub
|
|
}
|
|
|
|
// Subscribe subscribes the client to the specified channels.
|
|
// Channels can be omitted to create empty subscription.
|
|
func (c *SentinelClient) Subscribe(channels ...string) *PubSub {
|
|
pubsub := c.pubSub()
|
|
if len(channels) > 0 {
|
|
_ = pubsub.Subscribe(channels...)
|
|
}
|
|
return pubsub
|
|
}
|
|
|
|
// PSubscribe subscribes the client to the given patterns.
|
|
// Patterns can be omitted to create empty subscription.
|
|
func (c *SentinelClient) PSubscribe(channels ...string) *PubSub {
|
|
pubsub := c.pubSub()
|
|
if len(channels) > 0 {
|
|
_ = pubsub.PSubscribe(channels...)
|
|
}
|
|
return pubsub
|
|
}
|
|
|
|
func (c *SentinelClient) GetMasterAddrByName(name string) *StringSliceCmd {
|
|
cmd := NewStringSliceCmd("sentinel", "get-master-addr-by-name", name)
|
|
c.Process(cmd)
|
|
return cmd
|
|
}
|
|
|
|
func (c *SentinelClient) Sentinels(name string) *SliceCmd {
|
|
cmd := NewSliceCmd("sentinel", "sentinels", name)
|
|
c.Process(cmd)
|
|
return cmd
|
|
}
|
|
|
|
type sentinelFailover struct {
|
|
sentinelAddrs []string
|
|
|
|
opt *Options
|
|
|
|
pool *pool.ConnPool
|
|
poolOnce sync.Once
|
|
|
|
mu sync.RWMutex
|
|
masterName string
|
|
_masterAddr string
|
|
sentinel *SentinelClient
|
|
pubsub *PubSub
|
|
}
|
|
|
|
func (c *sentinelFailover) Close() error {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
if c.sentinel != nil {
|
|
return c.closeSentinel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *sentinelFailover) Pool() *pool.ConnPool {
|
|
c.poolOnce.Do(func() {
|
|
c.opt.Dialer = c.dial
|
|
c.pool = newConnPool(c.opt)
|
|
})
|
|
return c.pool
|
|
}
|
|
|
|
func (c *sentinelFailover) dial() (net.Conn, error) {
|
|
addr, err := c.MasterAddr()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
|
|
}
|
|
|
|
func (c *sentinelFailover) MasterAddr() (string, error) {
|
|
addr, err := c.masterAddr()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
c.switchMaster(addr)
|
|
return addr, nil
|
|
}
|
|
|
|
func (c *sentinelFailover) masterAddr() (string, error) {
|
|
addr := c.getMasterAddr()
|
|
if addr != "" {
|
|
return addr, nil
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
for i, sentinelAddr := range c.sentinelAddrs {
|
|
sentinel := NewSentinelClient(&Options{
|
|
Addr: sentinelAddr,
|
|
|
|
MaxRetries: c.opt.MaxRetries,
|
|
|
|
DialTimeout: c.opt.DialTimeout,
|
|
ReadTimeout: c.opt.ReadTimeout,
|
|
WriteTimeout: c.opt.WriteTimeout,
|
|
|
|
PoolSize: c.opt.PoolSize,
|
|
PoolTimeout: c.opt.PoolTimeout,
|
|
IdleTimeout: c.opt.IdleTimeout,
|
|
IdleCheckFrequency: c.opt.IdleCheckFrequency,
|
|
|
|
TLSConfig: c.opt.TLSConfig,
|
|
})
|
|
|
|
masterAddr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
|
|
if err != nil {
|
|
internal.Logf("sentinel: GetMasterAddrByName master=%q failed: %s",
|
|
c.masterName, err)
|
|
_ = sentinel.Close()
|
|
continue
|
|
}
|
|
|
|
// Push working sentinel to the top.
|
|
c.sentinelAddrs[0], c.sentinelAddrs[i] = c.sentinelAddrs[i], c.sentinelAddrs[0]
|
|
c.setSentinel(sentinel)
|
|
|
|
addr := net.JoinHostPort(masterAddr[0], masterAddr[1])
|
|
return addr, nil
|
|
}
|
|
|
|
return "", errors.New("redis: all sentinels are unreachable")
|
|
}
|
|
|
|
func (c *sentinelFailover) getMasterAddr() string {
|
|
c.mu.RLock()
|
|
sentinel := c.sentinel
|
|
c.mu.RUnlock()
|
|
|
|
if sentinel == nil {
|
|
return ""
|
|
}
|
|
|
|
addr, err := sentinel.GetMasterAddrByName(c.masterName).Result()
|
|
if err != nil {
|
|
internal.Logf("sentinel: GetMasterAddrByName name=%q failed: %s",
|
|
c.masterName, err)
|
|
c.mu.Lock()
|
|
if c.sentinel == sentinel {
|
|
c.closeSentinel()
|
|
}
|
|
c.mu.Unlock()
|
|
return ""
|
|
}
|
|
|
|
return net.JoinHostPort(addr[0], addr[1])
|
|
}
|
|
|
|
func (c *sentinelFailover) switchMaster(addr string) {
|
|
c.mu.RLock()
|
|
masterAddr := c._masterAddr
|
|
c.mu.RUnlock()
|
|
if masterAddr == addr {
|
|
return
|
|
}
|
|
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
internal.Logf("sentinel: new master=%q addr=%q",
|
|
c.masterName, addr)
|
|
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
|
|
return cn.RemoteAddr().String() != addr
|
|
})
|
|
c._masterAddr = addr
|
|
}
|
|
|
|
func (c *sentinelFailover) setSentinel(sentinel *SentinelClient) {
|
|
c.discoverSentinels(sentinel)
|
|
c.sentinel = sentinel
|
|
go c.listen(sentinel)
|
|
}
|
|
|
|
func (c *sentinelFailover) closeSentinel() error {
|
|
if err := c.pubsub.Close(); err != nil {
|
|
return err
|
|
}
|
|
err := c.sentinel.Close()
|
|
c.sentinel = nil
|
|
return err
|
|
}
|
|
|
|
func (c *sentinelFailover) discoverSentinels(sentinel *SentinelClient) {
|
|
sentinels, err := sentinel.Sentinels(c.masterName).Result()
|
|
if err != nil {
|
|
internal.Logf("sentinel: Sentinels master=%q failed: %s", c.masterName, err)
|
|
return
|
|
}
|
|
for _, sentinel := range sentinels {
|
|
vals := sentinel.([]interface{})
|
|
for i := 0; i < len(vals); i += 2 {
|
|
key := vals[i].(string)
|
|
if key == "name" {
|
|
sentinelAddr := vals[i+1].(string)
|
|
if !contains(c.sentinelAddrs, sentinelAddr) {
|
|
internal.Logf("sentinel: discovered new sentinel=%q for master=%q",
|
|
sentinelAddr, c.masterName)
|
|
c.sentinelAddrs = append(c.sentinelAddrs, sentinelAddr)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *sentinelFailover) listen(sentinel *SentinelClient) {
|
|
c.pubsub = sentinel.Subscribe("+switch-master")
|
|
ch := c.pubsub.Channel()
|
|
for {
|
|
msg, ok := <-ch
|
|
if !ok {
|
|
break
|
|
}
|
|
|
|
switch msg.Channel {
|
|
case "+switch-master":
|
|
parts := strings.Split(msg.Payload, " ")
|
|
if parts[0] != c.masterName {
|
|
internal.Logf("sentinel: ignore addr for master=%q", parts[0])
|
|
continue
|
|
}
|
|
addr := net.JoinHostPort(parts[3], parts[4])
|
|
c.switchMaster(addr)
|
|
}
|
|
}
|
|
}
|
|
|
|
func contains(slice []string, str string) bool {
|
|
for _, s := range slice {
|
|
if s == str {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|