1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-19 11:43:14 +03:00

chore(txPipeline): refactor slottedCommands impl

This commit is contained in:
Nedyalko Dyakov
2025-06-23 17:41:14 +03:00
parent 50d1484dc4
commit 2bbcdaa32b

View File

@ -1526,102 +1526,78 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
return err return err
} }
cmdsMap := map[int][]Cmder{} keyedCmdsBySlot := c.slottedKeyedCommands(cmds)
slot := -1 slot := -1
// get only the keyed commands switch len(keyedCmdsBySlot) {
keyedCmds := c.keyedCmds(cmds) case 0:
if len(keyedCmds) == 0 {
// no keyed commands try random slot
slot = hashtag.RandomSlot() slot = hashtag.RandomSlot()
} else { case 1:
// keyed commands, get slot from them for sl := range keyedCmdsBySlot {
// if more than one slot, return cross slot error
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
if len(cmdsBySlot) > 1 {
// cross slot error, we have more than one slot for keyed commands
setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot
}
// get the slot, should be only one
for sl := range cmdsBySlot {
slot = sl slot = sl
break break
} }
} default:
// slot was not determined, try random one // TxPipeline does not support cross slot transaction.
if slot == -1 {
slot = hashtag.RandomSlot()
}
cmdsMap[slot] = cmds
// TxPipeline does not support cross slot transaction.
// double check the commands are in the same slot
if len(cmdsMap) > 1 {
setCmdsErr(cmds, ErrCrossSlot) setCmdsErr(cmds, ErrCrossSlot)
return ErrCrossSlot return ErrCrossSlot
} }
for slot, cmds := range cmdsMap { node, err := state.slotMasterNode(slot)
node, err := state.slotMasterNode(slot) if err != nil {
if err != nil { setCmdsErr(cmds, err)
setCmdsErr(cmds, err) return err
continue }
cmdsMap := map[*clusterNode][]Cmder{node: cmds}
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ {
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
return err
}
} }
cmdsMap := map[*clusterNode][]Cmder{node: cmds} failedCmds := newCmdsMap()
for attempt := 0; attempt <= c.opt.MaxRedirects; attempt++ { var wg sync.WaitGroup
if attempt > 0 {
if err := internal.Sleep(ctx, c.retryBackoff(attempt)); err != nil {
setCmdsErr(cmds, err)
return err
}
}
failedCmds := newCmdsMap() for node, cmds := range cmdsMap {
var wg sync.WaitGroup wg.Add(1)
go func(node *clusterNode, cmds []Cmder) {
for node, cmds := range cmdsMap { defer wg.Done()
wg.Add(1) c.processTxPipelineNode(ctx, node, cmds, failedCmds)
go func(node *clusterNode, cmds []Cmder) { }(node, cmds)
defer wg.Done()
c.processTxPipelineNode(ctx, node, cmds, failedCmds)
}(node, cmds)
}
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds.m
} }
wg.Wait()
if len(failedCmds.m) == 0 {
break
}
cmdsMap = failedCmds.m
} }
return cmdsFirstErr(cmds) return cmdsFirstErr(cmds)
} }
func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { // slottedKeyedCommands returns a map of slot to commands taking into account
cmdsMap := make(map[int][]Cmder) // only commands that have keys.
preferredRandomSlot := -1 func (c *ClusterClient) slottedKeyedCommands(cmds []Cmder) map[int][]Cmder {
for _, cmd := range cmds { cmdsSlots := map[int][]Cmder{}
slot := c.cmdSlot(cmd, preferredRandomSlot)
if preferredRandomSlot == -1 {
preferredRandomSlot = slot
}
cmdsMap[slot] = append(cmdsMap[slot], cmd)
}
return cmdsMap
}
// keyedCmds returns all the keyed commands from the cmds slice prefferedRandomSlot := -1
// it determines keyed commands by checking if the command has a first key position
func (c *ClusterClient) keyedCmds(cmds []Cmder) []Cmder {
keyedCmds := make([]Cmder, 0, len(cmds))
for _, cmd := range cmds { for _, cmd := range cmds {
if cmdFirstKeyPos(cmd) != 0 { if cmdFirstKeyPos(cmd) == 0 {
keyedCmds = append(keyedCmds, cmd) continue
} }
slot := c.cmdSlot(cmd, prefferedRandomSlot)
if prefferedRandomSlot == -1 {
prefferedRandomSlot = slot
}
cmdsSlots[slot] = append(cmdsSlots[slot], cmd)
} }
return keyedCmds
return cmdsSlots
} }
func (c *ClusterClient) processTxPipelineNode( func (c *ClusterClient) processTxPipelineNode(