1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

Merge branch 'v8'

This commit is contained in:
Vladimir Mihailenco
2020-05-21 10:16:44 +03:00
47 changed files with 3373 additions and 3122 deletions

112
pubsub.go
View File

@ -8,9 +8,9 @@ import (
"sync"
"time"
"github.com/go-redis/redis/v7/internal"
"github.com/go-redis/redis/v7/internal/pool"
"github.com/go-redis/redis/v7/internal/proto"
"github.com/go-redis/redis/v8/internal"
"github.com/go-redis/redis/v8/internal/pool"
"github.com/go-redis/redis/v8/internal/proto"
)
const pingTimeout = 30 * time.Second
@ -26,7 +26,7 @@ var errPingTimeout = errors.New("redis: ping timeout")
type PubSub struct {
opt *Options
newConn func([]string) (*pool.Conn, error)
newConn func(ctx context.Context, channels []string) (*pool.Conn, error)
closeConn func(*pool.Conn) error
mu sync.Mutex
@ -55,14 +55,14 @@ func (c *PubSub) init() {
c.exit = make(chan struct{})
}
func (c *PubSub) connWithLock() (*pool.Conn, error) {
func (c *PubSub) connWithLock(ctx context.Context) (*pool.Conn, error) {
c.mu.Lock()
cn, err := c.conn(nil)
cn, err := c.conn(ctx, nil)
c.mu.Unlock()
return cn, err
}
func (c *PubSub) conn(newChannels []string) (*pool.Conn, error) {
func (c *PubSub) conn(ctx context.Context, newChannels []string) (*pool.Conn, error) {
if c.closed {
return nil, pool.ErrClosed
}
@ -73,12 +73,12 @@ func (c *PubSub) conn(newChannels []string) (*pool.Conn, error) {
channels := mapKeys(c.channels)
channels = append(channels, newChannels...)
cn, err := c.newConn(channels)
cn, err := c.newConn(ctx, channels)
if err != nil {
return nil, err
}
if err := c.resubscribe(cn); err != nil {
if err := c.resubscribe(ctx, cn); err != nil {
_ = c.closeConn(cn)
return nil, err
}
@ -93,15 +93,15 @@ func (c *PubSub) writeCmd(ctx context.Context, cn *pool.Conn, cmd Cmder) error {
})
}
func (c *PubSub) resubscribe(cn *pool.Conn) error {
func (c *PubSub) resubscribe(ctx context.Context, cn *pool.Conn) error {
var firstErr error
if len(c.channels) > 0 {
firstErr = c._subscribe(cn, "subscribe", mapKeys(c.channels))
firstErr = c._subscribe(ctx, cn, "subscribe", mapKeys(c.channels))
}
if len(c.patterns) > 0 {
err := c._subscribe(cn, "psubscribe", mapKeys(c.patterns))
err := c._subscribe(ctx, cn, "psubscribe", mapKeys(c.patterns))
if err != nil && firstErr == nil {
firstErr = err
}
@ -121,35 +121,40 @@ func mapKeys(m map[string]struct{}) []string {
}
func (c *PubSub) _subscribe(
cn *pool.Conn, redisCmd string, channels []string,
ctx context.Context, cn *pool.Conn, redisCmd string, channels []string,
) error {
args := make([]interface{}, 0, 1+len(channels))
args = append(args, redisCmd)
for _, channel := range channels {
args = append(args, channel)
}
cmd := NewSliceCmd(args...)
return c.writeCmd(context.TODO(), cn, cmd)
cmd := NewSliceCmd(ctx, args...)
return c.writeCmd(ctx, cn, cmd)
}
func (c *PubSub) releaseConnWithLock(cn *pool.Conn, err error, allowTimeout bool) {
func (c *PubSub) releaseConnWithLock(
ctx context.Context,
cn *pool.Conn,
err error,
allowTimeout bool,
) {
c.mu.Lock()
c.releaseConn(cn, err, allowTimeout)
c.releaseConn(ctx, cn, err, allowTimeout)
c.mu.Unlock()
}
func (c *PubSub) releaseConn(cn *pool.Conn, err error, allowTimeout bool) {
func (c *PubSub) releaseConn(ctx context.Context, cn *pool.Conn, err error, allowTimeout bool) {
if c.cn != cn {
return
}
if isBadConn(err, allowTimeout) {
c.reconnect(err)
c.reconnect(ctx, err)
}
}
func (c *PubSub) reconnect(reason error) {
func (c *PubSub) reconnect(ctx context.Context, reason error) {
_ = c.closeTheCn(reason)
_, _ = c.conn(nil)
_, _ = c.conn(ctx, nil)
}
func (c *PubSub) closeTheCn(reason error) error {
@ -179,11 +184,11 @@ func (c *PubSub) Close() error {
// Subscribe the client to the specified channels. It returns
// empty subscription if there are no channels.
func (c *PubSub) Subscribe(channels ...string) error {
func (c *PubSub) Subscribe(ctx context.Context, channels ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
err := c.subscribe("subscribe", channels...)
err := c.subscribe(ctx, "subscribe", channels...)
if c.channels == nil {
c.channels = make(map[string]struct{})
}
@ -195,11 +200,11 @@ func (c *PubSub) Subscribe(channels ...string) error {
// PSubscribe the client to the given patterns. It returns
// empty subscription if there are no patterns.
func (c *PubSub) PSubscribe(patterns ...string) error {
func (c *PubSub) PSubscribe(ctx context.Context, patterns ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
err := c.subscribe("psubscribe", patterns...)
err := c.subscribe(ctx, "psubscribe", patterns...)
if c.patterns == nil {
c.patterns = make(map[string]struct{})
}
@ -211,55 +216,55 @@ func (c *PubSub) PSubscribe(patterns ...string) error {
// Unsubscribe the client from the given channels, or from all of
// them if none is given.
func (c *PubSub) Unsubscribe(channels ...string) error {
func (c *PubSub) Unsubscribe(ctx context.Context, channels ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, channel := range channels {
delete(c.channels, channel)
}
err := c.subscribe("unsubscribe", channels...)
err := c.subscribe(ctx, "unsubscribe", channels...)
return err
}
// PUnsubscribe the client from the given patterns, or from all of
// them if none is given.
func (c *PubSub) PUnsubscribe(patterns ...string) error {
func (c *PubSub) PUnsubscribe(ctx context.Context, patterns ...string) error {
c.mu.Lock()
defer c.mu.Unlock()
for _, pattern := range patterns {
delete(c.patterns, pattern)
}
err := c.subscribe("punsubscribe", patterns...)
err := c.subscribe(ctx, "punsubscribe", patterns...)
return err
}
func (c *PubSub) subscribe(redisCmd string, channels ...string) error {
cn, err := c.conn(channels)
func (c *PubSub) subscribe(ctx context.Context, redisCmd string, channels ...string) error {
cn, err := c.conn(ctx, channels)
if err != nil {
return err
}
err = c._subscribe(cn, redisCmd, channels)
c.releaseConn(cn, err, false)
err = c._subscribe(ctx, cn, redisCmd, channels)
c.releaseConn(ctx, cn, err, false)
return err
}
func (c *PubSub) Ping(payload ...string) error {
func (c *PubSub) Ping(ctx context.Context, payload ...string) error {
args := []interface{}{"ping"}
if len(payload) == 1 {
args = append(args, payload[0])
}
cmd := NewCmd(args...)
cmd := NewCmd(ctx, args...)
cn, err := c.connWithLock()
cn, err := c.connWithLock(ctx)
if err != nil {
return err
}
err = c.writeCmd(context.TODO(), cn, cmd)
c.releaseConnWithLock(cn, err, false)
err = c.writeCmd(ctx, cn, cmd)
c.releaseConnWithLock(ctx, cn, err, false)
return err
}
@ -342,21 +347,21 @@ func (c *PubSub) newMessage(reply interface{}) (interface{}, error) {
// ReceiveTimeout acts like Receive but returns an error if message
// is not received in time. This is low-level API and in most cases
// Channel should be used instead.
func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
func (c *PubSub) ReceiveTimeout(ctx context.Context, timeout time.Duration) (interface{}, error) {
if c.cmd == nil {
c.cmd = NewCmd()
c.cmd = NewCmd(ctx)
}
cn, err := c.connWithLock()
cn, err := c.connWithLock(ctx)
if err != nil {
return nil, err
}
err = cn.WithReader(context.TODO(), timeout, func(rd *proto.Reader) error {
err = cn.WithReader(ctx, timeout, func(rd *proto.Reader) error {
return c.cmd.readReply(rd)
})
c.releaseConnWithLock(cn, err, timeout > 0)
c.releaseConnWithLock(ctx, cn, err, timeout > 0)
if err != nil {
return nil, err
}
@ -367,16 +372,16 @@ func (c *PubSub) ReceiveTimeout(timeout time.Duration) (interface{}, error) {
// Receive returns a message as a Subscription, Message, Pong or error.
// See PubSub example for details. This is low-level API and in most cases
// Channel should be used instead.
func (c *PubSub) Receive() (interface{}, error) {
return c.ReceiveTimeout(0)
func (c *PubSub) Receive(ctx context.Context) (interface{}, error) {
return c.ReceiveTimeout(ctx, 0)
}
// ReceiveMessage returns a Message or error ignoring Subscription and Pong
// messages. This is low-level API and in most cases Channel should be used
// instead.
func (c *PubSub) ReceiveMessage() (*Message, error) {
func (c *PubSub) ReceiveMessage(ctx context.Context) (*Message, error) {
for {
msg, err := c.Receive()
msg, err := c.Receive(ctx)
if err != nil {
return nil, err
}
@ -429,7 +434,7 @@ func (c *PubSub) ChannelSize(size int) <-chan *Message {
// reconnections.
//
// ChannelWithSubscriptions can not be used together with Channel or ChannelSize.
func (c *PubSub) ChannelWithSubscriptions(size int) <-chan interface{} {
func (c *PubSub) ChannelWithSubscriptions(ctx context.Context, size int) <-chan interface{} {
c.chOnce.Do(func() {
c.initPing()
c.initAllChan(size)
@ -446,6 +451,7 @@ func (c *PubSub) ChannelWithSubscriptions(size int) <-chan interface{} {
}
func (c *PubSub) initPing() {
ctx := context.TODO()
c.ping = make(chan struct{}, 1)
go func() {
timer := time.NewTimer(pingTimeout)
@ -461,7 +467,7 @@ func (c *PubSub) initPing() {
<-timer.C
}
case <-timer.C:
pingErr := c.Ping()
pingErr := c.Ping(ctx)
if healthy {
healthy = false
} else {
@ -469,7 +475,7 @@ func (c *PubSub) initPing() {
pingErr = errPingTimeout
}
c.mu.Lock()
c.reconnect(pingErr)
c.reconnect(ctx, pingErr)
healthy = true
c.mu.Unlock()
}
@ -482,6 +488,7 @@ func (c *PubSub) initPing() {
// initMsgChan must be in sync with initAllChan.
func (c *PubSub) initMsgChan(size int) {
ctx := context.TODO()
c.msgCh = make(chan *Message, size)
go func() {
timer := time.NewTimer(pingTimeout)
@ -489,7 +496,7 @@ func (c *PubSub) initMsgChan(size int) {
var errCount int
for {
msg, err := c.Receive()
msg, err := c.Receive(ctx)
if err != nil {
if err == pool.ErrClosed {
close(c.msgCh)
@ -535,6 +542,7 @@ func (c *PubSub) initMsgChan(size int) {
// initAllChan must be in sync with initMsgChan.
func (c *PubSub) initAllChan(size int) {
ctx := context.TODO()
c.allCh = make(chan interface{}, size)
go func() {
timer := time.NewTimer(pingTimeout)
@ -542,7 +550,7 @@ func (c *PubSub) initAllChan(size int) {
var errCount int
for {
msg, err := c.Receive()
msg, err := c.Receive(ctx)
if err != nil {
if err == pool.ErrClosed {
close(c.allCh)