package redis_test

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"net"
	"strconv"
	"strings"
	"sync"
	"time"

	. "github.com/bsm/ginkgo/v2"
	. "github.com/bsm/gomega"

	"github.com/redis/go-redis/v9"
	"github.com/redis/go-redis/v9/internal/hashtag"
)

type clusterScenario struct {
	ports     []string
	nodeIDs   []string
	processes map[string]*redisProcess
	clients   map[string]*redis.Client
}

func (s *clusterScenario) slots() []int {
	return []int{0, 5461, 10923, 16384}
}
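
// The boundary values above split the 16384 hash slots into three contiguous
// ranges (0-5460, 5461-10922, 10923-16383), one per master, matching the
// `wanted` slot layout asserted further down in configureClusterTopology.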

func (s *clusterScenario) masters() []*redis.Client {
	result := make([]*redis.Client, 3)
	for pos, port := range s.ports[:3] {
		result[pos] = s.clients[port]
	}
	return result
}

func (s *clusterScenario) slaves() []*redis.Client {
	result := make([]*redis.Client, 3)
	for pos, port := range s.ports[3:] {
		result[pos] = s.clients[port]
	}
	return result
}

func (s *clusterScenario) addrs() []string {
	addrs := make([]string, len(s.ports))
	for i, port := range s.ports {
		addrs[i] = net.JoinHostPort("127.0.0.1", port)
	}
	return addrs
}

func (s *clusterScenario) newClusterClientUnstable(opt *redis.ClusterOptions) *redis.ClusterClient {
	opt.Addrs = s.addrs()
	return redis.NewClusterClient(opt)
}
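
// newClusterClient, below, additionally waits (up to 30s) for the cluster
// state reported by the nodes to become consistent, unless the options supply
// a static ClusterSlots function.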

func (s *clusterScenario) newClusterClient(
	ctx context.Context, opt *redis.ClusterOptions,
) *redis.ClusterClient {
	client := s.newClusterClientUnstable(opt)

	err := eventually(func() error {
		if opt.ClusterSlots != nil {
			return nil
		}

		state, err := client.LoadState(ctx)
		if err != nil {
			return err
		}

		if !state.IsConsistent(ctx) {
			return fmt.Errorf("cluster state is not consistent")
		}

		return nil
	}, 30*time.Second)
	if err != nil {
		panic(err)
	}

	return client
}

func (s *clusterScenario) Close() error {
	ctx := context.TODO()
	for _, master := range s.masters() {
		err := master.FlushAll(ctx).Err()
		if err != nil {
			return err
		}

		// since 7.2 forget calls should be propagated, calling only master
		// nodes should be sufficient.
		for _, nID := range s.nodeIDs {
			master.ClusterForget(ctx, nID)
		}
	}

	for _, port := range s.ports {
		if process, ok := processes[port]; ok {
			if process != nil {
				process.Close()
			}

			delete(processes, port)
		}
	}

	return nil
}

func configureClusterTopology(ctx context.Context, scenario *clusterScenario) error {
	err := collectNodeInformation(ctx, scenario)
	if err != nil {
		return err
	}

	// Meet cluster nodes.
	for _, client := range scenario.clients {
		err := client.ClusterMeet(ctx, "127.0.0.1", scenario.ports[0]).Err()
		if err != nil {
			return err
		}
	}

	slots := scenario.slots()
	for pos, master := range scenario.masters() {
		err := master.ClusterAddSlotsRange(ctx, slots[pos], slots[pos+1]-1).Err()
		if err != nil {
			return err
		}
	}

	// Bootstrap slaves.
	for idx, slave := range scenario.slaves() {
		masterID := scenario.nodeIDs[idx]

		// Wait until master is available
		err := eventually(func() error {
			s := slave.ClusterNodes(ctx).Val()
			wanted := masterID
			if !strings.Contains(s, wanted) {
				return fmt.Errorf("%q does not contain %q", s, wanted)
			}
			return nil
		}, 10*time.Second)
		if err != nil {
			return err
		}

		err = slave.ClusterReplicate(ctx, masterID).Err()
		if err != nil {
			return err
		}
	}

	// Wait until all nodes have consistent info.
	wanted := []redis.ClusterSlot{{
		Start: 0,
		End:   5460,
		Nodes: []redis.ClusterNode{{
			ID:   "",
			Addr: "127.0.0.1:16600",
		}, {
			ID:   "",
			Addr: "127.0.0.1:16603",
		}},
	}, {
		Start: 5461,
		End:   10922,
		Nodes: []redis.ClusterNode{{
			ID:   "",
			Addr: "127.0.0.1:16601",
		}, {
			ID:   "",
			Addr: "127.0.0.1:16604",
		}},
	}, {
		Start: 10923,
		End:   16383,
		Nodes: []redis.ClusterNode{{
			ID:   "",
			Addr: "127.0.0.1:16602",
		}, {
			ID:   "",
			Addr: "127.0.0.1:16605",
		}},
	}}

	for _, client := range scenario.clients {
		err := eventually(func() error {
			res, err := client.ClusterSlots(ctx).Result()
			if err != nil {
				return err
			}
			return assertSlotsEqual(res, wanted)
		}, 60*time.Second)
		if err != nil {
			return err
		}
	}

	return nil
}
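
// collectNodeInformation, below, dials every node, reads CLUSTER NODES and
// records each node's 40-character ID for the later CLUSTER REPLICATE and
// CLUSTER FORGET calls.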

func collectNodeInformation(ctx context.Context, scenario *clusterScenario) error {
	for pos, port := range scenario.ports {
		client := redis.NewClient(&redis.Options{
			Addr: ":" + port,
		})

		info, err := client.ClusterNodes(ctx).Result()
		if err != nil {
			return err
		}

		scenario.clients[port] = client
		scenario.nodeIDs[pos] = info[:40]
	}
	return nil
}

// startCluster starts a cluster.
func startCluster(ctx context.Context, scenario *clusterScenario) error {
	// Start processes and collect node ids
	for _, port := range scenario.ports {
		process, err := startRedis(port, "--cluster-enabled", "yes")
		if err != nil {
			return err
		}
		scenario.processes[port] = process
	}

	return configureClusterTopology(ctx, scenario)
}

func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
outerLoop:
	for _, s2 := range wanted {
		for _, s1 := range slots {
			if slotEqual(s1, s2) {
				continue outerLoop
			}
		}
		return fmt.Errorf("%v not found in %v", s2, slots)
	}
	return nil
}

func slotEqual(s1, s2 redis.ClusterSlot) bool {
	if s1.Start != s2.Start {
		return false
	}
	if s1.End != s2.End {
		return false
	}
	if len(s1.Nodes) != len(s2.Nodes) {
		return false
	}
	for i, n1 := range s1.Nodes {
		if n1.Addr != s2.Nodes[i].Addr {
			return false
		}
	}
	return true
}
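
// For reference, the happy path exercised by the specs below looks roughly
// like this minimal sketch (addresses are placeholders, not the suite's ports):
//
//	opt := &redis.ClusterOptions{Addrs: []string{":16600", ":16601", ":16602"}}
//	rdb := redis.NewClusterClient(opt)
//	err := rdb.Set(ctx, "A", "VALUE", 0).Err()          // routed by key hash slot
//	val, err := rdb.Get(ctx, "A").Result()               // MOVED/ASK redirects are followed transparently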

//------------------------------------------------------------------------------

var _ = Describe("ClusterClient", func() {
	var failover bool
	var opt *redis.ClusterOptions
	var client *redis.ClusterClient

	assertClusterClient := func() {
		It("should GET/SET/DEL", func() {
			err := client.Get(ctx, "A").Err()
			Expect(err).To(Equal(redis.Nil))

			err = client.Set(ctx, "A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			Eventually(func() string {
				return client.Get(ctx, "A").Val()
			}, 30*time.Second).Should(Equal("VALUE"))

			cnt, err := client.Del(ctx, "A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(cnt).To(Equal(int64(1)))
		})

		It("GET follows redirects", func() {
			err := client.Set(ctx, "A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			if !failover {
				Eventually(func() int64 {
					nodes, err := client.Nodes(ctx, "A")
					if err != nil {
						return 0
					}
					return nodes[1].Client.DBSize(ctx).Val()
				}, 30*time.Second).Should(Equal(int64(1)))

				Eventually(func() error {
					return client.SwapNodes(ctx, "A")
				}, 30*time.Second).ShouldNot(HaveOccurred())
			}

			v, err := client.Get(ctx, "A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))
		})

		It("SET follows redirects", func() {
			if !failover {
				Eventually(func() error {
					return client.SwapNodes(ctx, "A")
				}, 30*time.Second).ShouldNot(HaveOccurred())
			}

			err := client.Set(ctx, "A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			v, err := client.Get(ctx, "A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))
		})

		It("distributes keys", func() {
			for i := 0; i < 100; i++ {
				err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
				Expect(err).NotTo(HaveOccurred())
			}

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() string {
					return master.Info(ctx, "keyspace").Val()
				}, 30*time.Second).Should(Or(
					ContainSubstring("keys=32"),
					ContainSubstring("keys=36"),
					ContainSubstring("keys=32"),
				))
				return nil
			})

			Expect(err).NotTo(HaveOccurred())
		})

		It("distributes keys when using EVAL", func() {
			script := redis.NewScript(`
				local r = redis.call('SET', KEYS[1], ARGV[1])
				return r
			`)

			var key string
			for i := 0; i < 100; i++ {
				key = fmt.Sprintf("key%d", i)
				err := script.Run(ctx, client, []string{key}, "value").Err()
				Expect(err).NotTo(HaveOccurred())
			}

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				defer GinkgoRecover()
				Eventually(func() string {
					return master.Info(ctx, "keyspace").Val()
				}, 30*time.Second).Should(Or(
					ContainSubstring("keys=32"),
					ContainSubstring("keys=36"),
					ContainSubstring("keys=32"),
				))
				return nil
			})

			Expect(err).NotTo(HaveOccurred())
		})

		It("distributes scripts when using Script Load", func() {
			client.ScriptFlush(ctx)

			script := redis.NewScript(`return 'Unique script'`)

			script.Load(ctx, client)

			err := client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
				defer GinkgoRecover()

				val, _ := script.Exists(ctx, shard).Result()
				Expect(val[0]).To(Equal(true))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		It("checks all shards when using Script Exists", func() {
			client.ScriptFlush(ctx)

			script := redis.NewScript(`return 'First script'`)
			lostScriptSrc := `return 'Lost script'`
			lostScript := redis.NewScript(lostScriptSrc)

			script.Load(ctx, client)
			client.Do(ctx, "script", "load", lostScriptSrc)

			val, _ := client.ScriptExists(ctx, script.Hash(), lostScript.Hash()).Result()

			Expect(val).To(Equal([]bool{true, false}))
		})

		It("flushes scripts from all shards when using ScriptFlush", func() {
			script := redis.NewScript(`return 'Unnecessary script'`)
			script.Load(ctx, client)

			val, _ := client.ScriptExists(ctx, script.Hash()).Result()
			Expect(val).To(Equal([]bool{true}))

			client.ScriptFlush(ctx)

			val, _ = client.ScriptExists(ctx, script.Hash()).Result()
			Expect(val).To(Equal([]bool{false}))
		})

		It("supports Watch", func() {
			var incr func(string) error

			// Transactionally increments key using GET and SET commands.
			incr = func(key string) error {
				err := client.Watch(ctx, func(tx *redis.Tx) error {
					n, err := tx.Get(ctx, key).Int64()
					if err != nil && err != redis.Nil {
						return err
					}

					_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
						pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
						return nil
					})
					return err
				}, key)
				if err == redis.TxFailedErr {
					return incr(key)
				}
				return err
			}

			var wg sync.WaitGroup
			for i := 0; i < 100; i++ {
				wg.Add(1)
				go func() {
					defer GinkgoRecover()
					defer wg.Done()

					err := incr("key")
					Expect(err).NotTo(HaveOccurred())
				}()
			}
			wg.Wait()

			Eventually(func() string {
				return client.Get(ctx, "key").Val()
			}, 30*time.Second).Should(Equal("100"))
		})

		Describe("pipelining", func() {
			var pipe *redis.Pipeline

			assertPipeline := func() {
				keys := []string{"A", "B", "C", "D", "E", "F", "G"}

				It("follows redirects", func() {
					if !failover {
						for _, key := range keys {
							Eventually(func() error {
								return client.SwapNodes(ctx, key)
							}, 30*time.Second).ShouldNot(HaveOccurred())
						}
					}

					for i, key := range keys {
						pipe.Set(ctx, key, key+"_value", 0)
						pipe.Expire(ctx, key, time.Duration(i+1)*time.Hour)
					}
					cmds, err := pipe.Exec(ctx)
					Expect(err).NotTo(HaveOccurred())
					Expect(cmds).To(HaveLen(14))

					_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
						defer GinkgoRecover()
						Eventually(func() int64 {
							return node.DBSize(ctx).Val()
						}, 30*time.Second).ShouldNot(BeZero())
						return nil
					})

					if !failover {
						for _, key := range keys {
							Eventually(func() error {
								return client.SwapNodes(ctx, key)
							}, 30*time.Second).ShouldNot(HaveOccurred())
						}
					}

					for _, key := range keys {
						pipe.Get(ctx, key)
						pipe.TTL(ctx, key)
					}
					cmds, err = pipe.Exec(ctx)
					Expect(err).NotTo(HaveOccurred())
					Expect(cmds).To(HaveLen(14))

					for i, key := range keys {
						get := cmds[i*2].(*redis.StringCmd)
						Expect(get.Val()).To(Equal(key + "_value"))

						ttl := cmds[(i*2)+1].(*redis.DurationCmd)
						dur := time.Duration(i+1) * time.Hour
						Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
					}
				})

				It("works with missing keys", func() {
					pipe.Set(ctx, "A", "A_value", 0)
					pipe.Set(ctx, "C", "C_value", 0)
					_, err := pipe.Exec(ctx)
					Expect(err).NotTo(HaveOccurred())

					a := pipe.Get(ctx, "A")
					b := pipe.Get(ctx, "B")
					c := pipe.Get(ctx, "C")
					cmds, err := pipe.Exec(ctx)
					Expect(err).To(Equal(redis.Nil))
					Expect(cmds).To(HaveLen(3))

					Expect(a.Err()).NotTo(HaveOccurred())
					Expect(a.Val()).To(Equal("A_value"))

					Expect(b.Err()).To(Equal(redis.Nil))
					Expect(b.Val()).To(Equal(""))

					Expect(c.Err()).NotTo(HaveOccurred())
					Expect(c.Val()).To(Equal("C_value"))
				})
			}

			Describe("with Pipeline", func() {
				BeforeEach(func() {
					pipe = client.Pipeline().(*redis.Pipeline)
				})

				AfterEach(func() {})

				assertPipeline()
			})

			Describe("with TxPipeline", func() {
				BeforeEach(func() {
					pipe = client.TxPipeline().(*redis.Pipeline)
				})

				AfterEach(func() {})

				assertPipeline()
			})
		})
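
		// Both pipelining modes above share assertPipeline, so redirect and
		// missing-key handling is exercised for Pipeline and TxPipeline alike.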

		It("supports PubSub", func() {
			pubsub := client.Subscribe(ctx, "mychannel")
			defer pubsub.Close()

			Eventually(func() error {
				_, err := client.Publish(ctx, "mychannel", "hello").Result()
				if err != nil {
					return err
				}

				msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
				if err != nil {
					return err
				}

				_, ok := msg.(*redis.Message)
				if !ok {
					return fmt.Errorf("got %T, wanted *redis.Message", msg)
				}

				return nil
			}, 30*time.Second).ShouldNot(HaveOccurred())
		})

		It("supports sharded PubSub", func() {
			pubsub := client.SSubscribe(ctx, "mychannel")
			defer pubsub.Close()

			Eventually(func() error {
				_, err := client.SPublish(ctx, "mychannel", "hello").Result()
				if err != nil {
					return err
				}

				msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
				if err != nil {
					return err
				}

				_, ok := msg.(*redis.Message)
				if !ok {
					return fmt.Errorf("got %T, wanted *redis.Message", msg)
				}

				return nil
			}, 30*time.Second).ShouldNot(HaveOccurred())
		})

		It("supports PubSub.Ping without channels", func() {
			pubsub := client.Subscribe(ctx)
			defer pubsub.Close()

			err := pubsub.Ping(ctx)
			Expect(err).NotTo(HaveOccurred())
		})
	}

	Describe("ClusterClient PROTO 2", func() {
		BeforeEach(func() {
			opt = redisClusterOptions()
			opt.Protocol = 2
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("should CLUSTER PROTO 2", func() {
			_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
				val, err := c.Do(ctx, "HELLO").Result()
				Expect(err).NotTo(HaveOccurred())
				Expect(val).Should(ContainElements("proto", int64(2)))
				return nil
			})
		})
	})

	Describe("ClusterClient", func() {
		BeforeEach(func() {
			opt = redisClusterOptions()
			opt.ClientName = "cluster_hi"
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("returns pool stats", func() {
			stats := client.PoolStats()
			Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
		})

		It("returns an error when there are no attempts left", func() {
			opt := redisClusterOptions()
			opt.MaxRedirects = -1
			client := cluster.newClusterClient(ctx, opt)

			Eventually(func() error {
				return client.SwapNodes(ctx, "A")
			}, 30*time.Second).ShouldNot(HaveOccurred())

			err := client.Get(ctx, "A").Err()
			Expect(err).To(HaveOccurred())
			Expect(err.Error()).To(ContainSubstring("MOVED"))

			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("determines hash slots correctly for generic commands", func() {
			opt := redisClusterOptions()
			opt.MaxRedirects = -1
			client := cluster.newClusterClient(ctx, opt)

			err := client.Do(ctx, "GET", "A").Err()
			Expect(err).To(Equal(redis.Nil))

			err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
			Expect(err).To(Equal(redis.Nil))

			Eventually(func() error {
				return client.SwapNodes(ctx, "A")
			}, 30*time.Second).ShouldNot(HaveOccurred())

			err = client.Do(ctx, "GET", "A").Err()
			Expect(err).To(HaveOccurred())
			Expect(err.Error()).To(ContainSubstring("MOVED"))

			err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
			Expect(err).To(HaveOccurred())
			Expect(err.Error()).To(ContainSubstring("MOVED"))

			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("follows node redirection immediately", func() {
			// Configure retry backoffs far in excess of the expected duration of redirection
			opt := redisClusterOptions()
			opt.MinRetryBackoff = 10 * time.Minute
			opt.MaxRetryBackoff = 20 * time.Minute
			client := cluster.newClusterClient(ctx, opt)

			Eventually(func() error {
				return client.SwapNodes(ctx, "A")
			}, 30*time.Second).ShouldNot(HaveOccurred())

			// Note that this context sets a deadline more aggressive than the lowest possible bound
			// of the retry backoff; this verifies that redirection completes immediately.
			redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
			defer cancel()

			err := client.Set(redirCtx, "A", "VALUE", 0).Err()
			Expect(err).NotTo(HaveOccurred())

			v, err := client.Get(redirCtx, "A").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(v).To(Equal("VALUE"))

			Expect(client.Close()).NotTo(HaveOccurred())
		})

		It("calls fn for every master node", func() {
			for i := 0; i < 10; i++ {
				Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
			}

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			size, err := client.DBSize(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(size).To(Equal(int64(0)))
		})

		It("should CLUSTER SLOTS", func() {
			res, err := client.ClusterSlots(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(HaveLen(3))

			wanted := []redis.ClusterSlot{{
				Start: 0,
				End:   5460,
				Nodes: []redis.ClusterNode{{
					ID:   "",
					Addr: "127.0.0.1:16600",
				}, {
					ID:   "",
					Addr: "127.0.0.1:16603",
				}},
			}, {
				Start: 5461,
				End:   10922,
				Nodes: []redis.ClusterNode{{
					ID:   "",
					Addr: "127.0.0.1:16601",
				}, {
					ID:   "",
					Addr: "127.0.0.1:16604",
				}},
			}, {
				Start: 10923,
				End:   16383,
				Nodes: []redis.ClusterNode{{
					ID:   "",
					Addr: "127.0.0.1:16602",
				}, {
					ID:   "",
					Addr: "127.0.0.1:16605",
				}},
			}}
			Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
		})

		It("should CLUSTER SHARDS", func() {
			res, err := client.ClusterShards(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).NotTo(BeEmpty())

			// Iterate over the ClusterShard results and validate the fields.
			for _, shard := range res {
				Expect(shard.Slots).NotTo(BeEmpty())
				for _, slotRange := range shard.Slots {
					Expect(slotRange.Start).To(BeNumerically(">=", 0))
					Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
				}

				Expect(shard.Nodes).NotTo(BeEmpty())
				for _, node := range shard.Nodes {
					Expect(node.ID).NotTo(BeEmpty())
					Expect(node.Endpoint).NotTo(BeEmpty())
					Expect(node.IP).NotTo(BeEmpty())
					Expect(node.Port).To(BeNumerically(">", 0))

					validRoles := []string{"master", "slave", "replica"}
					Expect(validRoles).To(ContainElement(node.Role))

					Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))

					validHealthStatuses := []string{"online", "failed", "loading"}
					Expect(validHealthStatuses).To(ContainElement(node.Health))
				}
			}
		})

		It("should CLUSTER LINKS", func() {
			res, err := client.ClusterLinks(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).NotTo(BeEmpty())

			// Iterate over the ClusterLink results and validate the fields.
			for _, link := range res {

				Expect(link.Direction).NotTo(BeEmpty())
				Expect([]string{"from", "to"}).To(ContainElement(link.Direction))
				Expect(link.Node).NotTo(BeEmpty())
				Expect(link.CreateTime).To(BeNumerically(">", 0))

				Expect(link.Events).NotTo(BeEmpty())
				validEventChars := []rune{'r', 'w'}
				for _, eventChar := range link.Events {
					Expect(validEventChars).To(ContainElement(eventChar))
				}

				Expect(link.SendBufferAllocated).To(BeNumerically(">=", 0))
				Expect(link.SendBufferUsed).To(BeNumerically(">=", 0))
			}
		})

		It("should cluster client setname", func() {
			err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
				return c.Ping(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
				val, err := c.ClientList(ctx).Result()
				Expect(err).NotTo(HaveOccurred())
				Expect(val).Should(ContainSubstring("name=cluster_hi"))
				return nil
			})
		})

		It("should CLUSTER PROTO 3", func() {
			_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
				val, err := c.Do(ctx, "HELLO").Result()
				Expect(err).NotTo(HaveOccurred())
				Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
				return nil
			})
		})

		It("should CLUSTER MYSHARDID", func() {
			shardID, err := client.ClusterMyShardID(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(shardID).ToNot(BeEmpty())
		})

		It("should CLUSTER NODES", func() {
			res, err := client.ClusterNodes(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(len(res)).To(BeNumerically(">", 400))
		})

		It("should CLUSTER INFO", func() {
			res, err := client.ClusterInfo(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
		})

		It("should CLUSTER KEYSLOT", func() {
			hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
		})

		It("should CLUSTER GETKEYSINSLOT", func() {
			keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(len(keys)).To(Equal(0))
		})

		It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
			n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(n).To(Equal(int64(0)))
		})

		It("should CLUSTER COUNTKEYSINSLOT", func() {
			n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(n).To(Equal(int64(0)))
		})

		It("should CLUSTER SAVECONFIG", func() {
			res, err := client.ClusterSaveConfig(ctx).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(res).To(Equal("OK"))
		})

		It("should CLUSTER SLAVES", func() {
			nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
			Expect(err).NotTo(HaveOccurred())
			Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
			Expect(nodesList).Should(HaveLen(1))
		})

		It("should RANDOMKEY", func() {
			const nkeys = 100

			for i := 0; i < nkeys; i++ {
				err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
				Expect(err).NotTo(HaveOccurred())
			}

			var keys []string
			addKey := func(key string) {
				for _, k := range keys {
					if k == key {
						return
					}
				}
				keys = append(keys, key)
			}

			for i := 0; i < nkeys*10; i++ {
				key := client.RandomKey(ctx).Val()
				addKey(key)
			}

			Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
		})
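
		// The hook tests below assert that cluster-level hooks wrap the
		// per-shard hooks: cluster.Before runs first and cluster.After last.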

		It("supports Process hook", func() {
			testCtx, cancel := context.WithCancel(ctx)
			defer cancel()

			err := client.Ping(ctx).Err()
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
				return node.Ping(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			var stack []string

			clusterHook := &hook{
				processHook: func(hook redis.ProcessHook) redis.ProcessHook {
					return func(ctx context.Context, cmd redis.Cmder) error {
						select {
						case <-testCtx.Done():
							return hook(ctx, cmd)
						default:
						}

						Expect(cmd.String()).To(Equal("ping: "))
						stack = append(stack, "cluster.BeforeProcess")

						err := hook(ctx, cmd)

						Expect(cmd.String()).To(Equal("ping: PONG"))
						stack = append(stack, "cluster.AfterProcess")

						return err
					}
				},
			}
			client.AddHook(clusterHook)

			nodeHook := &hook{
				processHook: func(hook redis.ProcessHook) redis.ProcessHook {
					return func(ctx context.Context, cmd redis.Cmder) error {
						select {
						case <-testCtx.Done():
							return hook(ctx, cmd)
						default:
						}

						Expect(cmd.String()).To(Equal("ping: "))
						stack = append(stack, "shard.BeforeProcess")

						err := hook(ctx, cmd)

						Expect(cmd.String()).To(Equal("ping: PONG"))
						stack = append(stack, "shard.AfterProcess")

						return err
					}
				},
			}

			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
				node.AddHook(nodeHook)
				return nil
			})

			err = client.Ping(ctx).Err()
			Expect(err).NotTo(HaveOccurred())
			Expect(stack).To(Equal([]string{
				"cluster.BeforeProcess",
				"shard.BeforeProcess",
				"shard.AfterProcess",
				"cluster.AfterProcess",
			}))
		})

		It("supports Pipeline hook", func() {
			err := client.Ping(ctx).Err()
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
				return node.Ping(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			var stack []string

			client.AddHook(&hook{
				processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
					return func(ctx context.Context, cmds []redis.Cmder) error {
						Expect(cmds).To(HaveLen(1))
						Expect(cmds[0].String()).To(Equal("ping: "))
						stack = append(stack, "cluster.BeforeProcessPipeline")

						err := hook(ctx, cmds)

						Expect(cmds).To(HaveLen(1))
						Expect(cmds[0].String()).To(Equal("ping: PONG"))
						stack = append(stack, "cluster.AfterProcessPipeline")

						return err
					}
				},
			})

			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
				node.AddHook(&hook{
					processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
						return func(ctx context.Context, cmds []redis.Cmder) error {
							Expect(cmds).To(HaveLen(1))
							Expect(cmds[0].String()).To(Equal("ping: "))
							stack = append(stack, "shard.BeforeProcessPipeline")

							err := hook(ctx, cmds)

							Expect(cmds).To(HaveLen(1))
							Expect(cmds[0].String()).To(Equal("ping: PONG"))
							stack = append(stack, "shard.AfterProcessPipeline")

							return err
						}
					},
				})
				return nil
			})

			_, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
				pipe.Ping(ctx)
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
			Expect(stack).To(Equal([]string{
				"cluster.BeforeProcessPipeline",
				"shard.BeforeProcessPipeline",
				"shard.AfterProcessPipeline",
				"cluster.AfterProcessPipeline",
			}))
		})

		It("supports TxPipeline hook", func() {
			err := client.Ping(ctx).Err()
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
				return node.Ping(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			var stack []string

			client.AddHook(&hook{
				processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
					return func(ctx context.Context, cmds []redis.Cmder) error {
						Expect(cmds).To(HaveLen(3))
						Expect(cmds[1].String()).To(Equal("ping: "))
						stack = append(stack, "cluster.BeforeProcessPipeline")

						err := hook(ctx, cmds)

						Expect(cmds).To(HaveLen(3))
						Expect(cmds[1].String()).To(Equal("ping: PONG"))
						stack = append(stack, "cluster.AfterProcessPipeline")

						return err
					}
				},
			})

			_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
				node.AddHook(&hook{
					processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
						return func(ctx context.Context, cmds []redis.Cmder) error {
							Expect(cmds).To(HaveLen(3))
							Expect(cmds[1].String()).To(Equal("ping: "))
							stack = append(stack, "shard.BeforeProcessPipeline")

							err := hook(ctx, cmds)

							Expect(cmds).To(HaveLen(3))
							Expect(cmds[1].String()).To(Equal("ping: PONG"))
							stack = append(stack, "shard.AfterProcessPipeline")

							return err
						}
					},
				})
				return nil
			})

			_, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
				pipe.Ping(ctx)
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
			Expect(stack).To(Equal([]string{
				"cluster.BeforeProcessPipeline",
				"shard.BeforeProcessPipeline",
				"shard.AfterProcessPipeline",
				"cluster.AfterProcessPipeline",
			}))
		})

		It("should return correct replica for key", func() {
			client, err := client.SlaveForKey(ctx, "test")
			Expect(err).ToNot(HaveOccurred())
			info := client.Info(ctx, "server")
			Expect(info.Val()).Should(ContainSubstring("tcp_port:16604"))
		})

		It("should return correct master for key", func() {
			client, err := client.MasterForKey(ctx, "test")
			Expect(err).ToNot(HaveOccurred())
			info := client.Info(ctx, "server")
			Expect(info.Val()).Should(ContainSubstring("tcp_port:16601"))
		})

		assertClusterClient()
	})

	Describe("ClusterClient with RouteByLatency", func() {
		BeforeEach(func() {
			opt = redisClusterOptions()
			opt.RouteByLatency = true
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize(ctx).Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			err := client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
				return slave.ReadWrite(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})

	Describe("ClusterClient with ClusterSlots", func() {
		BeforeEach(func() {
			failover = true

			opt = redisClusterOptions()
			opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
				slots := []redis.ClusterSlot{{
					Start: 0,
					End:   5460,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard1Port,
					}},
				}, {
					Start: 5461,
					End:   10922,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard2Port,
					}},
				}, {
					Start: 10923,
					End:   16383,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard3Port,
					}},
				}}
				return slots, nil
			}
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize(ctx).Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			failover = false

			err := client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})

	Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
		BeforeEach(func() {
			failover = true

			opt = redisClusterOptions()
			opt.RouteRandomly = true
			opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
				slots := []redis.ClusterSlot{{
					Start: 0,
					End:   5460,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard1Port,
					}},
				}, {
					Start: 5461,
					End:   10922,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard2Port,
					}},
				}, {
					Start: 10923,
					End:   16383,
					Nodes: []redis.ClusterNode{{
						Addr: ":" + ringShard3Port,
					}},
				}}
				return slots, nil
			}
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize(ctx).Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			failover = false

			err := client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})

	Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
		BeforeEach(func() {
			failover = true

			opt = redisClusterOptions()
			opt.ReadOnly = true
			opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
				slots := []redis.ClusterSlot{{
					Start: 0,
					End:   5460,
					Nodes: []redis.ClusterNode{{
						Addr: ":16600",
					}, {
						Addr: ":16603",
					}},
				}, {
					Start: 5461,
					End:   10922,
					Nodes: []redis.ClusterNode{{
						Addr: ":16601",
					}, {
						Addr: ":16604",
					}},
				}, {
					Start: 10923,
					End:   16383,
					Nodes: []redis.ClusterNode{{
						Addr: ":16602",
					}, {
						Addr: ":16605",
					}},
				}}
				return slots, nil
			}
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
				return master.FlushDB(ctx).Err()
			})
			Expect(err).NotTo(HaveOccurred())

			err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
				Eventually(func() int64 {
					return client.DBSize(ctx).Val()
				}, 30*time.Second).Should(Equal(int64(0)))
				return nil
			})
			Expect(err).NotTo(HaveOccurred())
		})

		AfterEach(func() {
			failover = false

			err := client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		assertClusterClient()
	})
})

var _ = Describe("ClusterClient without nodes", func() {
	var client *redis.ClusterClient

	BeforeEach(func() {
		client = redis.NewClusterClient(&redis.ClusterOptions{})
	})

	AfterEach(func() {
		Expect(client.Close()).NotTo(HaveOccurred())
	})

	It("Ping returns an error", func() {
		err := client.Ping(ctx).Err()
		Expect(err).To(MatchError("redis: cluster has no nodes"))
	})

	It("pipeline returns an error", func() {
		_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Ping(ctx)
			return nil
		})
		Expect(err).To(MatchError("redis: cluster has no nodes"))
	})
})

var _ = Describe("ClusterClient without valid nodes", func() {
	var client *redis.ClusterClient

	BeforeEach(func() {
		client = redis.NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{redisAddr},
		})
	})

	AfterEach(func() {
		Expect(client.Close()).NotTo(HaveOccurred())
	})

	It("returns an error", func() {
		err := client.Ping(ctx).Err()
		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
	})

	It("pipeline returns an error", func() {
		_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
			pipe.Ping(ctx)
			return nil
		})
		Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
	})
})

var _ = Describe("ClusterClient with unavailable Cluster", func() {
	var client *redis.ClusterClient

	BeforeEach(func() {
		opt := redisClusterOptions()
		opt.ReadTimeout = 250 * time.Millisecond
		opt.WriteTimeout = 250 * time.Millisecond
		opt.MaxRedirects = 1
		client = cluster.newClusterClientUnstable(opt)
		Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())

		for _, node := range cluster.clients {
			err := node.ClientPause(ctx, 5*time.Second).Err()
			Expect(err).NotTo(HaveOccurred())
		}
	})

	AfterEach(func() {
		Expect(client.Close()).NotTo(HaveOccurred())
	})

	It("recovers when Cluster recovers", func() {
		err := client.Ping(ctx).Err()
		Expect(err).To(HaveOccurred())

		Eventually(func() error {
			return client.Ping(ctx).Err()
		}, "30s").ShouldNot(HaveOccurred())
	})
})

var _ = Describe("ClusterClient timeout", func() {
	var client *redis.ClusterClient

	AfterEach(func() {
		_ = client.Close()
	})

	testTimeout := func() {
		It("Ping timeouts", func() {
			err := client.Ping(ctx).Err()
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})

		It("Pipeline timeouts", func() {
			_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
				pipe.Ping(ctx)
				return nil
			})
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})

		It("Tx timeouts", func() {
			err := client.Watch(ctx, func(tx *redis.Tx) error {
				return tx.Ping(ctx).Err()
			}, "foo")
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})

		It("Tx Pipeline timeouts", func() {
			err := client.Watch(ctx, func(tx *redis.Tx) error {
				_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
					pipe.Ping(ctx)
					return nil
				})
				return err
			}, "foo")
			Expect(err).To(HaveOccurred())
			Expect(err.(net.Error).Timeout()).To(BeTrue())
		})
	}

	const pause = 5 * time.Second

	Context("read/write timeout", func() {
		BeforeEach(func() {
			opt := redisClusterOptions()
			client = cluster.newClusterClient(ctx, opt)

			err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
				err := client.ClientPause(ctx, pause).Err()

				opt := client.Options()
				opt.ReadTimeout = time.Nanosecond
				opt.WriteTimeout = time.Nanosecond

				return err
			})
			Expect(err).NotTo(HaveOccurred())

			// Overwrite timeouts after the client is initialized.
			opt.ReadTimeout = time.Nanosecond
			opt.WriteTimeout = time.Nanosecond
			opt.MaxRedirects = 0
		})

		AfterEach(func() {
			_ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
				defer GinkgoRecover()

				opt := client.Options()
				opt.ReadTimeout = time.Second
				opt.WriteTimeout = time.Second

				Eventually(func() error {
					return client.Ping(ctx).Err()
				}, 2*pause).ShouldNot(HaveOccurred())
				return nil
			})

			err := client.Close()
			Expect(err).NotTo(HaveOccurred())
		})

		testTimeout()
	})
})

var _ = Describe("ClusterClient ParseURL", func() {
	cases := []struct {
		test string
		url  string
		o    *redis.ClusterOptions // expected value
		err  error
	}{
		{
			test: "ParseRedisURL",
			url:  "redis://localhost:123",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
		}, {
			test: "ParseRedissURL",
			url:  "rediss://localhost:123",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
		}, {
			test: "MissingRedisPort",
			url:  "redis://localhost",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
		}, {
			test: "MissingRedissPort",
			url:  "rediss://localhost",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
		}, {
			test: "MultipleRedisURLs",
			url:  "redis://localhost:123?addr=localhost:1234&addr=localhost:12345",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}},
		}, {
			test: "MultipleRedissURLs",
			url:  "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
		}, {
			test: "OnlyPassword",
			url:  "redis://:bar@localhost:123",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
		}, {
			test: "OnlyUser",
			url:  "redis://foo@localhost:123",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
		}, {
			test: "RedisUsernamePassword",
			url:  "redis://foo:bar@localhost:123",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
		}, {
			test: "RedissUsernamePassword",
			url:  "rediss://foo:bar@localhost:123?addr=localhost:1234",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ServerName: "localhost"}},
		}, {
			test: "QueryParameters",
			url:  "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
		}, {
			test: "DisabledTimeout",
			url:  "redis://localhost:123?conn_max_idle_time=0",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
		}, {
			test: "DisabledTimeoutNeg",
			url:  "redis://localhost:123?conn_max_idle_time=-1",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
		}, {
			test: "UseDefault",
			url:  "redis://localhost:123?conn_max_idle_time=",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
		}, {
			test: "Protocol",
			url:  "redis://localhost:123?protocol=2",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Protocol: 2},
		}, {
			test: "ClientName",
			url:  "redis://localhost:123?client_name=cluster_hi",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
		}, {
			test: "UseDefaultMissing=",
			url:  "redis://localhost:123?conn_max_idle_time",
			o:    &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
		}, {
			test: "InvalidQueryAddr",
			url:  "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234",
			err:  errors.New(`redis: unable to parse addr param: rediss://foo:barr@localhost:1234`),
		}, {
			test: "InvalidInt",
			url:  "redis://localhost?pool_size=five",
			err:  errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
		}, {
			test: "InvalidBool",
			url:  "redis://localhost?pool_fifo=yes",
			err:  errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
		}, {
			test: "UnknownParam",
			url:  "redis://localhost?abc=123",
			err:  errors.New("redis: unexpected option: abc"),
		}, {
			test: "InvalidScheme",
			url:  "https://google.com",
			err:  errors.New("redis: invalid URL scheme: https"),
		},
	}
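
	// Each case above is fed to redis.ParseClusterURL below: extra addr query
	// parameters are appended to Addrs, and unknown or malformed parameters
	// surface as the errors listed in the table.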

	It("match ParseClusterURL", func() {
		for i := range cases {
			tc := cases[i]
			actual, err := redis.ParseClusterURL(tc.url)
			if tc.err != nil {
				Expect(err).Should(MatchError(tc.err))
			} else {
				Expect(err).NotTo(HaveOccurred())
			}

			if err == nil {
				Expect(tc.o).NotTo(BeNil())

				Expect(tc.o.Addrs).To(Equal(actual.Addrs))
				Expect(tc.o.TLSConfig).To(Equal(actual.TLSConfig))
				Expect(tc.o.Username).To(Equal(actual.Username))
				Expect(tc.o.Password).To(Equal(actual.Password))
				Expect(tc.o.MaxRetries).To(Equal(actual.MaxRetries))
				Expect(tc.o.MinRetryBackoff).To(Equal(actual.MinRetryBackoff))
				Expect(tc.o.MaxRetryBackoff).To(Equal(actual.MaxRetryBackoff))
				Expect(tc.o.DialTimeout).To(Equal(actual.DialTimeout))
				Expect(tc.o.ReadTimeout).To(Equal(actual.ReadTimeout))
				Expect(tc.o.WriteTimeout).To(Equal(actual.WriteTimeout))
				Expect(tc.o.PoolFIFO).To(Equal(actual.PoolFIFO))
				Expect(tc.o.PoolSize).To(Equal(actual.PoolSize))
				Expect(tc.o.MinIdleConns).To(Equal(actual.MinIdleConns))
				Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime))
				Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime))
				Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout))
			}
		}
	})
})