mirror of
https://github.com/redis/go-redis.git
synced 2025-07-28 06:42:00 +03:00
fix(cmdSlot): Add preferred random slot
This commit is contained in:
@ -364,15 +364,22 @@ var _ = Describe("ClusterClient", func() {
|
||||
It("select slot from args for GETKEYSINSLOT command", func() {
|
||||
cmd := NewStringSliceCmd(ctx, "cluster", "getkeysinslot", 100, 200)
|
||||
|
||||
slot := client.cmdSlot(cmd)
|
||||
slot := client.cmdSlot(cmd, -1)
|
||||
Expect(slot).To(Equal(100))
|
||||
})
|
||||
|
||||
It("select slot from args for COUNTKEYSINSLOT command", func() {
|
||||
cmd := NewStringSliceCmd(ctx, "cluster", "countkeysinslot", 100)
|
||||
|
||||
slot := client.cmdSlot(cmd)
|
||||
slot := client.cmdSlot(cmd, -1)
|
||||
Expect(slot).To(Equal(100))
|
||||
})
|
||||
|
||||
It("follows preferred random slot", func() {
|
||||
cmd := NewStatusCmd(ctx, "ping")
|
||||
|
||||
slot := client.cmdSlot(cmd, 101)
|
||||
Expect(slot).To(Equal(101))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
@ -998,7 +998,7 @@ func (c *ClusterClient) Process(ctx context.Context, cmd Cmder) error {
|
||||
}
|
||||
|
||||
func (c *ClusterClient) process(ctx context.Context, cmd Cmder) error {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(cmd, -1)
|
||||
var node *clusterNode
|
||||
var moved bool
|
||||
var ask bool
|
||||
@ -1344,9 +1344,13 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
|
||||
return err
|
||||
}
|
||||
|
||||
preferredRandomSlot := -1
|
||||
if c.opt.ReadOnly && c.cmdsAreReadOnly(ctx, cmds) {
|
||||
for _, cmd := range cmds {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(cmd, preferredRandomSlot)
|
||||
if preferredRandomSlot == -1 {
|
||||
preferredRandomSlot = slot
|
||||
}
|
||||
node, err := c.slotReadOnlyNode(state, slot)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1357,7 +1361,10 @@ func (c *ClusterClient) mapCmdsByNode(ctx context.Context, cmdsMap *cmdsMap, cmd
|
||||
}
|
||||
|
||||
for _, cmd := range cmds {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(cmd, preferredRandomSlot)
|
||||
if preferredRandomSlot == -1 {
|
||||
preferredRandomSlot = slot
|
||||
}
|
||||
node, err := state.slotMasterNode(slot)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1594,8 +1601,12 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
|
||||
|
||||
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
|
||||
cmdsMap := make(map[int][]Cmder)
|
||||
preferredRandomSlot := -1
|
||||
for _, cmd := range cmds {
|
||||
slot := c.cmdSlot(cmd)
|
||||
slot := c.cmdSlot(cmd, preferredRandomSlot)
|
||||
if preferredRandomSlot == -1 {
|
||||
preferredRandomSlot = slot
|
||||
}
|
||||
cmdsMap[slot] = append(cmdsMap[slot], cmd)
|
||||
}
|
||||
return cmdsMap
|
||||
@ -1925,17 +1936,20 @@ func (c *ClusterClient) cmdInfo(ctx context.Context, name string) *CommandInfo {
|
||||
return info
|
||||
}
|
||||
|
||||
func (c *ClusterClient) cmdSlot(cmd Cmder) int {
|
||||
func (c *ClusterClient) cmdSlot(cmd Cmder, preferredRandomSlot int) int {
|
||||
args := cmd.Args()
|
||||
if args[0] == "cluster" && (args[1] == "getkeysinslot" || args[1] == "countkeysinslot") {
|
||||
return args[2].(int)
|
||||
}
|
||||
|
||||
return cmdSlot(cmd, cmdFirstKeyPos(cmd))
|
||||
return cmdSlot(cmd, cmdFirstKeyPos(cmd), preferredRandomSlot)
|
||||
}
|
||||
|
||||
func cmdSlot(cmd Cmder, pos int) int {
|
||||
func cmdSlot(cmd Cmder, pos int, preferredRandomSlot int) int {
|
||||
if pos == 0 {
|
||||
if preferredRandomSlot != -1 {
|
||||
return preferredRandomSlot
|
||||
}
|
||||
return hashtag.RandomSlot()
|
||||
}
|
||||
firstKey := cmd.stringArg(pos)
|
||||
|
Reference in New Issue
Block a user