1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-29 17:41:15 +03:00

Add proper SingleConnPool implementation

This commit is contained in:
Vladimir Mihailenco
2019-08-03 17:21:12 +03:00
parent d6a99e7be3
commit 4e9cea8876
7 changed files with 212 additions and 69 deletions

View File

@ -1,98 +0,0 @@
package internal
import (
"context"
"errors"
"io"
"net"
"strings"
"github.com/go-redis/redis/internal/proto"
)
var ErrSingleConnPoolClosed = errors.New("redis: SingleConnPool is closed")
func IsRetryableError(err error, retryTimeout bool) bool {
switch err {
case nil, context.Canceled, context.DeadlineExceeded:
return false
case io.EOF:
return true
}
if netErr, ok := err.(net.Error); ok {
if netErr.Timeout() {
return retryTimeout
}
return true
}
if err == ErrSingleConnPoolClosed {
return true
}
s := err.Error()
if s == "ERR max number of clients reached" {
return true
}
if strings.HasPrefix(s, "LOADING ") {
return true
}
if strings.HasPrefix(s, "READONLY ") {
return true
}
if strings.HasPrefix(s, "CLUSTERDOWN ") {
return true
}
return false
}
func IsRedisError(err error) bool {
_, ok := err.(proto.RedisError)
return ok
}
func IsBadConn(err error, allowTimeout bool) bool {
if err == nil {
return false
}
if IsRedisError(err) {
// #790
return IsReadOnlyError(err)
}
if allowTimeout {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
return false
}
}
return true
}
func IsMovedError(err error) (moved bool, ask bool, addr string) {
if !IsRedisError(err) {
return
}
s := err.Error()
switch {
case strings.HasPrefix(s, "MOVED "):
moved = true
case strings.HasPrefix(s, "ASK "):
ask = true
default:
return
}
ind := strings.LastIndex(s, " ")
if ind == -1 {
return false, false, ""
}
addr = s[ind+1:]
return
}
func IsLoadingError(err error) bool {
return strings.HasPrefix(err.Error(), "LOADING ")
}
func IsReadOnlyError(err error) bool {
return strings.HasPrefix(err.Error(), "READONLY ")
}

View File

@ -2,66 +2,184 @@ package pool
import (
"context"
"github.com/go-redis/redis/internal"
"fmt"
"sync/atomic"
)
const (
stateDefault = 0
stateInited = 1
stateClosed = 2
)
var ErrBadConn = fmt.Errorf("pg: Conn is in a bad state")
type SingleConnPool struct {
cn *Conn
cnClosed bool
pool Pooler
state uint32 // atomic
ch chan *Conn
level int32 // atomic
_hasBadConn uint32 // atomic
}
var _ Pooler = (*SingleConnPool)(nil)
func NewSingleConnPool(cn *Conn) *SingleConnPool {
return &SingleConnPool{
cn: cn,
func NewSingleConnPool(pool Pooler) *SingleConnPool {
p, ok := pool.(*SingleConnPool)
if !ok {
p = &SingleConnPool{
pool: pool,
ch: make(chan *Conn, 1),
}
}
atomic.AddInt32(&p.level, 1)
return p
}
func (p *SingleConnPool) SetConn(cn *Conn) {
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
p.ch <- cn
} else {
panic("not reached")
}
}
func (p *SingleConnPool) NewConn(context.Context) (*Conn, error) {
panic("not implemented")
func (p *SingleConnPool) NewConn(c context.Context) (*Conn, error) {
return p.pool.NewConn(c)
}
func (p *SingleConnPool) CloseConn(*Conn) error {
panic("not implemented")
func (p *SingleConnPool) CloseConn(cn *Conn) error {
return p.pool.CloseConn(cn)
}
func (p *SingleConnPool) Get(ctx context.Context) (*Conn, error) {
if p.cnClosed {
return nil, internal.ErrSingleConnPoolClosed
func (p *SingleConnPool) Get(c context.Context) (*Conn, error) {
// In worst case this races with Close which is not a very common operation.
for i := 0; i < 1000; i++ {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
cn, err := p.pool.Get(c)
if err != nil {
return nil, err
}
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
return cn, nil
}
p.pool.Remove(cn)
case stateInited:
if p.hasBadConn() {
return nil, ErrBadConn
}
cn, ok := <-p.ch
if !ok {
return nil, ErrClosed
}
return cn, nil
case stateClosed:
return nil, ErrClosed
default:
panic("not reached")
}
}
return p.cn, nil
return nil, fmt.Errorf("pg: SingleConnPool.Get: infinite loop")
}
func (p *SingleConnPool) Put(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
defer func() {
if recover() != nil {
p.freeConn(cn)
}
}()
p.ch <- cn
}
func (p *SingleConnPool) freeConn(cn *Conn) {
if p.hasBadConn() {
p.pool.Remove(cn)
} else {
p.pool.Put(cn)
}
}
func (p *SingleConnPool) Remove(cn *Conn) {
if p.cn != cn {
panic("p.cn != cn")
}
p.cnClosed = true
defer func() {
if recover() != nil {
p.pool.Remove(cn)
}
}()
atomic.StoreUint32(&p._hasBadConn, 1)
p.ch <- cn
}
func (p *SingleConnPool) Len() int {
if p.cnClosed {
switch atomic.LoadUint32(&p.state) {
case stateDefault:
return 0
case stateInited:
return 1
case stateClosed:
return 0
default:
panic("not reached")
}
return 1
}
func (p *SingleConnPool) IdleLen() int {
return 0
return len(p.ch)
}
func (p *SingleConnPool) Stats() *Stats {
return nil
return &Stats{}
}
func (p *SingleConnPool) Close() error {
level := atomic.AddInt32(&p.level, -1)
if level > 0 {
return nil
}
for i := 0; i < 1000; i++ {
state := atomic.LoadUint32(&p.state)
if state == stateClosed {
return ErrClosed
}
if atomic.CompareAndSwapUint32(&p.state, state, stateClosed) {
close(p.ch)
cn, ok := <-p.ch
if ok {
p.freeConn(cn)
}
return nil
}
}
return fmt.Errorf("pg: SingleConnPool.Close: infinite loop")
}
func (p *SingleConnPool) Reset() error {
if !atomic.CompareAndSwapUint32(&p._hasBadConn, 1, 0) {
return nil
}
select {
case cn, ok := <-p.ch:
if !ok {
return ErrClosed
}
p.pool.Remove(cn)
default:
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
}
if !atomic.CompareAndSwapUint32(&p.state, stateInited, stateDefault) {
state := atomic.LoadUint32(&p.state)
return fmt.Errorf("pg: invalid SingleConnPool state: %d", state)
}
return nil
}
func (p *SingleConnPool) hasBadConn() bool {
return atomic.LoadUint32(&p._hasBadConn) == 1
}