mirror of
https://github.com/redis/go-redis.git
synced 2025-07-28 06:42:00 +03:00
Merge branch 'v4' of github.com:go-redis/redis into v4
This commit is contained in:
86
cluster.go
86
cluster.go
@ -316,6 +316,92 @@ func (c *ClusterClient) reaper(frequency time.Duration) {
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ClusterClient) Pipeline() *Pipeline {
|
||||
pipe := &Pipeline{
|
||||
exec: c.pipelineExec,
|
||||
}
|
||||
pipe.commandable.process = pipe.process
|
||||
return pipe
|
||||
}
|
||||
|
||||
func (c *ClusterClient) Pipelined(fn func(*Pipeline) error) ([]Cmder, error) {
|
||||
return c.Pipeline().pipelined(fn)
|
||||
}
|
||||
|
||||
func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
|
||||
var retErr error
|
||||
|
||||
cmdsMap := make(map[string][]Cmder)
|
||||
for _, cmd := range cmds {
|
||||
slot := hashtag.Slot(cmd.clusterKey())
|
||||
addr := c.slotMasterAddr(slot)
|
||||
cmdsMap[addr] = append(cmdsMap[addr], cmd)
|
||||
}
|
||||
|
||||
for attempt := 0; attempt <= c.opt.getMaxRedirects(); attempt++ {
|
||||
failedCmds := make(map[string][]Cmder)
|
||||
|
||||
for addr, cmds := range cmdsMap {
|
||||
client, err := c.getClient(addr)
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
retErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
cn, err := client.conn()
|
||||
if err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
retErr = err
|
||||
continue
|
||||
}
|
||||
|
||||
failedCmds, err = c.execClusterCmds(cn, cmds, failedCmds)
|
||||
if err != nil {
|
||||
retErr = err
|
||||
}
|
||||
client.putConn(cn, err, false)
|
||||
}
|
||||
|
||||
cmdsMap = failedCmds
|
||||
}
|
||||
|
||||
return retErr
|
||||
}
|
||||
|
||||
func (c *ClusterClient) execClusterCmds(
|
||||
cn *pool.Conn, cmds []Cmder, failedCmds map[string][]Cmder,
|
||||
) (map[string][]Cmder, error) {
|
||||
if err := writeCmd(cn, cmds...); err != nil {
|
||||
setCmdsErr(cmds, err)
|
||||
return failedCmds, err
|
||||
}
|
||||
|
||||
var firstCmdErr error
|
||||
for i, cmd := range cmds {
|
||||
err := cmd.readReply(cn)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if isNetworkError(err) {
|
||||
cmd.reset()
|
||||
failedCmds[""] = append(failedCmds[""], cmds[i:]...)
|
||||
break
|
||||
} else if moved, ask, addr := isMovedError(err); moved {
|
||||
c.lazyReloadSlots()
|
||||
cmd.reset()
|
||||
failedCmds[addr] = append(failedCmds[addr], cmd)
|
||||
} else if ask {
|
||||
cmd.reset()
|
||||
failedCmds[addr] = append(failedCmds[addr], NewCmd("ASKING"), cmd)
|
||||
} else if firstCmdErr == nil {
|
||||
firstCmdErr = err
|
||||
}
|
||||
}
|
||||
|
||||
return failedCmds, firstCmdErr
|
||||
}
|
||||
|
||||
//------------------------------------------------------------------------------
|
||||
|
||||
// ClusterOptions are used to configure a cluster client and should be
|
||||
|
Reference in New Issue
Block a user