From 0f66cd002d49a7b37c6677344958c4b2be032c92 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Thu, 19 Jun 2025 12:07:36 +0300 Subject: [PATCH] fix(cmdSlot): Add preferred random slot --- internal_test.go | 11 +++++++++-- osscluster.go | 28 +++++++++++++++++++++------- 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/internal_test.go b/internal_test.go index 8f1f1f31..4a655cff 100644 --- a/internal_test.go +++ b/internal_test.go @@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() { It("select slot from args for GETKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) It("select slot from args for COUNTKEYSINSLOT command", func() { cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100) - slot := client.cmdSlot(cmd) + slot := client.cmdSlot(cmd, -1) Expect(slot).To(Equal(100)) }) + + It("follows preferred random slot", func() { + cmd := NewStatusCmd(ctx, "ping") + + slot := client.cmdSlot(cmd, 101) + Expect(slot).To(Equal(101)) + }) }) }) diff --git a/osscluster.go b/osscluster.go index 48630929..d92a26b5 100644 --- a/osscluster.go +++ b/osscluster.go @@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error { } func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, -1) var node *clusterNode var moved bool var ask bool @@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd return err } + preferredRandomSlot := -1 if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) { for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := c.slotReadOnlyNode(state, slot) if err != nil { return err @@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd } for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } node, err := state.slotMasterNode(slot) if err != nil { return err @@ -1594,8 +1601,12 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { cmdsMap := make(map[int][]Cmder) + preferredRandomSlot := -1 for _, cmd := range cmds { - slot := c.cmdSlot(cmd) + slot := c.cmdSlot(cmd, preferredRandomSlot) + if preferredRandomSlot == -1 { + preferredRandomSlot = slot + } cmdsMap[slot] = append(cmdsMap[slot], cmd) } return cmdsMap @@ -1925,17 +1936,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo { return info } -func (c *ClusterClient) cmdSlot(cmd Cmder) int { +func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int { args := cmd.Args() if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") { return args[2].(int) } - return cmdSlot(cmd, cmdFirstKeyPos(cmd)) + return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot) } -func cmdSlot(cmd Cmder, pos int) int { +func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int { if pos == 0 { + if preferredRandomSlot != -1 { + return preferredRandomSlot + } return hashtag.RandomSlot() } firstKey := cmd.stringArg(pos)