From 9aba95a74fa29216ff9cf453f8ebae452c3551df Mon Sep 17 00:00:00 2001
From: Anurag Bandyopadhyay
Date: Wed, 29 Mar 2023 19:14:06 +0530
Subject: [PATCH] feat: Add support for CLUSTER SHARDS command (#2507)

* feat: Adding support for CLUSTER SHARDS command

Co-authored-by: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com>
---
 cluster_test.go |  31 ++++++++++
 command.go      | 150 ++++++++++++++++++++++++++++++++++++++++++++++++
 commands.go     |   7 +++
 3 files changed, 188 insertions(+)

diff --git a/cluster_test.go b/cluster_test.go
index bfc1c259..92471647 100644
--- a/cluster_test.go
+++ b/cluster_test.go
@@ -677,6 +677,37 @@ var _ = Describe("ClusterClient", func() {
 			Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
 		})
 
+		It("should CLUSTER SHARDS", func() {
+			res, err := client.ClusterShards(ctx).Result()
+			Expect(err).NotTo(HaveOccurred())
+			Expect(res).NotTo(BeEmpty())
+
+			// Iterate over the ClusterShard results and validate the fields.
+			for _, shard := range res {
+				Expect(shard.Slots).NotTo(BeEmpty())
+				for _, slotRange := range shard.Slots {
+					Expect(slotRange.Start).To(BeNumerically(">=", 0))
+					Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
+				}
+
+				Expect(shard.Nodes).NotTo(BeEmpty())
+				for _, node := range shard.Nodes {
+					Expect(node.ID).NotTo(BeEmpty())
+					Expect(node.Endpoint).NotTo(BeEmpty())
+					Expect(node.IP).NotTo(BeEmpty())
+					Expect(node.Port).To(BeNumerically(">", 0))
+
+					validRoles := []string{"master", "slave", "replica"}
+					Expect(validRoles).To(ContainElement(node.Role))
+
+					Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))
+
+					validHealthStatuses := []string{"online", "failed", "loading"}
+					Expect(validHealthStatuses).To(ContainElement(node.Health))
+				}
+			}
+		})
+
 		It("should CLUSTER LINKS", func() {
 			res, err := client.ClusterLinks(ctx).Result()
 			Expect(err).NotTo(HaveOccurred())
diff --git a/command.go b/command.go
index 04aebe93..637e9cdf 100644
--- a/command.go
+++ b/command.go
@@ -4335,3 +4335,153 @@ func (cmd *ClusterLinksCmd) readReply(rd *proto.Reader) error {
 
 	return nil
 }
+
+// ------------------------------------------------------------------------------------------------------------------
+
+type SlotRange struct {
+	Start int64
+	End   int64
+}
+
+type Node struct {
+	ID                string
+	Endpoint          string
+	IP                string
+	Hostname          string
+	Port              int64
+	TLSPort           int64
+	Role              string
+	ReplicationOffset int64
+	Health            string
+}
+
+type ClusterShard struct {
+	Slots []SlotRange
+	Nodes []Node
+}
+
+type ClusterShardsCmd struct {
+	baseCmd
+
+	val []ClusterShard
+}
+
+var _ Cmder = (*ClusterShardsCmd)(nil)
+
+func NewClusterShardsCmd(ctx context.Context, args ...interface{}) *ClusterShardsCmd {
+	return &ClusterShardsCmd{
+		baseCmd: baseCmd{
+			ctx:  ctx,
+			args: args,
+		},
+	}
+}
+
+func (cmd *ClusterShardsCmd) SetVal(val []ClusterShard) {
+	cmd.val = val
+}
+
+func (cmd *ClusterShardsCmd) Val() []ClusterShard {
+	return cmd.val
+}
+
+func (cmd *ClusterShardsCmd) Result() ([]ClusterShard, error) {
+	return cmd.Val(), cmd.Err()
+}
+
+func (cmd *ClusterShardsCmd) String() string {
+	return cmdString(cmd, cmd.val)
+}
+
+func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
+	n, err := rd.ReadArrayLen()
+	if err != nil {
+		return err
+	}
+	cmd.val = make([]ClusterShard, n)
+
+	for i := 0; i < n; i++ {
+		m, err := rd.ReadMapLen()
+		if err != nil {
+			return err
+		}
+
+		for j := 0; j < m; j++ {
+			key, err := rd.ReadString()
+			if err != nil {
+				return err
+			}
+
+			switch key {
+			case "slots":
+				l, err := rd.ReadArrayLen()
+				if err != nil {
+					return err
+				}
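+				// The slots array is a flat list of integers interleaving range
+				// starts and ends (start1, end1, start2, end2, ...), so it is read in pairs.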
+				for k := 0; k < l; k += 2 {
+					start, err := rd.ReadInt()
+					if err != nil {
+						return err
+					}
+
+					end, err := rd.ReadInt()
+					if err != nil {
+						return err
+					}
+
+					cmd.val[i].Slots = append(cmd.val[i].Slots, SlotRange{Start: start, End: end})
+				}
+			case "nodes":
+				nodesLen, err := rd.ReadArrayLen()
+				if err != nil {
+					return err
+				}
+				cmd.val[i].Nodes = make([]Node, nodesLen)
+				for k := 0; k < nodesLen; k++ {
+					nodeMapLen, err := rd.ReadMapLen()
+					if err != nil {
+						return err
+					}
+
+					for l := 0; l < nodeMapLen; l++ {
+						nodeKey, err := rd.ReadString()
+						if err != nil {
+							return err
+						}
+
+						switch nodeKey {
+						case "id":
+							cmd.val[i].Nodes[k].ID, err = rd.ReadString()
+						case "endpoint":
+							cmd.val[i].Nodes[k].Endpoint, err = rd.ReadString()
+						case "ip":
+							cmd.val[i].Nodes[k].IP, err = rd.ReadString()
+						case "hostname":
+							cmd.val[i].Nodes[k].Hostname, err = rd.ReadString()
+						case "port":
+							cmd.val[i].Nodes[k].Port, err = rd.ReadInt()
+						case "tls-port":
+							cmd.val[i].Nodes[k].TLSPort, err = rd.ReadInt()
+						case "role":
+							cmd.val[i].Nodes[k].Role, err = rd.ReadString()
+						case "replication-offset":
+							cmd.val[i].Nodes[k].ReplicationOffset, err = rd.ReadInt()
+						case "health":
+							cmd.val[i].Nodes[k].Health, err = rd.ReadString()
+						default:
+							return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS node reply", nodeKey)
+						}
+
+						if err != nil {
+							return err
+						}
+					}
+				}
+			default:
+				return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS reply", key)
+			}
+		}
+	}
+
+	return nil
+}
diff --git a/commands.go b/commands.go
index 2d0b7712..b8eef9dd 100644
--- a/commands.go
+++ b/commands.go
@@ -422,6 +422,7 @@ type Cmdable interface {
 	PubSubShardNumSub(ctx context.Context, channels ...string) *MapStringIntCmd
 
 	ClusterSlots(ctx context.Context) *ClusterSlotsCmd
+	ClusterShards(ctx context.Context) *ClusterShardsCmd
 	ClusterLinks(ctx context.Context) *ClusterLinksCmd
 	ClusterNodes(ctx context.Context) *StringCmd
 	ClusterMeet(ctx context.Context, host, port string) *StatusCmd
@@ -3506,6 +3507,12 @@ func (c cmdable) ClusterSlots(ctx context.Context) *ClusterSlotsCmd {
 	return cmd
 }
 
+func (c cmdable) ClusterShards(ctx context.Context) *ClusterShardsCmd {
+	cmd := NewClusterShardsCmd(ctx, "cluster", "shards")
+	_ = c(ctx, cmd)
+	return cmd
+}
+
 func (c cmdable) ClusterLinks(ctx context.Context) *ClusterLinksCmd {
 	cmd := NewClusterLinksCmd(ctx, "cluster", "links")
 	_ = c(ctx, cmd)
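
Usage note: below is a minimal sketch of how a caller might exercise the new API, not part of the patch itself. It assumes the github.com/redis/go-redis/v9 module path and placeholder cluster addresses.

	package main

	import (
		"context"
		"fmt"

		"github.com/redis/go-redis/v9"
	)

	func main() {
		ctx := context.Background()

		// Placeholder seed addresses; point these at a real cluster.
		client := redis.NewClusterClient(&redis.ClusterOptions{
			Addrs: []string{":7000", ":7001", ":7002"},
		})
		defer client.Close()

		// Fetch the shard topology and print each shard's slot ranges and nodes.
		shards, err := client.ClusterShards(ctx).Result()
		if err != nil {
			panic(err)
		}
		for _, shard := range shards {
			for _, r := range shard.Slots {
				fmt.Printf("slots %d-%d\n", r.Start, r.End)
			}
			for _, node := range shard.Nodes {
				fmt.Printf("  node %s %s:%d role=%s health=%s\n",
					node.ID, node.Endpoint, node.Port, node.Role, node.Health)
			}
		}
	}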