mirror of
https://github.com/redis/go-redis.git
synced 2025-07-18 00:20:57 +03:00
fix(txpipeline): keyless commands should take the slot of the keyed commands
This commit is contained in:
@ -1507,8 +1507,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
cmdsMap := c.mapCmdsBySlot(cmds)
|
cmdsMap := map[int][]Cmder{}
|
||||||
|
slot := -1
|
||||||
|
// split keyed and keyless commands
|
||||||
|
keyedCmds, _ := c.keyedAndKeyessCmds(cmds)
|
||||||
|
if len(keyedCmds) == 0 {
|
||||||
|
// no keyed commands try random slot
|
||||||
|
slot = hashtag.RandomSlot()
|
||||||
|
} else {
|
||||||
|
// keyed commands, get slot from them
|
||||||
|
// if more than one slot, return cross slot error
|
||||||
|
cmdsBySlot := c.mapCmdsBySlot(keyedCmds)
|
||||||
|
if len(cmdsBySlot) > 1 {
|
||||||
|
// cross slot error, we have more than one slot for keyed commands
|
||||||
|
setCmdsErr(cmds, ErrCrossSlot)
|
||||||
|
return ErrCrossSlot
|
||||||
|
}
|
||||||
|
// get the slot, should be only one
|
||||||
|
for sl := range cmdsBySlot {
|
||||||
|
slot = sl
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// slot was not determined, try random one
|
||||||
|
if slot == -1 {
|
||||||
|
slot = hashtag.RandomSlot()
|
||||||
|
}
|
||||||
|
cmdsMap[slot] = cmds
|
||||||
|
|
||||||
// TxPipeline does not support cross slot transaction.
|
// TxPipeline does not support cross slot transaction.
|
||||||
|
// double check the commands are in the same slot
|
||||||
if len(cmdsMap) > 1 {
|
if len(cmdsMap) > 1 {
|
||||||
setCmdsErr(cmds, ErrCrossSlot)
|
setCmdsErr(cmds, ErrCrossSlot)
|
||||||
return ErrCrossSlot
|
return ErrCrossSlot
|
||||||
@ -1560,6 +1588,18 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder {
|
|||||||
}
|
}
|
||||||
return cmdsMap
|
return cmdsMap
|
||||||
}
|
}
|
||||||
|
func (c *ClusterClient) keyedAndKeyessCmds(cmds []Cmder) ([]Cmder, []Cmder) {
|
||||||
|
keyedCmds := make([]Cmder, 0, len(cmds))
|
||||||
|
keylessCmds := make([]Cmder, 0, len(cmds))
|
||||||
|
for _, cmd := range cmds {
|
||||||
|
if cmdFirstKeyPos(cmd) == 0 {
|
||||||
|
keylessCmds = append(keylessCmds, cmd)
|
||||||
|
} else {
|
||||||
|
keyedCmds = append(keyedCmds, cmd)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return keyedCmds, keylessCmds
|
||||||
|
}
|
||||||
|
|
||||||
func (c *ClusterClient) processTxPipelineNode(
|
func (c *ClusterClient) processTxPipelineNode(
|
||||||
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
|
ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,
|
||||||
|
Reference in New Issue
Block a user