mirror of https://github.com/redis/go-redis.git synced 2025-11-30 18:01:23 +03:00
go-redis/osscluster_test.go
ofekshenawa f711eb0f62 feat(cluster): Implement Request and Response Policy Based Routing in Cluster Mode (#3422)
* Add search module builders and tests (#1)

* Add search module builders and tests

* Add tests

* Use builders and Actions in more clean way

* Update search_builders.go

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>

* Update search_builders.go

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>

* feat(routing): add internal request/response policy enums

* feat: load the policy table in cluster client (#4)

* feat: load the policy table in cluster client

* Remove comments

* modify Tips and command policy in commandInfo (#5)

* centralize cluster command routing in osscluster_router.go and refactor osscluster.go (#6)

* centralize cluster command routing in osscluster_router.go and refactor osscluster.go

* enable CI on all branches

* Add debug prints

* Add debug prints

* FIX: deal with nil policy

* FIX: fixing clusterClient process

* chore(osscluster): simplify switch case

* wip(command): ai generated clone method for commands

* feat: implement response aggregator for Redis cluster commands

* feat: implement response aggregator for Redis cluster commands

* fix: solve concurrency errors

* fix: solve concurrency errors

* return MaxRedirects settings

* remove locks from getCommandPolicy

* Handle MOVED errors more robustly, remove cluster reloading during execution, ensure better routing

* Fix: supports Process hook test

* Fix: remove response aggregation for single shard commands

* Add more performant type conversion for Cmd type

* Add router logic into processPipeline

---------

Co-authored-by: Nedyalko Dyakov <nedyalko.dyakov@gmail.com>

* remove thread debugging code

* remove thread debugging code && reject commands with policy that cannot be used in pipeline

* refactor processPipeline and cmdType enum

* remove FDescribe from cluster tests

* Add tests

* fix aggregation test

* fix mget test

* fix mget test

* remove aggregateKeyedResponses

* added scaffolding for the req-resp manager

* added default policies for the search commands

* split command map into module->command

* cleanup, added logic to refresh the cache

* added reactive cache refresh

* revert cluster refresh

* fixed lint

* addressed first batch of comments

* rewrote aggregator implementations with atomics for native or near-native primitives

* addressed more comments, fixed lint

* added batch aggregator operations

* fixed lint

* updated batch aggregator, fixed extractcommandvalue

* fixed lint

* added batching to aggregateResponses

* fixed deadlocks

* changed aggregator logic, added error params

* added preemptive return to the aggregators

* more work on the aggregators

* updated AND and OR aggregators

* fixed lint

* added configurable policy resolvers

* slight refactor

* removed the interface, slight refactor

* change func signature from cmdName to cmder

* added nil safety assertions

* few small refactors

* added read only policies

* removed leftover prints

* Rebased to master, resolved conflicts

* fixed lint

* updated gha

* fixed tests, minor consistency refactor

* preallocated simple errors

* changed numeric aggregators to use float64

* speculative test fix

* Update command.go

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>

* Update main_test.go

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>

* Add static shard picker

* Fix nil value handling in command aggregation

* Modify the Clone method to return a shallow copy

* Add clone method to digest command

* Optimize keyless command routing to respect ShardPicker policy

* Remove MGET references

* Fix MGET aggregation to map individual values to keys across shards

* Add clone method to hybrid search commands

* Undo changes in route keyless test

* remove comments

* Add test for DisableRoutingPolicies option

* Add comprehensive Routing Policies test suite and fix multi-keyed aggregation for commands with a different key step

---------

Co-authored-by: Nedyalko Dyakov <1547186+ndyakov@users.noreply.github.com>
Co-authored-by: Nedyalko Dyakov <nedyalko.dyakov@gmail.com>
Co-authored-by: Hristo Temelski <hristo.temelski@redis.com>
2025-11-28 11:46:23 +02:00

3249 lines
92 KiB
Go
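The commit above adds policy-based routing to the cluster client: a command's request policy decides how it fans out (single shard, multiple shards, or all shards/nodes) and its response policy decides how the per-shard replies are combined. As a rough, standalone illustration of the aggregation side only (hypothetical helper names, not go-redis's internal implementation), the response policies asserted by the tests in this file reduce to simple folds over the per-shard replies:

package main

import "fmt"

// aggregateSum sketches the agg_sum response policy (e.g. TOUCH, DBSIZE,
// EXISTS in the tests below): per-shard integer replies are added together.
func aggregateSum(replies []int64) int64 {
	var sum int64
	for _, r := range replies {
		sum += r
	}
	return sum
}

// aggregateMin sketches the agg_min response policy (e.g. WAIT): the smallest
// per-shard reply wins.
func aggregateMin(replies []int64) int64 {
	m := replies[0]
	for _, r := range replies[1:] {
		if r < m {
			m = r
		}
	}
	return m
}

// aggregateAll sketches both agg_logical_and (e.g. SCRIPT EXISTS) and
// all_succeeded (e.g. FLUSHALL): every shard must report success/true.
func aggregateAll(replies []bool) bool {
	for _, r := range replies {
		if !r {
			return false
		}
	}
	return true
}

func main() {
	fmt.Println(aggregateSum([]int64{2, 1, 2}))         // e.g. 5 keys touched across 3 shards
	fmt.Println(aggregateMin([]int64{1, 0, 2}))         // WAIT: the weakest shard determines the result
	fmt.Println(aggregateAll([]bool{true, true, true})) // FLUSHALL succeeded everywhere
}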

package redis_test
import (
"context"
"crypto/tls"
"errors"
"fmt"
"net"
"slices"
"strconv"
"strings"
"sync"
"time"
. "github.com/bsm/ginkgo/v2"
. "github.com/bsm/gomega"
"github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9/internal/hashtag"
"github.com/redis/go-redis/v9/internal/routing"
)
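// clusterScenario models the six-node test cluster: the first three ports are
// masters (each owning one of the slot ranges returned by slots()) and the
// last three are their replicas.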
type clusterScenario struct {
ports []string
nodeIDs []string
clients map[string]*redis.Client
}
func (s *clusterScenario) slots() []int {
return []int{0, 5461, 10923, 16384}
}
func (s *clusterScenario) masters() []*redis.Client {
result := make([]*redis.Client, 3)
for pos, port := range s.ports[:3] {
result[pos] = s.clients[port]
}
return result
}
func (s *clusterScenario) slaves() []*redis.Client {
result := make([]*redis.Client, 3)
for pos, port := range s.ports[3:] {
result[pos] = s.clients[port]
}
return result
}
func (s *clusterScenario) addrs() []string {
addrs := make([]string, len(s.ports))
for i, port := range s.ports {
addrs[i] = net.JoinHostPort("127.0.0.1", port)
}
return addrs
}
func (s *clusterScenario) newClusterClientUnstable(opt *redis.ClusterOptions) *redis.ClusterClient {
opt.Addrs = s.addrs()
return redis.NewClusterClient(opt)
}
func (s *clusterScenario) newClusterClient(
ctx context.Context, opt *redis.ClusterOptions,
) *redis.ClusterClient {
client := s.newClusterClientUnstable(opt)
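// Use the dynamic command-info resolver so that command request/response
// policies are resolved from the cluster at runtime (exercised by the
// routing-policy tests below).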
client.SetCommandInfoResolver(client.NewDynamicResolver())
err := eventually(func() error {
if opt.ClusterSlots != nil {
return nil
}
state, err := client.LoadState(ctx)
if err != nil {
return err
}
if !state.IsConsistent(ctx) {
return fmt.Errorf("cluster state is not consistent")
}
return nil
}, 30*time.Second)
if err != nil {
panic(err)
}
return client
}
func (s *clusterScenario) Close() error {
ctx := context.TODO()
for _, master := range s.masters() {
if master == nil {
continue
}
err := master.FlushAll(ctx).Err()
if err != nil {
return err
}
// since 7.2 forget calls should be propagated, calling only master
// nodes should be sufficient.
for _, nID := range s.nodeIDs {
master.ClusterForget(ctx, nID)
}
}
return nil
}
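// configureClusterTopology meets the nodes, assigns the three slot ranges to
// the masters, bootstraps the replicas, and then waits until every node
// reports the expected CLUSTER SLOTS view.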
func configureClusterTopology(ctx context.Context, scenario *clusterScenario) error {
allowErrs := []string{
"ERR Slot 0 is already busy",
"ERR Slot 5461 is already busy",
"ERR Slot 10923 is already busy",
"ERR Slot 16384 is already busy",
}
err := collectNodeInformation(ctx, scenario)
if err != nil {
return err
}
// Meet cluster nodes.
for _, client := range scenario.clients {
err := client.ClusterMeet(ctx, "127.0.0.1", scenario.ports[0]).Err()
if err != nil {
return err
}
}
slots := scenario.slots()
for pos, master := range scenario.masters() {
err := master.ClusterAddSlotsRange(ctx, slots[pos], slots[pos+1]-1).Err()
if err != nil && !slices.Contains(allowErrs, err.Error()) {
return err
}
}
// Bootstrap slaves.
for idx, slave := range scenario.slaves() {
masterID := scenario.nodeIDs[idx]
// Wait until master is available
err := eventually(func() error {
s := slave.ClusterNodes(ctx).Val()
wanted := masterID
if !strings.Contains(s, wanted) {
return fmt.Errorf("%q does not contain %q", s, wanted)
}
return nil
}, 10*time.Second)
if err != nil {
return err
}
err = slave.ClusterReplicate(ctx, masterID).Err()
if err != nil {
return err
}
}
// Wait until all nodes have consistent info.
wanted := []redis.ClusterSlot{{
Start: 0,
End: 5460,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:16600",
}, {
ID: "",
Addr: "127.0.0.1:16603",
}},
}, {
Start: 5461,
End: 10922,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:16601",
}, {
ID: "",
Addr: "127.0.0.1:16604",
}},
}, {
Start: 10923,
End: 16383,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:16602",
}, {
ID: "",
Addr: "127.0.0.1:16605",
}},
}}
for _, client := range scenario.clients {
err := eventually(func() error {
res, err := client.ClusterSlots(ctx).Result()
if err != nil {
return err
}
return assertSlotsEqual(res, wanted)
}, 90*time.Second)
if err != nil {
return err
}
}
return nil
}
func collectNodeInformation(ctx context.Context, scenario *clusterScenario) error {
for pos, port := range scenario.ports {
client := redis.NewClient(&redis.Options{
Addr: ":" + port,
})
myID, err := client.ClusterMyID(ctx).Result()
if err != nil {
return err
}
scenario.clients[port] = client
scenario.nodeIDs[pos] = myID
}
return nil
}
func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
outerLoop:
for _, s2 := range wanted {
for _, s1 := range slots {
if slotEqual(s1, s2) {
continue outerLoop
}
}
return fmt.Errorf("%v not found in %v", s2, slots)
}
return nil
}
func slotEqual(s1, s2 redis.ClusterSlot) bool {
if s1.Start != s2.Start {
return false
}
if s1.End != s2.End {
return false
}
if len(s1.Nodes) != len(s2.Nodes) {
return false
}
for i, n1 := range s1.Nodes {
if n1.Addr != s2.Nodes[i].Addr {
return false
}
}
return true
}
// ------------------------------------------------------------------------------
var _ = Describe("ClusterClient", func() {
var failover bool
var opt *redis.ClusterOptions
var client *redis.ClusterClient
assertClusterClient := func() {
It("do", func() {
val, err := client.Do(ctx, "ping").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).To(Equal("PONG"))
})
It("should GET/SET/DEL", func() {
err := client.Get(ctx, "A").Err()
Expect(err).To(Equal(redis.Nil))
err = client.Set(ctx, "A", "VALUE", 0).Err()
Expect(err).NotTo(HaveOccurred())
Eventually(func() string {
return client.Get(ctx, "A").Val()
}, 30*time.Second).Should(Equal("VALUE"))
cnt, err := client.Del(ctx, "A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(cnt).To(Equal(int64(1)))
})
It("should follow redirects for GET", func() {
err := client.Set(ctx, "A", "VALUE", 0).Err()
Expect(err).NotTo(HaveOccurred())
if !failover {
Eventually(func() int64 {
nodes, err := client.Nodes(ctx, "A")
if err != nil {
return 0
}
return nodes[1].Client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(1)))
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
}
v, err := client.Get(ctx, "A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal("VALUE"))
})
It("should follow redirects for SET", func() {
if !failover {
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
}
err := client.Set(ctx, "A", "VALUE", 0).Err()
Expect(err).NotTo(HaveOccurred())
v, err := client.Get(ctx, "A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal("VALUE"))
})
It("should distribute keys", func() {
for i := 0; i < 100; i++ {
err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
defer GinkgoRecover()
Eventually(func() string {
return master.Info(ctx, "keyspace").Val()
}, 30*time.Second).Should(Or(
ContainSubstring("keys=32"),
ContainSubstring("keys=36"),
ContainSubstring("keys=32"),
))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
It("should distribute keys when using EVAL", func() {
script := redis.NewScript(`
local r = redis.call('SET', KEYS[1], ARGV[1])
return r
`)
var key string
for i := 0; i < 100; i++ {
key = fmt.Sprintf("key%d", i)
err := script.Run(ctx, client, []string{key}, "value").Err()
Expect(err).NotTo(HaveOccurred())
}
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
defer GinkgoRecover()
Eventually(func() string {
return master.Info(ctx, "keyspace").Val()
}, 30*time.Second).Should(Or(
ContainSubstring("keys=32"),
ContainSubstring("keys=36"),
ContainSubstring("keys=32"),
))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
It("should distribute scripts when using Script Load", func() {
client.ScriptFlush(ctx)
script := redis.NewScript(`return 'Unique script'`)
script.Load(ctx, client)
err := client.ForEachShard(ctx, func(ctx context.Context, shard *redis.Client) error {
defer GinkgoRecover()
val, _ := script.Exists(ctx, shard).Result()
Expect(val[0]).To(Equal(true))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
It("should check all shards when using Script Exists", func() {
client.ScriptFlush(ctx)
script := redis.NewScript(`return 'First script'`)
lostScriptSrc := `return 'Lost script'`
lostScript := redis.NewScript(lostScriptSrc)
script.Load(ctx, client)
client.Do(ctx, "script", "load", lostScriptSrc)
val, _ := client.ScriptExists(ctx, script.Hash(), lostScript.Hash()).Result()
Expect(val).To(Equal([]bool{true, false}))
})
It("should flush scripts from all shards when using ScriptFlush", func() {
script := redis.NewScript(`return 'Unnecessary script'`)
script.Load(ctx, client)
val, _ := client.ScriptExists(ctx, script.Hash()).Result()
Expect(val).To(Equal([]bool{true}))
client.ScriptFlush(ctx)
val, _ = client.ScriptExists(ctx, script.Hash()).Result()
Expect(val).To(Equal([]bool{false}))
})
It("should support Watch", func() {
var incr func(string) error
// Transactionally increments key using GET and SET commands.
incr = func(key string) error {
err := client.Watch(ctx, func(tx *redis.Tx) error {
n, err := tx.Get(ctx, key).Int64()
if err != nil && err != redis.Nil {
return err
}
_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, key, strconv.FormatInt(n+1, 10), 0)
return nil
})
return err
}, key)
if err == redis.TxFailedErr {
return incr(key)
}
return err
}
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
wg.Add(1)
go func() {
defer GinkgoRecover()
defer wg.Done()
err := incr("key")
Expect(err).NotTo(HaveOccurred())
}()
}
wg.Wait()
Eventually(func() string {
return client.Get(ctx, "key").Val()
}, 30*time.Second).Should(Equal("100"))
})
Describe("pipelining", func() {
var pipe *redis.Pipeline
assertPipeline := func(keys []string) {
It("should follow redirects", func() {
if !failover {
for _, key := range keys {
Eventually(func() error {
return client.SwapNodes(ctx, key)
}, 30*time.Second).ShouldNot(HaveOccurred())
}
}
for i, key := range keys {
pipe.Set(ctx, key, key+"_value", 0)
pipe.Expire(ctx, key, time.Duration(i+1)*time.Hour)
}
cmds, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))
// Check that all keys are set.
for _, key := range keys {
Eventually(func() string {
return client.Get(ctx, key).Val()
}, 30*time.Second).Should(Equal(key + "_value"))
}
if !failover {
for _, key := range keys {
Eventually(func() error {
return client.SwapNodes(ctx, key)
}, 30*time.Second).ShouldNot(HaveOccurred())
}
}
for _, key := range keys {
pipe.Get(ctx, key)
pipe.TTL(ctx, key)
}
cmds, err = pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
Expect(cmds).To(HaveLen(14))
for i, key := range keys {
get := cmds[i*2].(*redis.StringCmd)
Expect(get.Val()).To(Equal(key + "_value"))
ttl := cmds[(i*2)+1].(*redis.DurationCmd)
dur := time.Duration(i+1) * time.Hour
Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
}
})
It("should work with missing keys", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Set(ctx, "C{s}", "C_value", 0)
_, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
a := pipe.Get(ctx, "A{s}")
b := pipe.Get(ctx, "B{s}")
c := pipe.Get(ctx, "C{s}")
cmds, err := pipe.Exec(ctx)
Expect(err).To(Equal(redis.Nil))
Expect(cmds).To(HaveLen(3))
Expect(a.Err()).NotTo(HaveOccurred())
Expect(a.Val()).To(Equal("A_value"))
Expect(b.Err()).To(Equal(redis.Nil))
Expect(b.Val()).To(Equal(""))
Expect(c.Err()).NotTo(HaveOccurred())
Expect(c.Val()).To(Equal("C_value"))
})
}
Describe("with Pipeline", func() {
BeforeEach(func() {
pipe = client.Pipeline().(*redis.Pipeline)
})
AfterEach(func() {})
keys := []string{"A", "B", "C", "D", "E", "F", "G"}
assertPipeline(keys)
It("should not fail node with context.Canceled error", func() {
ctx, cancel := context.WithCancel(context.Background())
cancel()
pipe.Set(ctx, "A", "A_value", 0)
_, err := pipe.Exec(ctx)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.Canceled)).To(BeTrue())
clientNodes, _ := client.Nodes(ctx, "A")
for _, node := range clientNodes {
Expect(node.Failing()).To(BeFalse())
}
})
It("should not fail node with context.DeadlineExceeded error", func() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Nanosecond)
defer cancel()
pipe.Set(ctx, "A", "A_value", 0)
_, err := pipe.Exec(ctx)
Expect(err).To(HaveOccurred())
Expect(errors.Is(err, context.DeadlineExceeded)).To(BeTrue())
clientNodes, _ := client.Nodes(ctx, "A")
for _, node := range clientNodes {
Expect(node.Failing()).To(BeFalse())
}
})
})
Describe("with TxPipeline", func() {
BeforeEach(func() {
pipe = client.TxPipeline().(*redis.Pipeline)
})
AfterEach(func() {})
// TxPipeline doesn't support cross slot commands.
// Use hashtag to force all keys to the same slot.
keys := []string{"A{s}", "B{s}", "C{s}", "D{s}", "E{s}", "F{s}", "G{s}"}
assertPipeline(keys)
// make sure CrossSlot error is returned
It("returns CrossSlot error", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Set(ctx, "B{t}", "B_value", 0)
Expect(hashtag.Slot("A{s}")).NotTo(Equal(hashtag.Slot("B{t}")))
_, err := pipe.Exec(ctx)
Expect(err).To(MatchError(redis.ErrCrossSlot))
})
It("works normally with keyless commands and no CrossSlot error", func() {
pipe.Set(ctx, "A{s}", "A_value", 0)
pipe.Ping(ctx)
pipe.Set(ctx, "B{s}", "B_value", 0)
pipe.Ping(ctx)
_, err := pipe.Exec(ctx)
Expect(err).To(Not(HaveOccurred()))
})
// doesn't fail when no commands are queued
It("returns no error when there are no commands", func() {
_, err := pipe.Exec(ctx)
Expect(err).NotTo(HaveOccurred())
})
})
})
It("should support PubSub", func() {
pubsub := client.Subscribe(ctx, "mychannel")
defer pubsub.Close()
Eventually(func() error {
_, err := client.Publish(ctx, "mychannel", "hello").Result()
if err != nil {
return err
}
msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
if err != nil {
return err
}
_, ok := msg.(*redis.Message)
if !ok {
return fmt.Errorf("got %T, wanted *redis.Message", msg)
}
return nil
}, 30*time.Second).ShouldNot(HaveOccurred())
})
It("should support sharded PubSub", func() {
pubsub := client.SSubscribe(ctx, "mychannel")
defer pubsub.Close()
Eventually(func() error {
_, err := client.SPublish(ctx, "mychannel", "hello").Result()
if err != nil {
return err
}
msg, err := pubsub.ReceiveTimeout(ctx, time.Second)
if err != nil {
return err
}
_, ok := msg.(*redis.Message)
if !ok {
return fmt.Errorf("got %T, wanted *redis.Message", msg)
}
return nil
}, 30*time.Second).ShouldNot(HaveOccurred())
})
It("should support PubSub.Ping without channels", func() {
pubsub := client.Subscribe(ctx)
defer pubsub.Close()
err := pubsub.Ping(ctx)
Expect(err).NotTo(HaveOccurred())
})
}
Describe("ClusterClient PROTO 2", func() {
BeforeEach(func() {
opt = redisClusterOptions()
opt.Protocol = 2
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should CLUSTER PROTO 2", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainElements("proto", int64(2)))
return nil
})
})
})
Describe("ClusterClient", func() {
BeforeEach(func() {
opt = redisClusterOptions()
opt.ClientName = "cluster_hi"
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
_ = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should return pool stats", func() {
stats := client.PoolStats()
Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
})
It("should return an error when there are no attempts left", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
err := client.Get(ctx, "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should determine hash slots correctly for generic commands", func() {
opt := redisClusterOptions()
opt.MaxRedirects = -1
client := cluster.newClusterClient(ctx, opt)
err := client.Do(ctx, "GET", "A").Err()
Expect(err).To(Equal(redis.Nil))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(Equal(redis.Nil))
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
err = client.Do(ctx, "GET", "A").Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
err = client.Do(ctx, []byte("GET"), []byte("A")).Err()
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("MOVED"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should follow node redirection immediately", func() {
// Configure retry backoffs far in excess of the expected duration of redirection
opt := redisClusterOptions()
opt.MinRetryBackoff = 10 * time.Minute
opt.MaxRetryBackoff = 20 * time.Minute
client := cluster.newClusterClient(ctx, opt)
Eventually(func() error {
return client.SwapNodes(ctx, "A")
}, 30*time.Second).ShouldNot(HaveOccurred())
// Note that this context sets a deadline more aggressive than the lowest possible bound
// of the retry backoff; this verifies that redirection completes immediately.
redirCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
err := client.Set(redirCtx, "A", "VALUE", 0).Err()
Expect(err).NotTo(HaveOccurred())
v, err := client.Get(redirCtx, "A").Result()
Expect(err).NotTo(HaveOccurred())
Expect(v).To(Equal("VALUE"))
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should call fn for every master node", func() {
for i := 0; i < 10; i++ {
Expect(client.Set(ctx, strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
}
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
size, err := client.DBSize(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(size).To(Equal(int64(0)))
})
It("should CLUSTER SLOTS", func() {
res, err := client.ClusterSlots(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(HaveLen(3))
wanted := []redis.ClusterSlot{{
Start: 0,
End: 5460,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:16600",
}, {
ID: "",
Addr: "127.0.0.1:16603",
}},
}, {
Start: 5461,
End: 10922,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:16601",
}, {
ID: "",
Addr: "127.0.0.1:16604",
}},
}, {
Start: 10923,
End: 16383,
Nodes: []redis.ClusterNode{{
ID: "",
Addr: "127.0.0.1:16602",
}, {
ID: "",
Addr: "127.0.0.1:16605",
}},
}}
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
})
It("should CLUSTER SHARDS", func() {
res, err := client.ClusterShards(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeEmpty())
// Iterate over the ClusterShard results and validate the fields.
for _, shard := range res {
Expect(shard.Slots).NotTo(BeEmpty())
for _, slotRange := range shard.Slots {
Expect(slotRange.Start).To(BeNumerically(">=", 0))
Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
}
Expect(shard.Nodes).NotTo(BeEmpty())
for _, node := range shard.Nodes {
Expect(node.ID).NotTo(BeEmpty())
Expect(node.Endpoint).NotTo(BeEmpty())
Expect(node.IP).NotTo(BeEmpty())
Expect(node.Port).To(BeNumerically(">", 0))
validRoles := []string{"master", "slave", "replica"}
Expect(validRoles).To(ContainElement(node.Role))
Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))
validHealthStatuses := []string{"online", "failed", "loading"}
Expect(validHealthStatuses).To(ContainElement(node.Health))
}
}
})
It("should CLUSTER LINKS", func() {
res, err := client.ClusterLinks(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).NotTo(BeEmpty())
// Iterate over the ClusterLink results and validate the fields.
for _, link := range res {
Expect(link.Direction).NotTo(BeEmpty())
Expect([]string{"from", "to"}).To(ContainElement(link.Direction))
Expect(link.Node).NotTo(BeEmpty())
Expect(link.CreateTime).To(BeNumerically(">", 0))
Expect(link.Events).NotTo(BeEmpty())
validEventChars := []rune{'r', 'w'}
for _, eventChar := range link.Events {
Expect(validEventChars).To(ContainElement(eventChar))
}
Expect(link.SendBufferAllocated).To(BeNumerically(">=", 0))
Expect(link.SendBufferUsed).To(BeNumerically(">=", 0))
}
})
It("should cluster client setname", func() {
err := client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
return c.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.ClientList(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(ContainSubstring("name=cluster_hi"))
return nil
})
})
It("should CLUSTER PROTO 3", func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, c *redis.Client) error {
val, err := c.Do(ctx, "HELLO").Result()
Expect(err).NotTo(HaveOccurred())
Expect(val).Should(HaveKeyWithValue("proto", int64(3)))
return nil
})
})
It("should CLUSTER MYSHARDID", func() {
shardID, err := client.ClusterMyShardID(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(shardID).ToNot(BeEmpty())
})
It("should CLUSTER NODES", func() {
res, err := client.ClusterNodes(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(res)).To(BeNumerically(">", 400))
})
It("should CLUSTER INFO", func() {
res, err := client.ClusterInfo(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
})
It("should CLUSTER KEYSLOT", func() {
hashSlot, err := client.ClusterKeySlot(ctx, "somekey").Result()
Expect(err).NotTo(HaveOccurred())
Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
})
It("should CLUSTER GETKEYSINSLOT", func() {
keys, err := client.ClusterGetKeysInSlot(ctx, hashtag.Slot("somekey"), 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(keys)).To(Equal(0))
})
It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
n, err := client.ClusterCountFailureReports(ctx, cluster.nodeIDs[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})
It("should CLUSTER COUNTKEYSINSLOT", func() {
n, err := client.ClusterCountKeysInSlot(ctx, 10).Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(0)))
})
It("should CLUSTER SAVECONFIG", func() {
res, err := client.ClusterSaveConfig(ctx).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(Equal("OK"))
})
It("should CLUSTER SLAVES", func() {
nodesList, err := client.ClusterSlaves(ctx, cluster.nodeIDs[0]).Result()
Expect(err).NotTo(HaveOccurred())
Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
Expect(nodesList).Should(HaveLen(1))
})
It("should RANDOMKEY", func() {
const nkeys = 100
for i := 0; i < nkeys; i++ {
err := client.Set(ctx, fmt.Sprintf("key%d", i), "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
var keys []string
addKey := func(key string) {
for _, k := range keys {
if k == key {
return
}
}
keys = append(keys, key)
}
for i := 0; i < nkeys*10; i++ {
key := client.RandomKey(ctx).Val()
addKey(key)
}
Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
})
It("should support Process hook", func() {
testCtx, cancel := context.WithCancel(ctx)
defer cancel()
err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
return node.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
var mu sync.Mutex
var stack []string
clusterHook := &hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
select {
case <-testCtx.Done():
return hook(ctx, cmd)
default:
}
Expect(cmd.String()).To(Equal("ping: "))
mu.Lock()
stack = append(stack, "cluster.BeforeProcess")
mu.Unlock()
err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG"))
mu.Lock()
stack = append(stack, "cluster.AfterProcess")
mu.Unlock()
return err
}
},
}
client.AddHook(clusterHook)
nodeHook := &hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
select {
case <-testCtx.Done():
return hook(ctx, cmd)
default:
}
Expect(cmd.String()).To(Equal("ping: "))
mu.Lock()
stack = append(stack, "shard.BeforeProcess")
mu.Unlock()
err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG"))
mu.Lock()
stack = append(stack, "shard.AfterProcess")
mu.Unlock()
return err
}
},
}
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
node.AddHook(nodeHook)
return nil
})
err = client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
mu.Lock()
finalStack := make([]string, len(stack))
copy(finalStack, stack)
mu.Unlock()
Expect(finalStack).To(ContainElements([]string{
"cluster.BeforeProcess",
"shard.BeforeProcess",
"shard.AfterProcess",
"cluster.AfterProcess",
}))
})
It("should support Pipeline hook", func() {
err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
return node.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
var mu sync.Mutex
var stack []string
client.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
cmdStr := cmds[0].String()
// Handle SET command (should succeed)
if cmdStr == "set pipeline_test_key pipeline_test_value: " {
mu.Lock()
stack = append(stack, "cluster.BeforeProcessPipeline")
mu.Unlock()
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("set pipeline_test_key pipeline_test_value: OK"))
mu.Lock()
stack = append(stack, "cluster.AfterProcessPipeline")
mu.Unlock()
return err
}
// For other commands (like ping), just pass through without expectations
// since they might fail before reaching this point
return hook(ctx, cmds)
}
},
})
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
node.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1))
cmdStr := cmds[0].String()
// Handle SET command (should succeed)
if cmdStr == "set pipeline_test_key pipeline_test_value: " {
mu.Lock()
stack = append(stack, "shard.BeforeProcessPipeline")
mu.Unlock()
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("set pipeline_test_key pipeline_test_value: OK"))
mu.Lock()
stack = append(stack, "shard.AfterProcessPipeline")
mu.Unlock()
return err
}
// For other commands (like ping), just pass through without expectations
return hook(ctx, cmds)
}
},
})
return nil
})
_, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Set(ctx, "pipeline_test_key", "pipeline_test_value", 0)
return nil
})
Expect(err).NotTo(HaveOccurred())
mu.Lock()
finalStack := make([]string, len(stack))
copy(finalStack, stack)
mu.Unlock()
Expect(finalStack).To(Equal([]string{
"cluster.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"cluster.AfterProcessPipeline",
}))
})
It("should reject ping command in pipeline", func() {
// Test that ping command fails in pipeline as expected
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("redis: cannot pipeline command \"ping\" with request policy ReqAllNodes/ReqAllShards/ReqMultiShard"))
})
It("should support TxPipeline hook", func() {
err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
return node.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
var mu sync.Mutex
var stack []string
client.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: "))
mu.Lock()
stack = append(stack, "cluster.BeforeProcessPipeline")
mu.Unlock()
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG"))
mu.Lock()
stack = append(stack, "cluster.AfterProcessPipeline")
mu.Unlock()
return err
}
},
})
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
node.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: "))
mu.Lock()
stack = append(stack, "shard.BeforeProcessPipeline")
mu.Unlock()
err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG"))
mu.Lock()
stack = append(stack, "shard.AfterProcessPipeline")
mu.Unlock()
return err
}
},
})
return nil
})
_, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).NotTo(HaveOccurred())
mu.Lock()
finalStack := make([]string, len(stack))
copy(finalStack, stack)
mu.Unlock()
Expect(finalStack).To(Equal([]string{
"cluster.BeforeProcessPipeline",
"shard.BeforeProcessPipeline",
"shard.AfterProcessPipeline",
"cluster.AfterProcessPipeline",
}))
})
It("should return correct replica for key", func() {
client, err := client.SlaveForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred())
info := client.Info(ctx, "server")
Expect(info.Val()).Should(ContainSubstring("tcp_port:16604"))
})
It("should return correct master for key", func() {
client, err := client.MasterForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred())
info := client.Info(ctx, "server")
Expect(info.Val()).Should(ContainSubstring("tcp_port:16601"))
})
assertClusterClient()
})
Describe("ClusterClient with RouteByLatency", func() {
BeforeEach(func() {
opt = redisClusterOptions()
opt.RouteByLatency = true
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
err := client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
return slave.ReadWrite(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
Describe("ClusterClient with ClusterSlots", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 5460,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard1Port,
}},
}, {
Start: 5461,
End: 10922,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard2Port,
}},
}, {
Start: 10923,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard3Port,
}},
}}
return slots, nil
}
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.RouteRandomly = true
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 5460,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard1Port,
}},
}, {
Start: 5461,
End: 10922,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard2Port,
}},
}, {
Start: 10923,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":" + ringShard3Port,
}},
}}
return slots, nil
}
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
Describe("ClusterClient with ClusterSlots with multiple nodes per slot", func() {
BeforeEach(func() {
failover = true
opt = redisClusterOptions()
opt.ReadOnly = true
opt.ClusterSlots = func(ctx context.Context) ([]redis.ClusterSlot, error) {
slots := []redis.ClusterSlot{{
Start: 0,
End: 5460,
Nodes: []redis.ClusterNode{{
Addr: ":16600",
}, {
Addr: ":16603",
}},
}, {
Start: 5461,
End: 10922,
Nodes: []redis.ClusterNode{{
Addr: ":16601",
}, {
Addr: ":16604",
}},
}, {
Start: 10923,
End: 16383,
Nodes: []redis.ClusterNode{{
Addr: ":16602",
}, {
Addr: ":16605",
}},
}}
return slots, nil
}
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.FlushDB(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
err = client.ForEachSlave(ctx, func(ctx context.Context, slave *redis.Client) error {
Eventually(func() int64 {
return client.DBSize(ctx).Val()
}, 30*time.Second).Should(Equal(int64(0)))
return nil
})
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
failover = false
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
assertClusterClient()
})
})
var _ = Describe("ClusterClient without nodes", func() {
var client *redis.ClusterClient
BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{})
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should return an error for Ping", func() {
err := client.Ping(ctx).Err()
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
It("should return an error for pipeline", func() {
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).To(MatchError("redis: cluster has no nodes"))
})
})
var _ = Describe("ClusterClient without valid nodes", func() {
var client *redis.ClusterClient
BeforeEach(func() {
client = redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{redisAddr},
})
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should return an error when cluster support is disabled", func() {
err := client.Ping(ctx).Err()
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
It("should return an error for pipeline when cluster support is disabled", func() {
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
})
})
var _ = Describe("ClusterClient with unavailable Cluster", func() {
var client *redis.ClusterClient
BeforeEach(func() {
opt := redisClusterOptions()
opt.ReadTimeout = 250 * time.Millisecond
opt.WriteTimeout = 250 * time.Millisecond
opt.MaxRedirects = 1
client = cluster.newClusterClientUnstable(opt)
Expect(client.Ping(ctx).Err()).NotTo(HaveOccurred())
for _, node := range cluster.clients {
err := node.ClientPause(ctx, 5*time.Second).Err()
Expect(err).NotTo(HaveOccurred())
}
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should recover when Cluster recovers", func() {
err := client.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Eventually(func() error {
return client.Ping(ctx).Err()
}, "30s").ShouldNot(HaveOccurred())
})
})
var _ = Describe("ClusterClient timeout", func() {
var client *redis.ClusterClient
AfterEach(func() {
_ = client.Close()
})
testTimeout := func() {
It("should timeout Ping", func() {
err := client.Ping(ctx).Err()
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
It("should timeout Pipeline", func() {
_, err := client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
It("should timeout Tx", func() {
err := client.Watch(ctx, func(tx *redis.Tx) error {
return tx.Ping(ctx).Err()
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
It("should timeout Tx Pipeline", func() {
err := client.Watch(ctx, func(tx *redis.Tx) error {
_, err := tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
pipe.Ping(ctx)
return nil
})
return err
}, "foo")
Expect(err).To(HaveOccurred())
Expect(err.(net.Error).Timeout()).To(BeTrue())
})
}
const pause = 5 * time.Second
Context("read/write timeout", func() {
BeforeEach(func() {
opt := redisClusterOptions()
client = cluster.newClusterClient(ctx, opt)
err := client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
err := client.ClientPause(ctx, pause).Err()
opt := client.Options()
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = time.Nanosecond
return err
})
Expect(err).NotTo(HaveOccurred())
// Overwrite timeouts after the client is initialized.
opt.ReadTimeout = time.Nanosecond
opt.WriteTimeout = time.Nanosecond
opt.MaxRedirects = 0
})
AfterEach(func() {
_ = client.ForEachShard(ctx, func(ctx context.Context, client *redis.Client) error {
defer GinkgoRecover()
opt := client.Options()
opt.ReadTimeout = time.Second
opt.WriteTimeout = time.Second
Eventually(func() error {
return client.Ping(ctx).Err()
}, 2*pause).ShouldNot(HaveOccurred())
return nil
})
err := client.Close()
Expect(err).NotTo(HaveOccurred())
})
testTimeout()
})
})
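// Command tips: from Redis 8 onward the COMMAND reply includes request_policy
// and response_policy hints (e.g. multi_shard/agg_sum for TOUCH,
// all_shards/all_succeeded for FLUSHALL). The tests below check that the
// cluster client exposes these policies via CommandPolicy and honours them
// when routing commands and aggregating per-shard replies.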
var _ = Describe("Command Tips tests", func() {
var client *redis.ClusterClient
BeforeEach(func() {
opt := redisClusterOptions()
client = cluster.newClusterClient(ctx, opt)
})
AfterEach(func() {
Expect(client.Close()).NotTo(HaveOccurred())
})
It("should verify COMMAND tips match router policy types", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
expectedPolicies := map[string]struct {
RequestPolicy string
ResponsePolicy string
}{
"touch": {
RequestPolicy: "multi_shard",
ResponsePolicy: "agg_sum",
},
"flushall": {
RequestPolicy: "all_shards",
ResponsePolicy: "all_succeeded",
},
}
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
for cmdName, expected := range expectedPolicies {
actualCmd := cmds[cmdName]
Expect(actualCmd.CommandPolicy).NotTo(BeNil())
// Verify request_policy from COMMAND matches router policy
actualRequestPolicy := actualCmd.CommandPolicy.Request.String()
Expect(actualRequestPolicy).To(Equal(expected.RequestPolicy))
// Verify response_policy from COMMAND matches router policy
actualResponsePolicy := actualCmd.CommandPolicy.Response.String()
Expect(actualResponsePolicy).To(Equal(expected.ResponsePolicy))
}
})
Describe("Explicit Routing Policy Tests", func() {
It("should test explicit routing policy for TOUCH", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Verify TOUCH command has multi_shard policy
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
touchCmd := cmds["touch"]
Expect(touchCmd.CommandPolicy).NotTo(BeNil())
Expect(touchCmd.CommandPolicy.Request.String()).To(Equal("multi_shard"))
Expect(touchCmd.CommandPolicy.Response.String()).To(Equal("agg_sum"))
keys := []string{"key1", "key2", "key3", "key4", "key5"}
for _, key := range keys {
err := client.Set(ctx, key, "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
result := client.Touch(ctx, keys...)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal(int64(len(keys))))
})
It("should test explicit routing policy for FLUSHALL", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Verify FLUSHALL command has all_shards policy
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
flushallCmd := cmds["flushall"]
Expect(flushallCmd.CommandPolicy).NotTo(BeNil())
Expect(flushallCmd.CommandPolicy.Request.String()).To(Equal("all_shards"))
Expect(flushallCmd.CommandPolicy.Response.String()).To(Equal("all_succeeded"))
testKeys := []string{"test1", "test2", "test3"}
for _, key := range testKeys {
err := client.Set(ctx, key, "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
err = client.FlushAll(ctx).Err()
Expect(err).NotTo(HaveOccurred())
for _, key := range testKeys {
exists := client.Exists(ctx, key)
Expect(exists.Val()).To(Equal(int64(0)))
}
})
It("should test explicit routing policy for PING", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Verify PING command has all_shards policy
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
pingCmd := cmds["ping"]
Expect(pingCmd.CommandPolicy).NotTo(BeNil())
Expect(pingCmd.CommandPolicy.Request.String()).To(Equal("all_shards"))
Expect(pingCmd.CommandPolicy.Response.String()).To(Equal("all_succeeded"))
result := client.Ping(ctx)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("PONG"))
})
It("should test explicit routing policy for DBSIZE", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Verify DBSIZE command has all_shards policy with agg_sum response
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
dbsizeCmd := cmds["dbsize"]
Expect(dbsizeCmd.CommandPolicy).NotTo(BeNil())
Expect(dbsizeCmd.CommandPolicy.Request.String()).To(Equal("all_shards"))
Expect(dbsizeCmd.CommandPolicy.Response.String()).To(Equal("agg_sum"))
testKeys := []string{"dbsize_test1", "dbsize_test2", "dbsize_test3"}
for _, key := range testKeys {
err := client.Set(ctx, key, "value", 0).Err()
Expect(err).NotTo(HaveOccurred())
}
size := client.DBSize(ctx)
Expect(size.Err()).NotTo(HaveOccurred())
Expect(size.Val()).To(BeNumerically(">=", int64(len(testKeys))))
})
})
Describe("DDL Commands Routing Policy Tests", func() {
BeforeEach(func() {
info := client.Info(ctx, "modules")
if info.Err() != nil || !strings.Contains(info.Val(), "search") {
Skip("Search module not available")
}
})
It("should test DDL commands routing policy for FT.CREATE", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Verify FT.CREATE command routing policy
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
ftCreateCmd, exists := cmds["ft.create"]
if !exists || ftCreateCmd.CommandPolicy == nil {
Skip("FT.CREATE command or tips not available")
}
// DDL commands should NOT be broadcast - they should go to the coordinator only
Expect(ftCreateCmd.CommandPolicy).NotTo(BeNil())
requestPolicy := ftCreateCmd.CommandPolicy.Request.String()
Expect(requestPolicy).NotTo(Equal("all_shards"))
Expect(requestPolicy).NotTo(Equal("all_nodes"))
indexName := "test_index_create"
client.FTDropIndex(ctx, indexName)
result := client.FTCreate(ctx, indexName,
&redis.FTCreateOptions{
OnHash: true,
Prefix: []interface{}{"doc:"},
},
&redis.FieldSchema{
FieldName: "title",
FieldType: redis.SearchFieldTypeText,
})
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("OK"))
infoResult := client.FTInfo(ctx, indexName)
Expect(infoResult.Err()).NotTo(HaveOccurred())
Expect(infoResult.Val().IndexName).To(Equal(indexName))
client.FTDropIndex(ctx, indexName)
})
It("should test DDL commands routing policy for FT.ALTER", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Verify FT.ALTER command routing policy
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
ftAlterCmd, exists := cmds["ft.alter"]
if !exists || ftAlterCmd.CommandPolicy == nil {
Skip("FT.ALTER command or tips not available")
}
Expect(ftAlterCmd.CommandPolicy).NotTo(BeNil())
requestPolicy := ftAlterCmd.CommandPolicy.Request.String()
Expect(requestPolicy).NotTo(Equal("all_shards"))
Expect(requestPolicy).NotTo(Equal("all_nodes"))
indexName := "test_index_alter"
client.FTDropIndex(ctx, indexName)
result := client.FTCreate(ctx, indexName,
&redis.FTCreateOptions{
OnHash: true,
Prefix: []interface{}{"doc:"},
},
&redis.FieldSchema{
FieldName: "title",
FieldType: redis.SearchFieldTypeText,
})
Expect(result.Err()).NotTo(HaveOccurred())
alterResult := client.FTAlter(ctx, indexName, false,
[]interface{}{"description", redis.SearchFieldTypeText.String()})
Expect(alterResult.Err()).NotTo(HaveOccurred())
Expect(alterResult.Val()).To(Equal("OK"))
client.FTDropIndex(ctx, indexName)
})
It("should route keyed commands to correct shard based on hash slot", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
type masterNode struct {
client *redis.Client
addr string
}
var masterNodes []masterNode
var mu sync.Mutex
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
addr := master.Options().Addr
mu.Lock()
masterNodes = append(masterNodes, masterNode{
client: master,
addr: addr,
})
mu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(len(masterNodes)).To(BeNumerically(">", 1))
// Single keyed command should go to exactly one shard - determined by hash slot
testKey := "test_key_12345"
testValue := "test_value"
result := client.Set(ctx, testKey, testValue, 0)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("OK"))
time.Sleep(200 * time.Millisecond)
var targetNodeAddr string
foundNodes := 0
for _, node := range masterNodes {
getResult := node.client.Get(ctx, testKey)
if getResult.Err() == nil && getResult.Val() == testValue {
foundNodes++
targetNodeAddr = node.addr
}
}
Expect(foundNodes).To(Equal(1))
Expect(targetNodeAddr).NotTo(BeEmpty())
// Multiple commands with same key should go to same shard
finalValue := ""
for i := 0; i < 5; i++ {
finalValue = fmt.Sprintf("value_%d", i)
result := client.Set(ctx, testKey, finalValue, 0)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("OK"))
}
time.Sleep(200 * time.Millisecond)
var currentTargetNode string
foundNodesAfterUpdate := 0
for _, node := range masterNodes {
getResult := node.client.Get(ctx, testKey)
if getResult.Err() == nil && getResult.Val() == finalValue {
foundNodesAfterUpdate++
currentTargetNode = node.addr
}
}
// All commands with same key should go to same shard
Expect(foundNodesAfterUpdate).To(Equal(1))
Expect(currentTargetNode).To(Equal(targetNodeAddr))
})
It("should aggregate responses according to explicit aggregation policies", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
type masterNode struct {
client *redis.Client
addr string
}
var masterNodes []masterNode
var mu sync.Mutex
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
addr := master.Options().Addr
mu.Lock()
masterNodes = append(masterNodes, masterNode{
client: master,
addr: addr,
})
mu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(len(masterNodes)).To(BeNumerically(">", 1))
// verify TOUCH command has agg_sum policy
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
touchCmd, exists := cmds["touch"]
if !exists || touchCmd.CommandPolicy == nil {
Skip("TOUCH command or tips not available")
}
Expect(touchCmd.CommandPolicy.Response.String()).To(Equal("agg_sum"))
testKeys := []string{
"touch_test_key_1111", // These keys should map to different hash slots
"touch_test_key_2222",
"touch_test_key_3333",
"touch_test_key_4444",
"touch_test_key_5555",
}
// Set keys on different shards
keysPerShard := make(map[string][]string)
for _, key := range testKeys {
result := client.Set(ctx, key, "test_value", 0)
Expect(result.Err()).NotTo(HaveOccurred())
// Find which shard contains this key
for _, node := range masterNodes {
getResult := node.client.Get(ctx, key)
if getResult.Err() == nil {
keysPerShard[node.addr] = append(keysPerShard[node.addr], key)
break
}
}
}
// Verify keys are distributed across multiple shards
shardsWithKeys := len(keysPerShard)
Expect(shardsWithKeys).To(BeNumerically(">", 1))
// Execute TOUCH command on all keys - this should aggregate results using agg_sum
touchResult := client.Touch(ctx, testKeys...)
Expect(touchResult.Err()).NotTo(HaveOccurred())
totalTouched := touchResult.Val()
Expect(totalTouched).To(Equal(int64(len(testKeys))))
totalKeysOnShards := 0
for _, keys := range keysPerShard {
totalKeysOnShards += len(keys)
}
Expect(totalKeysOnShards).To(Equal(len(testKeys)))
// FLUSHALL command with all_succeeded aggregation policy
flushallCmd, exists := cmds["flushall"]
if !exists || flushallCmd.CommandPolicy == nil {
Skip("FLUSHALL command or tips not available")
}
Expect(flushallCmd.CommandPolicy.Response.String()).To(Equal("all_succeeded"))
for i := 0; i < len(masterNodes); i++ {
testKey := fmt.Sprintf("flush_test_key_%d_%d", i, time.Now().UnixNano())
result := client.Set(ctx, testKey, "test_data", 0)
Expect(result.Err()).NotTo(HaveOccurred())
}
flushResult := client.FlushAll(ctx)
Expect(flushResult.Err()).NotTo(HaveOccurred())
Expect(flushResult.Val()).To(Equal("OK"))
for _, node := range masterNodes {
dbSizeResult := node.client.DBSize(ctx)
Expect(dbSizeResult.Err()).NotTo(HaveOccurred())
Expect(dbSizeResult.Val()).To(Equal(int64(0)))
}
// WAIT command aggregation policy - verify agg_min policy
waitCmd, exists := cmds["wait"]
if !exists || waitCmd.CommandPolicy == nil {
Skip("WAIT command or tips not available")
}
Expect(waitCmd.CommandPolicy.Response.String()).To(Equal("agg_min"))
// Set up some data to replicate
testKey := "wait_test_key_1111"
result := client.Set(ctx, testKey, "test_value", 0)
Expect(result.Err()).NotTo(HaveOccurred())
// Execute WAIT command - should aggregate using agg_min across all shards
// WAIT waits for a given number of replicas to acknowledge writes
// With agg_min policy, it returns the minimum number of replicas that acknowledged
waitResult := client.Wait(ctx, 0, 1000) // Wait for 0 replicas with 1 second timeout
Expect(waitResult.Err()).NotTo(HaveOccurred())
// The result should be the minimum number of replicas across all shards
// Since we're asking for 0 replicas, all shards should return 0, so min is 0
minReplicas := waitResult.Val()
Expect(minReplicas).To(BeNumerically(">=", 0))
// SCRIPT EXISTS command aggregation policy - verify agg_logical_and policy
scriptExistsCmd, exists := cmds["script exists"]
if !exists || scriptExistsCmd.CommandPolicy == nil {
Skip("SCRIPT EXISTS command or tips not available")
}
Expect(scriptExistsCmd.CommandPolicy.Response.String()).To(Equal("agg_logical_and"))
// Load a script on all shards
testScript := "return 'hello'"
scriptLoadResult := client.ScriptLoad(ctx, testScript)
Expect(scriptLoadResult.Err()).NotTo(HaveOccurred())
scriptSHA := scriptLoadResult.Val()
// Verify script exists on all shards using SCRIPT EXISTS
// With agg_logical_and policy, it should return true only if script exists on ALL shards
scriptExistsResult := client.ScriptExists(ctx, scriptSHA)
Expect(scriptExistsResult.Err()).NotTo(HaveOccurred())
existsResults := scriptExistsResult.Val()
Expect(len(existsResults)).To(Equal(1))
Expect(existsResults[0]).To(BeTrue()) // Script should exist on all shards
// Test with a non-existent script SHA
nonExistentSHA := "0000000000000000000000000000000000000000"
scriptExistsResult2 := client.ScriptExists(ctx, nonExistentSHA)
Expect(scriptExistsResult2.Err()).NotTo(HaveOccurred())
existsResults2 := scriptExistsResult2.Val()
Expect(len(existsResults2)).To(Equal(1))
Expect(existsResults2[0]).To(BeFalse()) // Script should not exist on any shard
// Test with mixed scenario - flush scripts from one shard manually
// This is harder to test in practice since SCRIPT FLUSH affects all shards
// So we'll just verify the basic functionality works
})
It("should verify command aggregation policies", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
commandPolicies := map[string]string{
"touch": "agg_sum",
"flushall": "all_succeeded",
"pfcount": "default(hashslot)",
"exists": "agg_sum",
"script exists": "agg_logical_and",
"wait": "agg_min",
}
for cmdName, expectedPolicy := range commandPolicies {
cmd, exists := cmds[cmdName]
if !exists {
continue
}
if cmd.CommandPolicy == nil {
continue
}
actualPolicy := cmd.CommandPolicy.Response.String()
Expect(actualPolicy).To(Equal(expectedPolicy))
}
})
It("should properly aggregate responses from keyless commands executed on multiple shards", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
type masterNode struct {
client *redis.Client
addr string
}
var masterNodes []masterNode
var mu sync.Mutex
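// Collect every master up front so the aggregated results below can be compared with direct per-node calls.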
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
addr := master.Options().Addr
mu.Lock()
masterNodes = append(masterNodes, masterNode{
client: master,
addr: addr,
})
mu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(len(masterNodes)).To(BeNumerically(">", 1))
// PING command with all_shards policy - should aggregate responses
cmds, err := client.Command(ctx).Result()
Expect(err).NotTo(HaveOccurred())
pingCmd, exists := cmds["ping"]
if exists && pingCmd.CommandPolicy != nil {
// When command tips are reported for PING they should include a response policy.
Expect(pingCmd.CommandPolicy.Response.String()).NotTo(BeEmpty())
}
pingResult := client.Ping(ctx)
Expect(pingResult.Err()).NotTo(HaveOccurred())
Expect(pingResult.Val()).To(Equal("PONG"))
// Sanity-check that every master is individually reachable and answers PING
for _, node := range masterNodes {
nodePingResult := node.client.Ping(ctx)
Expect(nodePingResult.Err()).NotTo(HaveOccurred())
Expect(nodePingResult.Val()).To(Equal("PONG"))
}
// Test 2: DBSIZE command aggregation across shards - verify agg_sum policy
testKeys := []string{
"dbsize_test_key_1111",
"dbsize_test_key_2222",
"dbsize_test_key_3333",
"dbsize_test_key_4444",
}
for _, key := range testKeys {
result := client.Set(ctx, key, "test_value", 0)
Expect(result.Err()).NotTo(HaveOccurred())
}
dbSizeResult := client.DBSize(ctx)
Expect(dbSizeResult.Err()).NotTo(HaveOccurred())
totalSize := dbSizeResult.Val()
Expect(totalSize).To(BeNumerically(">=", int64(len(testKeys))))
// Verify aggregation by manually getting sizes from each shard
totalManualSize := int64(0)
for _, node := range masterNodes {
nodeDbSizeResult := node.client.DBSize(ctx)
Expect(nodeDbSizeResult.Err()).NotTo(HaveOccurred())
nodeSize := nodeDbSizeResult.Val()
totalManualSize += nodeSize
}
// Verify aggregation worked correctly
Expect(totalSize).To(Equal(totalManualSize))
})
It("should properly aggregate responses from keyed commands executed on multiple shards", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
type masterNode struct {
client *redis.Client
addr string
}
var masterNodes []masterNode
var mu sync.Mutex
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
addr := master.Options().Addr
mu.Lock()
masterNodes = append(masterNodes, masterNode{
client: master,
addr: addr,
})
mu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(len(masterNodes)).To(BeNumerically(">", 1))
// MGET command with keys on different shards
testData := map[string]string{
"mget_test_key_1111": "value1",
"mget_test_key_2222": "value2",
"mget_test_key_3333": "value3",
"mget_test_key_4444": "value4",
"mget_test_key_5555": "value5",
}
keyLocations := make(map[string]string)
for key, value := range testData {
result := client.Set(ctx, key, value, 0)
Expect(result.Err()).NotTo(HaveOccurred())
for _, node := range masterNodes {
getResult := node.client.Get(ctx, key)
if getResult.Err() == nil && getResult.Val() == value {
keyLocations[key] = node.addr
break
}
}
}
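// The MGET below must span more than one shard; otherwise the multi-slot routing path is not exercised.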
shardsUsed := make(map[string]bool)
for _, shardAddr := range keyLocations {
shardsUsed[shardAddr] = true
}
Expect(len(shardsUsed)).To(BeNumerically(">", 1))
keys := make([]string, 0, len(testData))
expectedValues := make([]interface{}, 0, len(testData))
for key, value := range testData {
keys = append(keys, key)
expectedValues = append(expectedValues, value)
}
mgetResult := client.MGet(ctx, keys...)
Expect(mgetResult.Err()).NotTo(HaveOccurred())
actualValues := mgetResult.Val()
Expect(len(actualValues)).To(Equal(len(expectedValues)))
for i, value := range actualValues {
// Every key was written just above, so a nil entry would indicate a lost read.
Expect(value).To(Equal(expectedValues[i]))
}
// EXISTS command aggregation across multiple keys
existsTestData := map[string]string{
"exists_agg_key_1111": "value1",
"exists_agg_key_2222": "value2",
"exists_agg_key_3333": "value3",
}
existsKeys := make([]string, 0, len(existsTestData))
for key, value := range existsTestData {
result := client.Set(ctx, key, value, 0)
Expect(result.Err()).NotTo(HaveOccurred())
existsKeys = append(existsKeys, key)
}
// Add a non-existent key to the list
nonExistentKey := "non_existent_key_9999"
existsKeys = append(existsKeys, nonExistentKey)
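// EXISTS is aggregated with agg_sum; the missing key contributes nothing, so the count equals the number of keys that were set.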
existsResult := client.Exists(ctx, existsKeys...)
Expect(existsResult.Err()).NotTo(HaveOccurred())
existsCount := existsResult.Val()
Expect(existsCount).To(Equal(int64(len(existsTestData))))
})
It("should propagate coordinator errors to client without modification", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
type masterNode struct {
client *redis.Client
addr string
}
var masterNodes []masterNode
var mu sync.Mutex
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
addr := master.Options().Addr
mu.Lock()
masterNodes = append(masterNodes, masterNode{
client: master,
addr: addr,
})
mu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(len(masterNodes)).To(BeNumerically(">", 0))
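// CLUSTER ADDSLOTSRANGE with an out-of-range slot should be rejected; whatever error the coordinator returns must reach the caller unchanged.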
invalidSlotResult := client.ClusterAddSlotsRange(ctx, 99999, 100000)
coordinatorErr := invalidSlotResult.Err()
if coordinatorErr != nil {
// Verify the error is a Redis error
var redisErr redis.Error
Expect(errors.As(coordinatorErr, &redisErr)).To(BeTrue())
// Verify error message is preserved exactly as returned by coordinator
errorMsg := coordinatorErr.Error()
Expect(errorMsg).To(SatisfyAny(
ContainSubstring("slot"),
ContainSubstring("ERR"),
ContainSubstring("Invalid"),
))
// Test that the same error occurs when calling coordinator directly
coordinatorNode := masterNodes[0]
directResult := coordinatorNode.client.ClusterAddSlotsRange(ctx, 99999, 100000)
directErr := directResult.Err()
if directErr != nil {
Expect(coordinatorErr.Error()).To(Equal(directErr.Error()))
}
}
// Try cluster forget with invalid node ID
invalidNodeID := "invalid_node_id_12345"
forgetResult := client.ClusterForget(ctx, invalidNodeID)
forgetErr := forgetResult.Err()
if forgetErr != nil {
var redisErr redis.Error
Expect(errors.As(forgetErr, &redisErr)).To(BeTrue())
errorMsg := forgetErr.Error()
Expect(errorMsg).To(SatisfyAny(
ContainSubstring("Unknown node"),
ContainSubstring("Invalid node"),
ContainSubstring("ERR"),
))
coordinatorNode := masterNodes[0]
directForgetResult := coordinatorNode.client.ClusterForget(ctx, invalidNodeID)
directForgetErr := directForgetResult.Err()
if directForgetErr != nil {
Expect(forgetErr.Error()).To(Equal(directForgetErr.Error()))
}
}
// Test error type preservation and format
keySlotResult := client.ClusterKeySlot(ctx, "")
keySlotErr := keySlotResult.Err()
if keySlotErr != nil {
var redisErr redis.Error
Expect(errors.As(keySlotErr, &redisErr)).To(BeTrue())
errorMsg := keySlotErr.Error()
Expect(len(errorMsg)).To(BeNumerically(">", 0))
Expect(errorMsg).NotTo(ContainSubstring("wrapped"))
Expect(errorMsg).NotTo(ContainSubstring("context"))
}
// Verify error propagation consistency
clusterInfoResult := client.ClusterInfo(ctx)
clusterInfoErr := clusterInfoResult.Err()
if clusterInfoErr != nil {
var redisErr redis.Error
Expect(errors.As(clusterInfoErr, &redisErr)).To(BeTrue())
coordinatorNode := masterNodes[0]
directInfoResult := coordinatorNode.client.ClusterInfo(ctx)
directInfoErr := directInfoResult.Err()
if directInfoErr != nil {
Expect(clusterInfoErr.Error()).To(Equal(directInfoErr.Error()))
}
}
// Verify no error modification in router
invalidReplicateResult := client.ClusterReplicate(ctx, "00000000000000000000000000000000invalid00")
invalidReplicateErr := invalidReplicateResult.Err()
if invalidReplicateErr != nil {
var redisErr redis.Error
Expect(errors.As(invalidReplicateErr, &redisErr)).To(BeTrue())
errorMsg := invalidReplicateErr.Error()
Expect(errorMsg).NotTo(ContainSubstring("router"))
Expect(errorMsg).NotTo(ContainSubstring("cluster client"))
Expect(errorMsg).NotTo(ContainSubstring("failed to execute"))
Expect(errorMsg).To(SatisfyAny(
HavePrefix("ERR"),
ContainSubstring("Invalid"),
ContainSubstring("Unknown"),
))
}
})
Describe("Routing Policies Comprehensive Test Suite", func() {
It("should test MGET command with multi-slot routing and key order preservation", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Set up test data across multiple shards
testData := map[string]string{
"mget_test_key_1111": "value1",
"mget_test_key_2222": "value2",
"mget_test_key_3333": "value3",
"mget_test_key_4444": "value4",
"mget_test_key_5555": "value5",
}
// Set all keys
for key, value := range testData {
Expect(client.Set(ctx, key, value, 0).Err()).NotTo(HaveOccurred())
}
// Verify keys are distributed across multiple shards
slotMap := make(map[int]bool)
for key := range testData {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
// Test MGET with specific key order
keys := []string{
"mget_test_key_3333",
"mget_test_key_1111",
"mget_test_key_5555",
"mget_test_key_2222",
"mget_test_key_4444",
}
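// Request the keys in shuffled order to prove the aggregator preserves argument order rather than shard order.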
result := client.MGet(ctx, keys...)
Expect(result.Err()).NotTo(HaveOccurred())
// Verify values are returned in the same order as keys
values := result.Val()
Expect(len(values)).To(Equal(len(keys)))
for i, key := range keys {
Expect(values[i]).To(Equal(testData[key]))
}
})
It("should test MGET with non-existent keys across multiple shards", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Set up some keys
Expect(client.Set(ctx, "mget_exists_1111", "value1", 0).Err()).NotTo(HaveOccurred())
Expect(client.Set(ctx, "mget_exists_3333", "value3", 0).Err()).NotTo(HaveOccurred())
// MGET with mix of existing and non-existing keys
keys := []string{
"mget_exists_1111",
"mget_nonexist_2222",
"mget_exists_3333",
"mget_nonexist_4444",
}
result := client.MGet(ctx, keys...)
Expect(result.Err()).ToNot(HaveOccurred())
// MGET returns nil for non-existent keys (not errors)
// Values should be in the same order as requested keys
values := result.Val()
Expect(len(values)).To(Equal(4))
Expect(values[0]).To(Equal("value1")) // existing key
Expect(values[1]).To(BeNil()) // non-existent key returns nil
Expect(values[2]).To(Equal("value3")) // existing key
Expect(values[3]).To(BeNil()) // non-existent key returns nil
})
It("should test TOUCH command with multi-slot routing", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Set up keys across multiple shards
keys := []string{
"touch_test_key_1111",
"touch_test_key_2222",
"touch_test_key_3333",
"touch_test_key_4444",
}
// Set all keys
for _, key := range keys {
Expect(client.Set(ctx, key, "value", 0).Err()).NotTo(HaveOccurred())
}
// Verify keys are on different shards
slotMap := make(map[int]bool)
for _, key := range keys {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
// TOUCH should work across multiple shards
result := client.Touch(ctx, keys...)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal(int64(len(keys))))
})
It("should test DEL command with multi-slot routing", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Set up keys across multiple shards
keys := []string{
"del_test_key_1111",
"del_test_key_2222",
"del_test_key_3333",
}
// Set all keys
for _, key := range keys {
Expect(client.Set(ctx, key, "value", 0).Err()).NotTo(HaveOccurred())
}
// Verify keys are on different shards
slotMap := make(map[int]bool)
for _, key := range keys {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
// DEL should work across multiple shards
result := client.Del(ctx, keys...)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal(int64(len(keys))))
// Verify all keys were deleted
for _, key := range keys {
val := client.Get(ctx, key)
Expect(val.Err()).To(Equal(redis.Nil))
}
})
It("should test DBSIZE command with agg_sum aggregation across all shards", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Set keys across multiple shards
keys := []string{
"dbsize_test_1111",
"dbsize_test_2222",
"dbsize_test_3333",
"dbsize_test_4444",
"dbsize_test_5555",
}
// Clean up first
client.Del(ctx, keys...)
// Set all keys
for _, key := range keys {
Expect(client.Set(ctx, key, "value", 0).Err()).NotTo(HaveOccurred())
}
// DBSIZE should aggregate results from all shards
result := client.DBSize(ctx)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(BeNumerically(">=", int64(len(keys))))
})
It("should test PING command with all_shards routing", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// PING should be sent to all shards and return one successful response
result := client.Ping(ctx)
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("PONG"))
})
It("should test MGET with single shard optimization", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Use hash tags to ensure all keys are on the same shard
keys := []string{
"{sameslot}key1",
"{sameslot}key2",
"{sameslot}key3",
}
// Verify all keys hash to the same slot
slot := hashtag.Slot(keys[0])
for _, key := range keys {
Expect(hashtag.Slot(key)).To(Equal(slot))
}
// Set all keys
for i, key := range keys {
Expect(client.Set(ctx, key, fmt.Sprintf("value%d", i+1), 0).Err()).NotTo(HaveOccurred())
}
// MGET should work even with single shard
result := client.MGet(ctx, keys...)
Expect(result.Err()).NotTo(HaveOccurred())
values := result.Val()
Expect(len(values)).To(Equal(3))
Expect(values[0]).To(Equal("value1"))
Expect(values[1]).To(Equal("value2"))
Expect(values[2]).To(Equal("value3"))
})
It("should test empty MGET command returns error", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// MGET with no keys should return an error
result := client.MGet(ctx)
Expect(result.Err()).To(HaveOccurred())
Expect(result.Err().Error()).To(ContainSubstring("multi-shard command mget has no key arguments"))
})
It("should test MGET integration with MSET across multiple shards", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Create test data
testData := map[string]string{
"integration_key_1111": "alpha",
"integration_key_2222": "beta",
"integration_key_3333": "gamma",
"integration_key_4444": "delta",
"integration_key_5555": "epsilon",
}
// Verify keys are on different shards
slotMap := make(map[int]bool)
for key := range testData {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
// Build the key list and populate all keys with a single MSET that spans multiple shards
keys := make([]string, 0, len(testData))
for key := range testData {
keys = append(keys, key)
}
msetResult := client.MSet(ctx, testData)
Expect(msetResult.Err()).NotTo(HaveOccurred())
Expect(msetResult.Val()).To(Equal("OK"))
// Execute MGET
mgetResult := client.MGet(ctx, keys...)
Expect(mgetResult.Err()).NotTo(HaveOccurred())
// Verify all values match
values := mgetResult.Val()
Expect(len(values)).To(Equal(len(keys)))
for i, key := range keys {
Expect(values[i]).To(Equal(testData[key]))
}
})
It("should test multi-shard commands cannot be used in pipeline", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Create keys across multiple shards
keys := []string{
"pipeline_test_1111",
"pipeline_test_2222",
"pipeline_test_3333",
}
// Verify keys are on different shards
slotMap := make(map[int]bool)
for _, key := range keys {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
// Try to use MGET in pipeline - should fail
pipe := client.Pipeline()
pipe.MGet(ctx, keys...)
_, err := pipe.Exec(ctx)
Expect(err).To(HaveOccurred())
Expect(err.Error()).To(ContainSubstring("cannot pipeline command"))
})
It("should test DisableRoutingPolicies option disables routing policies", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Test 1: With routing policies enabled (default), MGET should work across slots
testData := map[string]string{
"disable_routing_key_1111": "value1",
"disable_routing_key_2222": "value2",
"disable_routing_key_3333": "value3",
}
// Set keys
for key, value := range testData {
Expect(client.Set(ctx, key, value, 0).Err()).NotTo(HaveOccurred())
}
// Verify keys are on different shards
slotMap := make(map[int]bool)
for key := range testData {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
keys := make([]string, 0, len(testData))
for key := range testData {
keys = append(keys, key)
}
// With routing policies enabled, MGET should work
mgetResult := client.MGet(ctx, keys...)
Expect(mgetResult.Err()).NotTo(HaveOccurred())
Expect(len(mgetResult.Val())).To(Equal(len(keys)))
// Test 2: With routing policies disabled, MGET should fail with CROSSSLOT error
opt := redisClusterOptions()
opt.DisableRoutingPolicies = true
clientWithoutPolicies := cluster.newClusterClient(ctx, opt)
defer clientWithoutPolicies.Close()
// Try MGET with routing policies disabled - should fail with CROSSSLOT error
mgetResultDisabled := clientWithoutPolicies.MGet(ctx, keys...)
Expect(mgetResultDisabled.Err()).To(HaveOccurred())
Expect(mgetResultDisabled.Err().Error()).To(ContainSubstring("CROSSSLOT"))
})
It("should test large MGET with many keys across all shards", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Create many keys to ensure coverage across all shards
numKeys := 100
keys := make([]string, numKeys)
values := make(map[string]string)
for i := 0; i < numKeys; i++ {
key := fmt.Sprintf("large_mget_key_%d", i)
value := fmt.Sprintf("value_%d", i)
keys[i] = key
values[key] = value
Expect(client.Set(ctx, key, value, 0).Err()).NotTo(HaveOccurred())
}
// Verify keys are distributed across multiple shards
slotMap := make(map[int]bool)
for _, key := range keys {
slot := hashtag.Slot(key)
slotMap[slot] = true
}
Expect(len(slotMap)).To(BeNumerically(">", 1))
// Execute MGET
result := client.MGet(ctx, keys...)
Expect(result.Err()).NotTo(HaveOccurred())
// Verify all values are correct
resultValues := result.Val()
Expect(len(resultValues)).To(Equal(numKeys))
for i, key := range keys {
Expect(resultValues[i]).To(Equal(values[key]))
}
})
})
It("should route keyless commands to arbitrary shards using round robin", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
var numMasters int
var numMastersMu sync.Mutex
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
numMastersMu.Lock()
numMasters++
numMastersMu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(numMasters).To(BeNumerically(">", 1))
err = client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.ConfigResetStat(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
// Helper function to get ECHO command counts from all nodes
getEchoCounts := func() map[string]int {
echoCounts := make(map[string]int)
var echoCountsMu sync.Mutex
err := client.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
info := master.Info(ctx, "server")
Expect(info.Err()).NotTo(HaveOccurred())
serverInfo := info.Val()
portStart := strings.Index(serverInfo, "tcp_port:")
// Fail with a clear assertion instead of panicking if the server section is malformed.
Expect(portStart).To(BeNumerically(">=", 0))
portLine := serverInfo[portStart:]
portEnd := strings.Index(portLine, "\r\n")
if portEnd == -1 {
portEnd = len(portLine)
}
port := strings.TrimPrefix(portLine[:portEnd], "tcp_port:")
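// Pull the calls counter from the cmdstat_echo entry; if the entry is missing this node served no ECHO commands.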
commandStats := master.Info(ctx, "commandstats")
count := 0
if commandStats.Err() == nil {
stats := commandStats.Val()
cmdStatKey := "cmdstat_echo:"
if strings.Contains(stats, cmdStatKey) {
statStart := strings.Index(stats, cmdStatKey)
statLine := stats[statStart:]
statEnd := strings.Index(statLine, "\r\n")
if statEnd == -1 {
statEnd = len(statLine)
}
statLine = statLine[:statEnd]
callsStart := strings.Index(statLine, "calls=")
if callsStart != -1 {
callsStr := statLine[callsStart+6:]
callsEnd := strings.Index(callsStr, ",")
if callsEnd == -1 {
callsEnd = strings.Index(callsStr, "\r")
if callsEnd == -1 {
callsEnd = len(callsStr)
}
}
if callsCount, err := strconv.Atoi(callsStr[:callsEnd]); err == nil {
count = callsCount
}
}
}
}
echoCountsMu.Lock()
echoCounts[port] = count
echoCountsMu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
return echoCounts
}
// Single ECHO command should go to exactly one shard
result := client.Echo(ctx, "single_test")
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal("single_test"))
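// Give the nodes a moment to settle their command statistics before sampling.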
time.Sleep(200 * time.Millisecond)
// Verify single command went to exactly one shard
echoCounts := getEchoCounts()
shardsWithEcho := 0
for _, count := range echoCounts {
if count > 0 {
shardsWithEcho++
Expect(count).To(Equal(1))
}
}
Expect(shardsWithEcho).To(Equal(1))
// Multiple ECHO commands should be distributed across all shards via round robin
numCommands := numMasters * 5
for i := 0; i < numCommands; i++ {
result := client.Echo(ctx, fmt.Sprintf("multi_test_%d", i))
Expect(result.Err()).NotTo(HaveOccurred())
Expect(result.Val()).To(Equal(fmt.Sprintf("multi_test_%d", i)))
}
time.Sleep(200 * time.Millisecond)
echoCounts = getEchoCounts()
totalEchos := 0
shardsWithEchos := 0
for _, count := range echoCounts {
if count > 0 {
shardsWithEchos++
}
totalEchos += count
}
// All shards should now have some ECHO commands
Expect(shardsWithEchos).To(Equal(numMasters))
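// One ECHO from the single-command check above plus the round-robin batch just issued.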
expectedTotal := 1 + numCommands
Expect(totalEchos).To(Equal(expectedTotal))
})
})
var _ = Describe("ClusterClient ParseURL", func() {
cases := []struct {
test string
url string
o *redis.ClusterOptions // expected value
err error
}{
{
test: "ParseRedisURL",
url: "redis://localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}},
}, {
test: "ParseRedissURL",
url: "rediss://localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "MissingRedisPort",
url: "redis://localhost",
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}},
}, {
test: "MissingRedissPort",
url: "rediss://localhost",
o: &redis.ClusterOptions{Addrs: []string{"localhost:6379"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "MultipleRedisURLs",
url: "redis://localhost:123?addr=localhost:1234&addr=localhost:12345",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}},
}, {
test: "MultipleRedissURLs",
url: "rediss://localhost:123?addr=localhost:1234&addr=localhost:12345",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234", "localhost:12345"}, TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "OnlyPassword",
url: "redis://:bar@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Password: "bar"},
}, {
test: "OnlyUser",
url: "redis://foo@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo"},
}, {
test: "RedisUsernamePassword",
url: "redis://foo:bar@localhost:123",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Username: "foo", Password: "bar"},
}, {
test: "RedissUsernamePassword",
url: "rediss://foo:bar@localhost:123?addr=localhost:1234",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, Username: "foo", Password: "bar", TLSConfig: &tls.Config{ServerName: "localhost"}},
}, {
test: "QueryParameters",
url: "redis://localhost:123?read_timeout=2&pool_fifo=true&addr=localhost:1234",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123", "localhost:1234"}, ReadTimeout: 2 * time.Second, PoolFIFO: true},
}, {
test: "DisabledTimeout",
url: "redis://localhost:123?conn_max_idle_time=0",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
}, {
test: "DisabledTimeoutNeg",
url: "redis://localhost:123?conn_max_idle_time=-1",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: -1},
}, {
test: "UseDefault",
url: "redis://localhost:123?conn_max_idle_time=",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "Protocol",
url: "redis://localhost:123?protocol=2",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, Protocol: 2},
}, {
test: "ClientName",
url: "redis://localhost:123?client_name=cluster_hi",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ClientName: "cluster_hi"},
}, {
test: "UseDefaultMissing=",
url: "redis://localhost:123?conn_max_idle_time",
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
}, {
test: "InvalidQueryAddr",
url: "rediss://foo:bar@localhost:123?addr=rediss://foo:barr@localhost:1234",
err: errors.New(`redis: unable to parse addr param: rediss://foo:barr@localhost:1234`),
}, {
test: "InvalidInt",
url: "redis://localhost?pool_size=five",
err: errors.New(`redis: invalid pool_size number: strconv.Atoi: parsing "five": invalid syntax`),
}, {
test: "InvalidBool",
url: "redis://localhost?pool_fifo=yes",
err: errors.New(`redis: invalid pool_fifo boolean: expected true/false/1/0 or an empty string, got "yes"`),
}, {
test: "UnknownParam",
url: "redis://localhost?abc=123",
err: errors.New("redis: unexpected option: abc"),
}, {
test: "InvalidScheme",
url: "https://google.com",
err: errors.New("redis: invalid URL scheme: https"),
},
}
It("should match ParseClusterURL", func() {
for i := range cases {
tc := cases[i]
actual, err := redis.ParseClusterURL(tc.url)
if tc.err != nil {
Expect(err).Should(MatchError(tc.err))
} else {
Expect(err).NotTo(HaveOccurred())
}
if err == nil {
Expect(tc.o).NotTo(BeNil())
Expect(tc.o.Addrs).To(Equal(actual.Addrs))
Expect(tc.o.TLSConfig).To(Equal(actual.TLSConfig))
Expect(tc.o.Username).To(Equal(actual.Username))
Expect(tc.o.Password).To(Equal(actual.Password))
Expect(tc.o.MaxRetries).To(Equal(actual.MaxRetries))
Expect(tc.o.MinRetryBackoff).To(Equal(actual.MinRetryBackoff))
Expect(tc.o.MaxRetryBackoff).To(Equal(actual.MaxRetryBackoff))
Expect(tc.o.DialTimeout).To(Equal(actual.DialTimeout))
Expect(tc.o.ReadTimeout).To(Equal(actual.ReadTimeout))
Expect(tc.o.WriteTimeout).To(Equal(actual.WriteTimeout))
Expect(tc.o.PoolFIFO).To(Equal(actual.PoolFIFO))
Expect(tc.o.PoolSize).To(Equal(actual.PoolSize))
Expect(tc.o.MinIdleConns).To(Equal(actual.MinIdleConns))
Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime))
Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime))
Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout))
}
}
})
It("should distribute keyless commands randomly across shards using random shard picker", func() {
SkipBeforeRedisVersion(7.9, "The tips are included from Redis 8")
// Create a cluster client with random shard picker
opt := redisClusterOptions()
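// Replace the default round-robin picker with the random picker to exercise random keyless routing.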
opt.ShardPicker = &routing.RandomPicker{}
randomClient := cluster.newClusterClient(ctx, opt)
defer randomClient.Close()
Eventually(func() error {
return randomClient.Ping(ctx).Err()
}, 30*time.Second).ShouldNot(HaveOccurred())
var numMasters int
var numMastersMu sync.Mutex
err := randomClient.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
numMastersMu.Lock()
numMasters++
numMastersMu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
Expect(numMasters).To(BeNumerically(">", 1))
// Reset command statistics on all masters
err = randomClient.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
return master.ConfigResetStat(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
// Helper function to get ECHO command counts from all nodes
getEchoCounts := func() map[string]int {
echoCounts := make(map[string]int)
var echoCountsMu sync.Mutex
err := randomClient.ForEachMaster(ctx, func(ctx context.Context, master *redis.Client) error {
addr := master.Options().Addr
port := addr[strings.LastIndex(addr, ":")+1:]
info, err := master.Info(ctx, "commandstats").Result()
if err != nil {
return err
}
count := 0
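// Parse the calls=<n> field of the cmdstat_echo line to see how many ECHOs this node served.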
if strings.Contains(info, "cmdstat_echo:") {
lines := strings.Split(info, "\n")
for _, line := range lines {
if strings.HasPrefix(line, "cmdstat_echo:") {
parts := strings.Split(line, ",")
if len(parts) > 0 {
callsPart := strings.Split(parts[0], "=")
if len(callsPart) > 1 {
if parsedCount, parseErr := strconv.Atoi(callsPart[1]); parseErr == nil {
count = parsedCount
}
}
}
break
}
}
}
echoCountsMu.Lock()
echoCounts[port] = count
echoCountsMu.Unlock()
return nil
})
Expect(err).NotTo(HaveOccurred())
return echoCounts
}
// Execute multiple ECHO commands and measure distribution
numCommands := 100
for i := 0; i < numCommands; i++ {
result := randomClient.Echo(ctx, fmt.Sprintf("random_test_%d", i))
Expect(result.Err()).NotTo(HaveOccurred())
}
echoCounts := getEchoCounts()
totalEchos := 0
shardsWithEchos := 0
for _, count := range echoCounts {
if count > 0 {
shardsWithEchos++
}
totalEchos += count
}
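// With this many commands and a random picker, at least two masters should have served ECHOs.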
Expect(totalEchos).To(Equal(numCommands))
Expect(shardsWithEchos).To(BeNumerically(">=", 2))
})
})
})
var _ = Describe("ClusterClient FailingTimeoutSeconds", func() {
var client *redis.ClusterClient
AfterEach(func() {
if client != nil {
_ = client.Close()
}
})
It("should use default failing timeout of 15 seconds", func() {
opt := redisClusterOptions()
client = cluster.newClusterClient(ctx, opt)
// Default should be 15 seconds
Expect(opt.FailingTimeoutSeconds).To(Equal(15))
})
It("should use custom failing timeout", func() {
opt := redisClusterOptions()
opt.FailingTimeoutSeconds = 30
client = cluster.newClusterClient(ctx, opt)
// Should use custom value
Expect(opt.FailingTimeoutSeconds).To(Equal(30))
})
It("should parse failing_timeout_seconds from URL", func() {
url := "redis://localhost:16600?failing_timeout_seconds=25"
opt, err := redis.ParseClusterURL(url)
Expect(err).NotTo(HaveOccurred())
Expect(opt.FailingTimeoutSeconds).To(Equal(25))
})
It("should handle node failing timeout correctly", func() {
opt := redisClusterOptions()
opt.FailingTimeoutSeconds = 2 // Short timeout for testing
client = cluster.newClusterClient(ctx, opt)
// Get a node and mark it as failing
nodes, err := client.Nodes(ctx, "A")
Expect(err).NotTo(HaveOccurred())
Expect(len(nodes)).To(BeNumerically(">", 0))
node := nodes[0]
// Initially not failing
Expect(node.Failing()).To(BeFalse())
// Mark as failing
node.MarkAsFailing()
Expect(node.Failing()).To(BeTrue())
// Should still be failing after 1 second (less than timeout)
time.Sleep(1 * time.Second)
Expect(node.Failing()).To(BeTrue())
// Should not be failing after timeout expires
time.Sleep(2 * time.Second) // Total 3 seconds > 2 second timeout
Expect(node.Failing()).To(BeFalse())
})
It("should handle zero timeout by using default", func() {
opt := redisClusterOptions()
opt.FailingTimeoutSeconds = 0 // Should use default
client = cluster.newClusterClient(ctx, opt)
// After initialization, should be set to default
Expect(opt.FailingTimeoutSeconds).To(Equal(15))
})
})