1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-22 10:01:50 +03:00

Add PubSub support to Cluster client

This commit is contained in:
Vladimir Mihailenco
2017-07-09 10:07:20 +03:00
parent 564772f045
commit 6060f097e1
7 changed files with 138 additions and 50 deletions

View File

@ -327,7 +327,7 @@ func (c *clusterState) slotClosestNode(slot int) (*clusterNode, error) {
}
func (c *clusterState) slotNodes(slot int) []*clusterNode {
if slot < len(c.slots) {
if slot >= 0 && slot < len(c.slots) {
return c.slots[slot]
}
return nil
@ -720,14 +720,14 @@ func (c *ClusterClient) pipelineExec(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
cn, _, err := node.Client.conn()
cn, _, err := node.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
err = c.pipelineProcessCmds(cn, cmds, failedCmds)
node.Client.putConn(cn, err)
node.Client.releaseConn(cn, err)
}
if len(failedCmds) == 0 {
@ -855,14 +855,14 @@ func (c *ClusterClient) txPipelineExec(cmds []Cmder) error {
failedCmds := make(map[*clusterNode][]Cmder)
for node, cmds := range cmdsMap {
cn, _, err := node.Client.conn()
cn, _, err := node.Client.getConn()
if err != nil {
setCmdsErr(cmds, err)
continue
}
err = c.txPipelineProcessCmds(node, cn, cmds, failedCmds)
node.Client.putConn(cn, err)
node.Client.releaseConn(cn, err)
}
if len(failedCmds) == 0 {
@ -966,6 +966,56 @@ func (c *ClusterClient) txPipelineReadQueued(
return firstErr
}
func (c *ClusterClient) pubSub(channels []string) *PubSub {
opt := c.opt.clientOptions()
var node *clusterNode
return &PubSub{
opt: opt,
newConn: func(channels []string) (*pool.Conn, error) {
if node == nil {
var slot int
if len(channels) > 0 {
slot = hashtag.Slot(channels[0])
} else {
slot = -1
}
masterNode, err := c.state().slotMasterNode(slot)
if err != nil {
return nil, err
}
node = masterNode
}
return node.Client.newConn()
},
closeConn: func(cn *pool.Conn) error {
return node.Client.connPool.CloseConn(cn)
},
}
}
// Subscribe subscribes the client to the specified channels.
// Channels can be omitted to create empty subscription.
func (c *ClusterClient) Subscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels)
if len(channels) > 0 {
_ = pubsub.Subscribe(channels...)
}
return pubsub
}
// PSubscribe subscribes the client to the given patterns.
// Patterns can be omitted to create empty subscription.
func (c *ClusterClient) PSubscribe(channels ...string) *PubSub {
pubsub := c.pubSub(channels)
if len(channels) > 0 {
_ = pubsub.PSubscribe(channels...)
}
return pubsub
}
func isLoopbackAddr(addr string) bool {
host, _, err := net.SplitHostPort(addr)
if err != nil {