mirror of
https://github.com/redis/go-redis.git
synced 2025-04-17 20:17:02 +03:00
feat: Add support for CLUSTER SHARDS command (#2507)
* feat: Adding support for CLUSTER SHARDS command Co-authored-by: Anuragkillswitch <70265851+Anuragkillswitch@users.noreply.github.com>
This commit is contained in:
parent
2cdd5ea34a
commit
9aba95a74f
@ -677,6 +677,37 @@ var _ = Describe("ClusterClient", func() {
|
|||||||
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should CLUSTER SHARDS", func() {
|
||||||
|
res, err := client.ClusterShards(ctx).Result()
|
||||||
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
Expect(res).NotTo(BeEmpty())
|
||||||
|
|
||||||
|
// Iterate over the ClusterShard results and validate the fields.
|
||||||
|
for _, shard := range res {
|
||||||
|
Expect(shard.Slots).NotTo(BeEmpty())
|
||||||
|
for _, slotRange := range shard.Slots {
|
||||||
|
Expect(slotRange.Start).To(BeNumerically(">=", 0))
|
||||||
|
Expect(slotRange.End).To(BeNumerically(">=", slotRange.Start))
|
||||||
|
}
|
||||||
|
|
||||||
|
Expect(shard.Nodes).NotTo(BeEmpty())
|
||||||
|
for _, node := range shard.Nodes {
|
||||||
|
Expect(node.ID).NotTo(BeEmpty())
|
||||||
|
Expect(node.Endpoint).NotTo(BeEmpty())
|
||||||
|
Expect(node.IP).NotTo(BeEmpty())
|
||||||
|
Expect(node.Port).To(BeNumerically(">", 0))
|
||||||
|
|
||||||
|
validRoles := []string{"master", "slave", "replica"}
|
||||||
|
Expect(validRoles).To(ContainElement(node.Role))
|
||||||
|
|
||||||
|
Expect(node.ReplicationOffset).To(BeNumerically(">=", 0))
|
||||||
|
|
||||||
|
validHealthStatuses := []string{"online", "failed", "loading"}
|
||||||
|
Expect(validHealthStatuses).To(ContainElement(node.Health))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
It("should CLUSTER LINKS", func() {
|
It("should CLUSTER LINKS", func() {
|
||||||
res, err := client.ClusterLinks(ctx).Result()
|
res, err := client.ClusterLinks(ctx).Result()
|
||||||
Expect(err).NotTo(HaveOccurred())
|
Expect(err).NotTo(HaveOccurred())
|
||||||
|
150
command.go
150
command.go
@ -4335,3 +4335,153 @@ func (cmd *ClusterLinksCmd) readReply(rd *proto.Reader) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
type SlotRange struct {
|
||||||
|
Start int64
|
||||||
|
End int64
|
||||||
|
}
|
||||||
|
|
||||||
|
type Node struct {
|
||||||
|
ID string
|
||||||
|
Endpoint string
|
||||||
|
IP string
|
||||||
|
Hostname string
|
||||||
|
Port int64
|
||||||
|
TLSPort int64
|
||||||
|
Role string
|
||||||
|
ReplicationOffset int64
|
||||||
|
Health string
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClusterShard struct {
|
||||||
|
Slots []SlotRange
|
||||||
|
Nodes []Node
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClusterShardsCmd struct {
|
||||||
|
baseCmd
|
||||||
|
|
||||||
|
val []ClusterShard
|
||||||
|
}
|
||||||
|
|
||||||
|
var _ Cmder = (*ClusterShardsCmd)(nil)
|
||||||
|
|
||||||
|
func NewClusterShardsCmd(ctx context.Context, args ...interface{}) *ClusterShardsCmd {
|
||||||
|
return &ClusterShardsCmd{
|
||||||
|
baseCmd: baseCmd{
|
||||||
|
ctx: ctx,
|
||||||
|
args: args,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ClusterShardsCmd) SetVal(val []ClusterShard) {
|
||||||
|
cmd.val = val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ClusterShardsCmd) Val() []ClusterShard {
|
||||||
|
return cmd.val
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ClusterShardsCmd) Result() ([]ClusterShard, error) {
|
||||||
|
return cmd.Val(), cmd.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ClusterShardsCmd) String() string {
|
||||||
|
return cmdString(cmd, cmd.val)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cmd *ClusterShardsCmd) readReply(rd *proto.Reader) error {
|
||||||
|
n, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val = make([]ClusterShard, n)
|
||||||
|
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
m, err := rd.ReadMapLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for j := 0; j < m; j++ {
|
||||||
|
key, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch key {
|
||||||
|
case "slots":
|
||||||
|
l, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for k := 0; k < l; k += 2 {
|
||||||
|
start, err := rd.ReadInt()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
end, err := rd.ReadInt()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
cmd.val[i].Slots = append(cmd.val[i].Slots, SlotRange{Start: start, End: end})
|
||||||
|
}
|
||||||
|
case "nodes":
|
||||||
|
nodesLen, err := rd.ReadArrayLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
cmd.val[i].Nodes = make([]Node, nodesLen)
|
||||||
|
for k := 0; k < nodesLen; k++ {
|
||||||
|
nodeMapLen, err := rd.ReadMapLen()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for l := 0; l < nodeMapLen; l++ {
|
||||||
|
nodeKey, err := rd.ReadString()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
switch nodeKey {
|
||||||
|
case "id":
|
||||||
|
cmd.val[i].Nodes[k].ID, err = rd.ReadString()
|
||||||
|
case "endpoint":
|
||||||
|
cmd.val[i].Nodes[k].Endpoint, err = rd.ReadString()
|
||||||
|
case "ip":
|
||||||
|
cmd.val[i].Nodes[k].IP, err = rd.ReadString()
|
||||||
|
case "hostname":
|
||||||
|
cmd.val[i].Nodes[k].Hostname, err = rd.ReadString()
|
||||||
|
case "port":
|
||||||
|
cmd.val[i].Nodes[k].Port, err = rd.ReadInt()
|
||||||
|
case "tls-port":
|
||||||
|
cmd.val[i].Nodes[k].TLSPort, err = rd.ReadInt()
|
||||||
|
case "role":
|
||||||
|
cmd.val[i].Nodes[k].Role, err = rd.ReadString()
|
||||||
|
case "replication-offset":
|
||||||
|
cmd.val[i].Nodes[k].ReplicationOffset, err = rd.ReadInt()
|
||||||
|
case "health":
|
||||||
|
cmd.val[i].Nodes[k].Health, err = rd.ReadString()
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS node reply", nodeKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("redis: unexpected key %q in CLUSTER SHARDS reply", key)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
@ -422,6 +422,7 @@ type Cmdable interface {
|
|||||||
PubSubShardNumSub(ctx context.Context, channels ...string) *MapStringIntCmd
|
PubSubShardNumSub(ctx context.Context, channels ...string) *MapStringIntCmd
|
||||||
|
|
||||||
ClusterSlots(ctx context.Context) *ClusterSlotsCmd
|
ClusterSlots(ctx context.Context) *ClusterSlotsCmd
|
||||||
|
ClusterShards(ctx context.Context) *ClusterShardsCmd
|
||||||
ClusterLinks(ctx context.Context) *ClusterLinksCmd
|
ClusterLinks(ctx context.Context) *ClusterLinksCmd
|
||||||
ClusterNodes(ctx context.Context) *StringCmd
|
ClusterNodes(ctx context.Context) *StringCmd
|
||||||
ClusterMeet(ctx context.Context, host, port string) *StatusCmd
|
ClusterMeet(ctx context.Context, host, port string) *StatusCmd
|
||||||
@ -3506,6 +3507,12 @@ func (c cmdable) ClusterSlots(ctx context.Context) *ClusterSlotsCmd {
|
|||||||
return cmd
|
return cmd
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c cmdable) ClusterShards(ctx context.Context) *ClusterShardsCmd {
|
||||||
|
cmd := NewClusterShardsCmd(ctx, "cluster", "shards")
|
||||||
|
_ = c(ctx, cmd)
|
||||||
|
return cmd
|
||||||
|
}
|
||||||
|
|
||||||
func (c cmdable) ClusterLinks(ctx context.Context) *ClusterLinksCmd {
|
func (c cmdable) ClusterLinks(ctx context.Context) *ClusterLinksCmd {
|
||||||
cmd := NewClusterLinksCmd(ctx, "cluster", "links")
|
cmd := NewClusterLinksCmd(ctx, "cluster", "links")
|
||||||
_ = c(ctx, cmd)
|
_ = c(ctx, cmd)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user