1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

Resent client pool when sentinel switches master

This commit is contained in:
Vladimir Mihailenco
2017-06-29 16:53:49 +03:00
parent 75ceb983b7
commit fbc8000fd1
6 changed files with 152 additions and 159 deletions

107
pubsub.go
View File

@ -19,13 +19,11 @@ import (
type PubSub struct {
base baseClient
mu sync.Mutex
cn *pool.Conn
closed bool
subMu sync.Mutex
mu sync.Mutex
cn *pool.Conn
channels []string
patterns []string
closed bool
cmd *Cmd
}
@ -64,9 +62,6 @@ func (c *PubSub) conn() (*pool.Conn, bool, error) {
}
func (c *PubSub) resubscribe(cn *pool.Conn) error {
c.subMu.Lock()
defer c.subMu.Unlock()
var firstErr error
if len(c.channels) > 0 {
if err := c._subscribe(cn, "subscribe", c.channels...); err != nil && firstErr == nil {
@ -81,6 +76,18 @@ func (c *PubSub) resubscribe(cn *pool.Conn) error {
return firstErr
}
func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
return writeCmd(cn, cmd)
}
func (c *PubSub) putConn(cn *pool.Conn, err error) {
if !internal.IsBadConn(err, true) {
return
@ -114,6 +121,42 @@ func (c *PubSub) Close() error {
return nil
}
// Subscribes the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
c.mu.Lock()
c.channels = appendIfNotExists(c.channels, channels...)
c.mu.Unlock()
return c.subscribe("subscribe", channels...)
}
// Subscribes the client to the given patterns. It returns
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
c.mu.Lock()
c.patterns = appendIfNotExists(c.patterns, patterns...)
c.mu.Unlock()
return c.subscribe("psubscribe", patterns...)
}
// Unsubscribes the client from the given channels, or from all of
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
c.mu.Lock()
c.channels = remove(c.channels, channels...)
c.mu.Unlock()
return c.subscribe("unsubscribe", channels...)
}
// Unsubscribes the client from the given patterns, or from all of
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.mu.Lock()
c.patterns = remove(c.patterns, patterns...)
c.mu.Unlock()
return c.subscribe("punsubscribe", patterns...)
}
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, isNew, err := c.conn()
if err != nil {
@ -129,54 +172,6 @@ func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
return err
}
func (c *PubSub) _subscribe(cn *pool.Conn, redisCmd string, channels ...string) error {
args := make([]interface{}, 1+len(channels))
args[0] = redisCmd
for i, channel := range channels {
args[1+i] = channel
}
cmd := NewSliceCmd(args...)
cn.SetWriteTimeout(c.base.opt.WriteTimeout)
return writeCmd(cn, cmd)
}
// Subscribes the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
c.subMu.Lock()
c.channels = appendIfNotExists(c.channels, channels...)
c.subMu.Unlock()
return c.subscribe("subscribe", channels...)
}
// Subscribes the client to the given patterns. It returns
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
c.subMu.Lock()
c.patterns = appendIfNotExists(c.patterns, patterns...)
c.subMu.Unlock()
return c.subscribe("psubscribe", patterns...)
}
// Unsubscribes the client from the given channels, or from all of
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
c.subMu.Lock()
c.channels = remove(c.channels, channels...)
c.subMu.Unlock()
return c.subscribe("unsubscribe", channels...)
}
// Unsubscribes the client from the given patterns, or from all of
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
c.subMu.Lock()
c.patterns = remove(c.patterns, patterns...)
c.subMu.Unlock()
return c.subscribe("punsubscribe", patterns...)
}
func (c *PubSub) Ping(payload ...string) error {
args := []interface{}{"ping"}
if len(payload) == 1 {