diff --git a/osscluster.go b/osscluster.go index 0dce50a4..b56edfd7 100644 --- a/osscluster.go +++ b/osscluster.go @@ -1507,8 +1507,36 @@ func (c *ClusterClient) processTxPipeline(ctx context.Context, cmds []Cmder) err return err } - cmdsMap := c.mapCmdsBySlot(cmds) + cmdsMap := map[int][]Cmder{} + slot := -1 + // split keyed and keyless commands + keyedCmds, _ := c.keyedAndKeyessCmds(cmds) + if len(keyedCmds) == 0 { + // no keyed commands try random slot + slot = hashtag.RandomSlot() + } else { + // keyed commands, get slot from them + // if more than one slot, return cross slot error + cmdsBySlot := c.mapCmdsBySlot(keyedCmds) + if len(cmdsBySlot) > 1 { + // cross slot error, we have more than one slot for keyed commands + setCmdsErr(cmds, ErrCrossSlot) + return ErrCrossSlot + } + // get the slot, should be only one + for sl := range cmdsBySlot { + slot = sl + break + } + } + // slot was not determined, try random one + if slot == -1 { + slot = hashtag.RandomSlot() + } + cmdsMap[slot] = cmds + // TxPipeline does not support cross slot transaction. + // double check the commands are in the same slot if len(cmdsMap) > 1 { setCmdsErr(cmds, ErrCrossSlot) return ErrCrossSlot @@ -1560,6 +1588,18 @@ func (c *ClusterClient) mapCmdsBySlot(cmds []Cmder) map[int][]Cmder { } return cmdsMap } +func (c *ClusterClient) keyedAndKeyessCmds(cmds []Cmder) ([]Cmder, []Cmder) { + keyedCmds := make([]Cmder, 0, len(cmds)) + keylessCmds := make([]Cmder, 0, len(cmds)) + for _, cmd := range cmds { + if cmdFirstKeyPos(cmd) == 0 { + keylessCmds = append(keylessCmds, cmd) + } else { + keyedCmds = append(keyedCmds, cmd) + } + } + return keyedCmds, keylessCmds +} func (c *ClusterClient) processTxPipelineNode( ctx context.Context, node *clusterNode, cmds []Cmder, failedCmds *cmdsMap,