From 2bbcdaa32b89e098ea1274c1ba9663b0a42bb424 Mon Sep 17 00:00:00 2001 From: Nedyalko Dyakov Date: Mon, 23 Jun 2025 17:41:14 +0300 Subject: [PATCH] chore(txPipeline): refactor slottedCommands impl --- osscluster.go | 124 ++++++++++++++++++++------------------------------ 1 file changed, 50 insertions(+), 74 deletions(-) diff --git a/osscluster.go b/osscluster.go index d92a26b5..d68eff4f 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1526,102 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err return err } - cmdsMap := map[int][]Cmder{} + keyedCmdsBySlot := c.slottedKeyedCommands(cmds) slot := -1 - // get only the keyed commands - keyedCmds := c.keyedCmds(cmds) - if len(keyedCmds) == 0 { - // no keyed commands try random slot + switch len(keyedCmdsBySlot) { + case 0: slot = hashtag.RandomSlot() - } else { - // keyed commands, get slot from them - // if more than one slot, return cross slot error - cmdsBySlot := c.mapCmdsBySlot(keyedCmds) - if len(cmdsBySlot) > 1 { - // cross slot error, we have more than one slot for keyed commands - setCmdsErr(cmds, ErrCrossSlot) - return ErrCrossSlot - } - // get the slot, should be only one - for sl := range cmdsBySlot { + case 1: + for sl := range keyedCmdsBySlot { slot = sl break } - } - // slot was not determined, try random one - if slot == -1 { - slot = hashtag.RandomSlot() - } - cmdsMap[slot] = cmds - - // TxPipeline does not support cross slot transaction. - // double check the commands are in the same slot - if len(cmdsMap) > 1 { + default: + // TxPipeline does not support cross slot transaction. setCmdsErr(cmds, ErrCrossSlot) return ErrCrossSlot } - for slot, cmds := range cmdsMap { - node, err := state.slotMasterNode(slot) - if err != nil { - setCmdsErr(cmds, err) - continue + node, err := state.slotMasterNode(slot) + if err != nil { + setCmdsErr(cmds, err) + return err + } + + cmdsMap := map[*clusterNode][]Cmder{node: cmds} + for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { + if attempt > 0 { + if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { + setCmdsErr(cmds, err) + return err + } } - cmdsMap := map[*clusterNode][]Cmder{node: cmds} - for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { - if attempt > 0 { - if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil { - setCmdsErr(cmds, err) - return err - } - } + failedCmds := newCmdsMap() + var wg sync.WaitGroup - failedCmds := newCmdsMap() - var wg sync.WaitGroup - - for node, cmds := range cmdsMap { - wg.Add(1) - go func(node *clusterNode, cmds []Cmder) { - defer wg.Done() - c.processTxPipelineNode(ctx, node, cmds, failedCmds) - }(node, cmds) - } - - wg.Wait() - if len(failedCmds.m) == 0 { - break - } - cmdsMap = failedCmds.m + for node, cmds := range cmdsMap { + wg.Add(1) + go func(node *clusterNode, cmds []Cmder) { + defer wg.Done() + c.processTxPipelineNode(ctx, node, cmds, failedCmds) + }(node, cmds) } + + wg.Wait() + if len(failedCmds.m) == 0 { + break + } + cmdsMap = failedCmds.m } return cmdsFirstErr(cmds) } -func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { - cmdsMap := make(map[int][]Cmder) - preferredRandomSlot := -1 - for _, cmd := range cmds { - slot := c.cmdSlot(cmd, preferredRandomSlot) - if preferredRandomSlot == -1 { - preferredRandomSlot = slot - } - cmdsMap[slot] = append(cmdsMap[slot], cmd) - } - return cmdsMap -} +// slottedKeyedCommands returns a map of slot to commands taking into account +// only commands that have keys. +func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder { + cmdsSlots := map[int][]Cmder{} -// keyedCmds returns all the keyed commands from the cmds slice -// it determines keyed commands by checking if the command has a first key position -func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder { - keyedCmds := make([]Cmder, 0, len(cmds)) + prefferedRandomSlot := -1 for _, cmd := range cmds { - if cmdFirstKeyPos(cmd) != 0 { - keyedCmds = append(keyedCmds, cmd) + if cmdFirstKeyPos(cmd) == 0 { + continue } + + slot := c.cmdSlot(cmd, prefferedRandomSlot) + if prefferedRandomSlot == -1 { + prefferedRandomSlot = slot + } + + cmdsSlots[slot] = append(cmdsSlots[slot], cmd) } - return keyedCmds + + return cmdsSlots } func (c *ClusterClient) processTxPipelineNode(