From a9e329d3bcd1e9b0336b40d7c72413c0d0880ad0 Mon Sep 17 00:00:00 2001
From: zhanyr
Date: Tue, 11 Sep 2018 17:37:57 +0800
Subject: [PATCH] execute commands concurrently on each cluster node in method
 `defaultProcessPipeline` (#861)

Execute commands concurrently on each cluster node
---
 cluster.go | 53 ++++++++++++++++++++++++++++++++++++++---------------
 1 file changed, 38 insertions(+), 15 deletions(-)

diff --git a/cluster.go b/cluster.go
index 6b05f1a5..8cbd447c 100644
--- a/cluster.go
+++ b/cluster.go
@@ -1254,24 +1254,47 @@ func (c *ClusterClient) defaultProcessPipeline(cmds []Cmder) error {
 		failedCmds := make(map[*clusterNode][]Cmder)
 
+		var wg sync.WaitGroup
+		var lock sync.RWMutex
 		for node, cmds := range cmdsMap {
-			cn, err := node.Client.getConn()
-			if err != nil {
-				if err == pool.ErrClosed {
-					c.remapCmds(cmds, failedCmds)
-				} else {
-					setCmdsErr(cmds, err)
-				}
-				continue
-			}
+			wg.Add(1)
+			go func(node *clusterNode, cmds []Cmder) {
+				defer wg.Done()
 
-			err = c.pipelineProcessCmds(node, cn, cmds, failedCmds)
-			if err == nil || internal.IsRedisError(err) {
-				node.Client.connPool.Put(cn)
-			} else {
-				node.Client.connPool.Remove(cn)
-			}
+				failedCmdsTmp := make(map[*clusterNode][]Cmder)
+
+				cn, err := node.Client.getConn()
+				if err != nil {
+					if err == pool.ErrClosed {
+						c.remapCmds(cmds, failedCmdsTmp)
+					} else {
+						setCmdsErr(cmds, err)
+					}
+
+				} else {
+					err = c.pipelineProcessCmds(node, cn, cmds, failedCmdsTmp)
+					if err == nil || internal.IsRedisError(err) {
+						node.Client.connPool.Put(cn)
+					} else {
+						node.Client.connPool.Remove(cn)
+					}
+				}
+
+				if len(failedCmdsTmp) > 0 {
+					for node, cs := range failedCmdsTmp {
+						lock.Lock()
+						if _, ok := failedCmds[node]; ok {
+							failedCmds[node] = append(failedCmds[node], cs...)
+						} else {
+							failedCmds[node] = cs
+						}
+						lock.Unlock()
+					}
+				}
+
+			}(node, cmds)
 		}
+		wg.Wait()
 
 		if len(failedCmds) == 0 {
 			break
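
Note: the patch fans out one goroutine per cluster node, lets each goroutine collect its failures in a private map (`failedCmdsTmp`), and takes the lock only to merge those failures back into the shared `failedCmds`. Below is a minimal, standalone sketch of that pattern, not the go-redis API; `node`, `processNode`, and the sample addresses are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
)

type node struct{ addr string }

// processNode stands in for pipelineProcessCmds; here it simply reports
// every command as failed so the merge path below is exercised.
func processNode(n *node, cmds []string) []string {
	return cmds
}

func main() {
	cmdsMap := map[*node][]string{
		&node{addr: "10.0.0.1:6379"}: {"GET a", "GET b"},
		&node{addr: "10.0.0.2:6379"}: {"GET c"},
	}

	failedCmds := make(map[*node][]string)
	var wg sync.WaitGroup
	var mu sync.Mutex

	for n, cmds := range cmdsMap {
		wg.Add(1)
		// Pass the loop variables as arguments, as the patch does, so each
		// goroutine sees its own node/cmds pair.
		go func(n *node, cmds []string) {
			defer wg.Done()

			// Talk to the node without holding any lock.
			failed := processNode(n, cmds)
			if len(failed) == 0 {
				return
			}

			// Merge the private failures into the shared map under the lock.
			mu.Lock()
			failedCmds[n] = append(failedCmds[n], failed...)
			mu.Unlock()
		}(n, cmds)
	}
	wg.Wait()

	for n, cmds := range failedCmds {
		fmt.Println(n.addr, "failed:", cmds)
	}
}
```

The per-goroutine temporary map is the key design choice: network I/O happens with no lock held, and the critical section shrinks to the map merge. The patch declares the lock as a `sync.RWMutex` but only ever calls `Lock`/`Unlock` on it, so a plain `sync.Mutex` (as in the sketch) would behave identically.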