1
0
mirror of https://github.com/redis/go-redis.git synced 2025-04-17 20:17:02 +03:00

Merge pull request #1473 from go-redis/feature/failover-cluster-client

Feature/failover cluster client
This commit is contained in:
Vladimir Mihailenco 2020-09-09 18:37:14 +03:00 committed by GitHub
commit 655eaaa39d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 304 additions and 122 deletions

View File

@ -259,7 +259,7 @@ func BenchmarkClusterPing(b *testing.B) {
if err := startCluster(ctx, cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
defer cluster.Close()
client := cluster.newClusterClient(ctx, redisClusterOptions())
defer client.Close()
@ -286,7 +286,7 @@ func BenchmarkClusterSetString(b *testing.B) {
if err := startCluster(ctx, cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
defer cluster.Close()
client := cluster.newClusterClient(ctx, redisClusterOptions())
defer client.Close()
@ -315,7 +315,7 @@ func BenchmarkClusterReloadState(b *testing.B) {
if err := startCluster(ctx, cluster); err != nil {
b.Fatal(err)
}
defer stopCluster(cluster)
defer cluster.Close()
client := cluster.newClusterClient(ctx, redisClusterOptions())
defer client.Close()

View File

@ -49,7 +49,7 @@ type ClusterOptions struct {
// and load-balance read/write operations between master and slaves.
// It can use service like ZooKeeper to maintain configuration information
// and Cluster.ReloadState to manually trigger state reloading.
ClusterSlots func() ([]ClusterSlot, error)
ClusterSlots func(context.Context) ([]ClusterSlot, error)
// Following options are copied from Options struct.
@ -987,7 +987,7 @@ func (c *ClusterClient) PoolStats() *PoolStats {
func (c *ClusterClient) loadState(ctx context.Context) (*clusterState, error) {
if c.opt.ClusterSlots != nil {
slots, err := c.opt.ClusterSlots()
slots, err := c.opt.ClusterSlots(ctx)
if err != nil {
return nil, err
}

View File

@ -80,6 +80,14 @@ func (s *clusterScenario) newClusterClient(
return client
}
func (s *clusterScenario) Close() error {
for _, port := range s.ports {
processes[port].Close()
delete(processes, port)
}
return nil
}
func startCluster(ctx context.Context, scenario *clusterScenario) error {
// Start processes and collect node ids
for pos, port := range scenario.ports {
@ -221,20 +229,6 @@ func slotEqual(s1, s2 redis.ClusterSlot) bool {
return true
}
func stopCluster(scenario *clusterScenario) error {
for _, client := range scenario.clients {
if err := client.Close(); err != nil {
return err
}
}
for _, process := range scenario.processes {
if err := process.Close(); err != nil {
return err
}
}
return nil
}
//------------------------------------------------------------------------------
var _ = Describe("ClusterClient", func() {
@ -911,7 +905,7 @@ var _ = Describe("ClusterClient", func() {
failover = true
opt = redisClusterOptions()
opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,
@ -965,7 +959,7 @@ var _ = Describe("ClusterClient", func() {
opt = redisClusterOptions()
opt.RouteRandomly = true
opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 4999,

View File

@ -2307,7 +2307,7 @@ func (c cmdable) SlaveOf(ctx context.Context, host, port string) *StatusCmd {
return cmd
}
func (c cmdable) SlowLog(ctx context.Context, num int64) *SlowLogCmd {
func (c cmdable) SlowLogGet(ctx context.Context, num int64) *SlowLogCmd {
n := strconv.FormatInt(num, 10)
cmd := NewSlowLogCmd(context.Background(), "slowlog", "get", n)
_ = c(ctx, cmd)

View File

@ -4014,7 +4014,7 @@ var _ = Describe("Commands", func() {
})
})
Describe("SlowLog", func() {
Describe("SlowLogGet", func() {
It("returns slow query result", func() {
const key = "slowlog-log-slower-than"
@ -4027,9 +4027,9 @@ var _ = Describe("Commands", func() {
client.Set(ctx, "test", "true", 0)
result, err := client.SlowLog(ctx, -1).Result()
result, err := client.SlowLogGet(ctx, -1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(result)).To(Equal(2))
Expect(len(result)).NotTo(BeZero())
})
})
})

View File

@ -80,7 +80,7 @@ func ExampleNewClusterClient_manualSetup() {
// clusterSlots returns cluster slots information.
// It can use service like ZooKeeper to maintain configuration information
// and Cluster.ReloadState to manually trigger state reloading.
clusterSlots := func() ([]redis.ClusterSlot, error) {
clusterSlots := func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{
// First node with 1 master and 1 slave.
{
@ -511,7 +511,7 @@ func ExampleNewUniversalClient_cluster() {
rdb.Ping(ctx)
}
func ExampleClient_SlowLog() {
func ExampleClient_SlowLogGet() {
const key = "slowlog-log-slower-than"
old := rdb.ConfigGet(ctx, key).Val()
@ -524,7 +524,7 @@ func ExampleClient_SlowLog() {
rdb.Set(ctx, "test", "true", 0)
result, err := rdb.SlowLog(ctx, -1).Result()
result, err := rdb.SlowLogGet(ctx, -1).Result()
if err != nil {
panic(err)
}

1
go.sum
View File

@ -40,6 +40,7 @@ github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.14.1 h1:jMU0WaQrP0a/YAEq8eJmJKjBoMs+pClEr1vDMlM/Do4=
github.com/onsi/ginkgo v1.14.1/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=

View File

@ -41,12 +41,14 @@ const (
)
var (
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
processes map[string]*redisProcess
redisMain *redisProcess
ringShard1, ringShard2, ringShard3 *redisProcess
sentinelMaster, sentinelSlave1, sentinelSlave2 *redisProcess
sentinel1, sentinel2, sentinel3 *redisProcess
sentinelAddrs = []string{":" + sentinelPort1, ":" + sentinelPort2, ":" + sentinelPort3}
)
var cluster = &clusterScenario{
@ -56,6 +58,13 @@ var cluster = &clusterScenario{
clients: make(map[string]*redis.Client, 6),
}
func registerProcess(port string, p *redisProcess) {
if processes == nil {
processes = make(map[string]*redisProcess)
}
processes[port] = p
}
var _ = BeforeSuite(func() {
var err error
@ -95,20 +104,12 @@ var _ = BeforeSuite(func() {
})
var _ = AfterSuite(func() {
Expect(redisMain.Close()).NotTo(HaveOccurred())
Expect(cluster.Close()).NotTo(HaveOccurred())
Expect(ringShard1.Close()).NotTo(HaveOccurred())
Expect(ringShard2.Close()).NotTo(HaveOccurred())
Expect(ringShard3.Close()).NotTo(HaveOccurred())
Expect(sentinel1.Close()).NotTo(HaveOccurred())
Expect(sentinel2.Close()).NotTo(HaveOccurred())
Expect(sentinel3.Close()).NotTo(HaveOccurred())
Expect(sentinelSlave1.Close()).NotTo(HaveOccurred())
Expect(sentinelSlave2.Close()).NotTo(HaveOccurred())
Expect(sentinelMaster.Close()).NotTo(HaveOccurred())
Expect(stopCluster(cluster)).NotTo(HaveOccurred())
for _, p := range processes {
Expect(p.Close()).NotTo(HaveOccurred())
}
processes = nil
})
func TestGinkgoSuite(t *testing.T) {
@ -308,7 +309,10 @@ func startRedis(port string, args ...string) (*redisProcess, error) {
process.Kill()
return nil, err
}
return &redisProcess{process, client}, err
p := &redisProcess{process, client}
registerProcess(port, p)
return p, err
}
func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
@ -316,15 +320,18 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
if err != nil {
return nil, err
}
process, err := execCmd(redisServerBin, os.DevNull, "--sentinel", "--port", port, "--dir", dir)
if err != nil {
return nil, err
}
client, err := connectTo(port)
if err != nil {
process.Kill()
return nil, err
}
for _, cmd := range []*redis.StatusCmd{
redis.NewStatusCmd(ctx, "SENTINEL", "MONITOR", masterName, "127.0.0.1", masterPort, "2"),
redis.NewStatusCmd(ctx, "SENTINEL", "SET", masterName, "down-after-milliseconds", "500"),
@ -337,7 +344,10 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
return nil, err
}
}
return &redisProcess{process, client}, nil
p := &redisProcess{process, client}
registerProcess(port, p)
return p, nil
}
//------------------------------------------------------------------------------

View File

@ -104,9 +104,6 @@ type Options struct {
// Enables read only queries on slave nodes.
readOnly bool
// Enables read only queries on redis replicas in sentinel mode
sentinelReadOnly bool
// TLS Config to use. When set TLS will be negotiated.
TLSConfig *tls.Config

View File

@ -26,8 +26,8 @@ type FailoverOptions struct {
// Sentinel password from "requirepass <password>" (if enabled) in Sentinel configuration
SentinelPassword string
// Enables read-only commands on slave nodes.
ReadOnly bool
// Route all commands to slave read-only nodes.
SlaveOnly bool
// Following options are copied from Options struct.
@ -57,7 +57,7 @@ type FailoverOptions struct {
}
func (opt *FailoverOptions) options() *Options {
return &Options{
redisOpt := &Options{
Addr: "FailoverClient",
Dialer: opt.Dialer,
@ -83,28 +83,64 @@ func (opt *FailoverOptions) options() *Options {
MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig,
sentinelReadOnly: opt.ReadOnly,
}
redisOpt.init()
return redisOpt
}
func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
clusterOpt := &ClusterOptions{
Dialer: opt.Dialer,
OnConnect: opt.OnConnect,
Username: opt.Username,
Password: opt.Password,
MaxRetries: opt.MaxRetries,
MinRetryBackoff: opt.MinRetryBackoff,
MaxRetryBackoff: opt.MaxRetryBackoff,
DialTimeout: opt.DialTimeout,
ReadTimeout: opt.ReadTimeout,
WriteTimeout: opt.WriteTimeout,
PoolSize: opt.PoolSize,
PoolTimeout: opt.PoolTimeout,
IdleTimeout: opt.IdleTimeout,
IdleCheckFrequency: opt.IdleCheckFrequency,
MinIdleConns: opt.MinIdleConns,
MaxConnAge: opt.MaxConnAge,
TLSConfig: opt.TLSConfig,
}
clusterOpt.init()
return clusterOpt
}
// NewFailoverClient returns a Redis client that uses Redis Sentinel
// for automatic failover. It's safe for concurrent use by multiple
// goroutines.
func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
opt := failoverOpt.options()
opt.init()
failover := &sentinelFailover{
masterName: failoverOpt.MasterName,
sentinelAddrs: failoverOpt.SentinelAddrs,
sentinelPassword: failoverOpt.SentinelPassword,
opt: opt,
opt: failoverOpt.options(),
}
opt := failoverOpt.options()
opt.Dialer = masterSlaveDialer(failover, failoverOpt.SlaveOnly)
connPool := newConnPool(opt)
failover.onFailover = func(ctx context.Context, addr string) {
_ = connPool.Filter(func(cn *pool.Conn) bool {
return cn.RemoteAddr().String() != addr
})
}
c := Client{
baseClient: newBaseClient(opt, failover.Pool()),
baseClient: newBaseClient(opt, connPool),
ctx: context.Background(),
}
c.cmdable = c.Process
@ -113,8 +149,35 @@ func NewFailoverClient(failoverOpt *FailoverOptions) *Client {
return &c
}
func masterSlaveDialer(
failover *sentinelFailover, slaveOnly bool,
) func(ctx context.Context, network, addr string) (net.Conn, error) {
return func(ctx context.Context, network, _ string) (net.Conn, error) {
var addr string
var err error
if slaveOnly {
addr, err = failover.RandomSlaveAddr(ctx)
} else {
addr, err = failover.MasterAddr(ctx)
if err == nil {
failover.trySwitchMaster(ctx, addr)
}
}
if err != nil {
return nil, err
}
if failover.opt.Dialer != nil {
return failover.opt.Dialer(ctx, network, addr)
}
return net.DialTimeout("tcp", addr, failover.opt.DialTimeout)
}
}
//------------------------------------------------------------------------------
// SentinelClient is a client for a Redis Sentinel.
type SentinelClient struct {
*baseClient
ctx context.Context
@ -283,14 +346,14 @@ func (c *SentinelClient) Remove(ctx context.Context, name string) *StringCmd {
return cmd
}
//------------------------------------------------------------------------------
type sentinelFailover struct {
sentinelAddrs []string
sentinelPassword string
opt *Options
pool *pool.ConnPool
poolOnce sync.Once
opt *Options
onFailover func(ctx context.Context, addr string)
mu sync.RWMutex
masterName string
@ -321,55 +384,18 @@ func (c *sentinelFailover) closeSentinel() error {
return firstErr
}
func (c *sentinelFailover) Pool() *pool.ConnPool {
c.poolOnce.Do(func() {
opt := *c.opt
opt.Dialer = c.dial
c.pool = newConnPool(&opt)
})
return c.pool
}
func (c *sentinelFailover) dial(ctx context.Context, network, _ string) (net.Conn, error) {
var addr string
var err error
if c.opt.sentinelReadOnly {
addr, err = c.RandomSlaveAddr(ctx)
} else {
addr, err = c.MasterAddr(ctx)
}
if err != nil {
return nil, err
}
if c.opt.Dialer != nil {
return c.opt.Dialer(ctx, network, addr)
}
return net.DialTimeout("tcp", addr, c.opt.DialTimeout)
}
func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
addr, err := c.masterAddr(ctx)
if err != nil {
return "", err
}
c.switchMaster(ctx, addr)
return addr, nil
}
func (c *sentinelFailover) RandomSlaveAddr(ctx context.Context) (string, error) {
addresses, err := c.slaveAddresses(ctx)
if err != nil {
return "", err
}
if len(addresses) < 1 {
if len(addresses) == 0 {
return c.MasterAddr(ctx)
}
return addresses[rand.Intn(len(addresses))], nil
}
func (c *sentinelFailover) masterAddr(ctx context.Context) (string, error) {
func (c *sentinelFailover) MasterAddr(ctx context.Context) (string, error) {
c.mu.RLock()
sentinel := c.sentinel
c.mu.RUnlock()
@ -553,27 +579,26 @@ func parseSlaveAddresses(addrs []interface{}) []string {
return nodes
}
func (c *sentinelFailover) switchMaster(ctx context.Context, addr string) {
func (c *sentinelFailover) trySwitchMaster(ctx context.Context, addr string) {
c.mu.RLock()
masterAddr := c._masterAddr
currentAddr := c._masterAddr
c.mu.RUnlock()
if masterAddr == addr {
if addr == currentAddr {
return
}
c.mu.Lock()
defer c.mu.Unlock()
if c._masterAddr == addr {
if addr == c._masterAddr {
return
}
c._masterAddr = addr
internal.Logger.Printf(ctx, "sentinel: new master=%q addr=%q",
c.masterName, addr)
_ = c.Pool().Filter(func(cn *pool.Conn) bool {
return cn.RemoteAddr().String() != addr
})
c._masterAddr = addr
go c.onFailover(ctx, addr)
}
func (c *sentinelFailover) setSentinel(ctx context.Context, sentinel *SentinelClient) {
@ -624,7 +649,7 @@ func (c *sentinelFailover) listen(pubsub *PubSub) {
continue
}
addr := net.JoinHostPort(parts[3], parts[4])
c.switchMaster(pubsub.getContext(), addr)
c.trySwitchMaster(pubsub.getContext(), addr)
}
}
}
@ -637,3 +662,54 @@ func contains(slice []string, str string) bool {
}
return false
}
//------------------------------------------------------------------------------
func NewFailoverClusterClient(failoverOpt *FailoverOptions) *ClusterClient {
failover := &sentinelFailover{
masterName: failoverOpt.MasterName,
sentinelAddrs: failoverOpt.SentinelAddrs,
opt: failoverOpt.options(),
}
opt := failoverOpt.clusterOptions()
opt.ClusterSlots = func(ctx context.Context) ([]ClusterSlot, error) {
masterAddr, err := failover.MasterAddr(ctx)
if err != nil {
return nil, err
}
nodes := []ClusterNode{{
Addr: masterAddr,
}}
slaveAddrs, err := failover.slaveAddresses(ctx)
if err != nil {
return nil, err
}
for _, slaveAddr := range slaveAddrs {
nodes = append(nodes, ClusterNode{
Addr: slaveAddr,
})
}
slots := []ClusterSlot{
{
Start: 0,
End: 16383,
Nodes: nodes,
},
}
return slots, nil
}
c := NewClusterClient(opt)
failover.onFailover = func(ctx context.Context, addr string) {
_ = c.ReloadState(ctx)
}
return c
}

View File

@ -1,6 +1,8 @@
package redis_test
import (
"net"
"github.com/go-redis/redis/v8"
. "github.com/onsi/ginkgo"
@ -9,6 +11,8 @@ import (
var _ = Describe("Sentinel", func() {
var client *redis.Client
var master *redis.Client
var masterPort string
BeforeEach(func() {
client = redis.NewFailoverClient(&redis.FailoverOptions{
@ -16,10 +20,23 @@ var _ = Describe("Sentinel", func() {
SentinelAddrs: sentinelAddrs,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
sentinel := redis.NewSentinelClient(&redis.Options{
Addr: ":" + sentinelPort1,
})
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
Expect(err).NotTo(HaveOccurred())
master = redis.NewClient(&redis.Options{
Addr: net.JoinHostPort(addr[0], addr[1]),
})
masterPort = addr[1]
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
Expect(master.Close()).NotTo(HaveOccurred())
})
It("should facilitate failover", func() {
@ -28,7 +45,7 @@ var _ = Describe("Sentinel", func() {
Expect(err).NotTo(HaveOccurred())
// Verify.
val, err := sentinelMaster.Get(ctx, "foo").Result()
val, err := client.Get(ctx, "foo").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("master"))
@ -46,25 +63,21 @@ var _ = Describe("Sentinel", func() {
// Wait until slaves are picked up by sentinel.
Eventually(func() string {
return sentinel1.Info(ctx).Val()
}, "10s", "100ms").Should(ContainSubstring("slaves=2"))
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel2.Info(ctx).Val()
}, "10s", "100ms").Should(ContainSubstring("slaves=2"))
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel3.Info(ctx).Val()
}, "10s", "100ms").Should(ContainSubstring("slaves=2"))
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
// Kill master.
sentinelMaster.Shutdown(ctx)
err = master.Shutdown(ctx).Err()
Expect(err).NotTo(HaveOccurred())
Eventually(func() error {
return sentinelMaster.Ping(ctx).Err()
return master.Ping(ctx).Err()
}, "15s", "100ms").Should(HaveOccurred())
// Wait for Redis sentinel to elect new master.
Eventually(func() string {
return sentinelSlave1.Info(ctx).Val() + sentinelSlave2.Info(ctx).Val()
}, "30s", "100ms").Should(ContainSubstring("role:master"))
// Check that client picked up new master.
Eventually(func() error {
return client.Get(ctx, "foo").Err()
@ -75,9 +88,12 @@ var _ = Describe("Sentinel", func() {
Eventually(func() <-chan *redis.Message {
_ = client.Publish(ctx, "foo", "hello").Err()
return ch
}, "15s").Should(Receive(&msg))
}, "15s", "100ms").Should(Receive(&msg))
Expect(msg.Channel).To(Equal("foo"))
Expect(msg.Payload).To(Equal("hello"))
_, err = startRedis(masterPort)
Expect(err).NotTo(HaveOccurred())
})
It("supports DB selection", func() {
@ -92,3 +108,91 @@ var _ = Describe("Sentinel", func() {
Expect(err).NotTo(HaveOccurred())
})
})
var _ = Describe("NewFailoverClusterClient", func() {
var client *redis.ClusterClient
var master *redis.Client
var masterPort string
BeforeEach(func() {
client = redis.NewFailoverClusterClient(&redis.FailoverOptions{
MasterName: sentinelName,
SentinelAddrs: sentinelAddrs,
})
Expect(client.FlushDB(ctx).Err()).NotTo(HaveOccurred())
sentinel := redis.NewSentinelClient(&redis.Options{
Addr: ":" + sentinelPort1,
})
addr, err := sentinel.GetMasterAddrByName(ctx, sentinelName).Result()
Expect(err).NotTo(HaveOccurred())
master = redis.NewClient(&redis.Options{
Addr: net.JoinHostPort(addr[0], addr[1]),
})
masterPort = addr[1]
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
Expect(master.Close()).NotTo(HaveOccurred())
})
It("should facilitate failover", func() {
// Set value.
err := client.Set(ctx, "foo", "master", 0).Err()
Expect(err).NotTo(HaveOccurred())
// Verify.
val, err := client.Get(ctx, "foo").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("master"))
// Create subscription.
ch := client.Subscribe(ctx, "foo").Channel()
// Wait until replicated.
Eventually(func() string {
return sentinelSlave1.Get(ctx, "foo").Val()
}, "15s", "100ms").Should(Equal("master"))
Eventually(func() string {
return sentinelSlave2.Get(ctx, "foo").Val()
}, "15s", "100ms").Should(Equal("master"))
// Wait until slaves are picked up by sentinel.
Eventually(func() string {
return sentinel1.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel2.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
Eventually(func() string {
return sentinel3.Info(ctx).Val()
}, "15s", "100ms").Should(ContainSubstring("slaves=2"))
// Kill master.
err = master.Shutdown(ctx).Err()
Expect(err).NotTo(HaveOccurred())
Eventually(func() error {
return sentinelMaster.Ping(ctx).Err()
}, "15s", "100ms").Should(HaveOccurred())
// Check that client picked up new master.
Eventually(func() error {
return client.Get(ctx, "foo").Err()
}, "15s", "100ms").ShouldNot(HaveOccurred())
// Check if subscription is renewed.
var msg *redis.Message
Eventually(func() <-chan *redis.Message {
_ = client.Publish(ctx, "foo", "hello").Err()
return ch
}, "15s", "100ms").Should(Receive(&msg))
Expect(msg.Channel).To(Equal("foo"))
Expect(msg.Payload).To(Equal("hello"))
_, err = startRedis(masterPort)
Expect(err).NotTo(HaveOccurred())
})
})