mirror of
https://github.com/redis/go-redis.git
synced 2025-07-31 05:04:23 +03:00
Gracefully handle situation when Redis Server is down
This commit is contained in:
@ -46,14 +46,21 @@ type Pooler interface {
|
||||
Close() error
|
||||
}
|
||||
|
||||
type dialer func() (net.Conn, error)
|
||||
|
||||
type ConnPool struct {
|
||||
dial dialer
|
||||
type Options struct {
|
||||
Dialer func() (net.Conn, error)
|
||||
OnClose func(*Conn) error
|
||||
|
||||
poolTimeout time.Duration
|
||||
idleTimeout time.Duration
|
||||
PoolSize int
|
||||
PoolTimeout time.Duration
|
||||
IdleTimeout time.Duration
|
||||
IdleCheckFrequency time.Duration
|
||||
}
|
||||
|
||||
type ConnPool struct {
|
||||
opt *Options
|
||||
|
||||
dialErrorsNum uint32 // atomic
|
||||
_lastDialError atomic.Value
|
||||
|
||||
queue chan struct{}
|
||||
|
||||
@ -65,24 +72,21 @@ type ConnPool struct {
|
||||
|
||||
stats Stats
|
||||
|
||||
_closed int32 // atomic
|
||||
_closed uint32 // atomic
|
||||
}
|
||||
|
||||
var _ Pooler = (*ConnPool)(nil)
|
||||
|
||||
func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout, idleCheckFrequency time.Duration) *ConnPool {
|
||||
func NewConnPool(opt *Options) *ConnPool {
|
||||
p := &ConnPool{
|
||||
dial: dial,
|
||||
opt: opt,
|
||||
|
||||
poolTimeout: poolTimeout,
|
||||
idleTimeout: idleTimeout,
|
||||
|
||||
queue: make(chan struct{}, poolSize),
|
||||
conns: make([]*Conn, 0, poolSize),
|
||||
freeConns: make([]*Conn, 0, poolSize),
|
||||
queue: make(chan struct{}, opt.PoolSize),
|
||||
conns: make([]*Conn, 0, opt.PoolSize),
|
||||
freeConns: make([]*Conn, 0, opt.PoolSize),
|
||||
}
|
||||
if idleTimeout > 0 && idleCheckFrequency > 0 {
|
||||
go p.reaper(idleCheckFrequency)
|
||||
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
|
||||
go p.reaper(opt.IdleCheckFrequency)
|
||||
}
|
||||
return p
|
||||
}
|
||||
@ -92,8 +96,16 @@ func (p *ConnPool) NewConn() (*Conn, error) {
|
||||
return nil, ErrClosed
|
||||
}
|
||||
|
||||
netConn, err := p.dial()
|
||||
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
|
||||
return nil, p.lastDialError()
|
||||
}
|
||||
|
||||
netConn, err := p.opt.Dialer()
|
||||
if err != nil {
|
||||
p.setLastDialError(err)
|
||||
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
|
||||
go p.tryDial()
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -105,12 +117,35 @@ func (p *ConnPool) NewConn() (*Conn, error) {
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) tryDial() {
|
||||
for {
|
||||
conn, err := p.opt.Dialer()
|
||||
if err != nil {
|
||||
p.setLastDialError(err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
|
||||
atomic.StoreUint32(&p.dialErrorsNum, 0)
|
||||
_ = conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ConnPool) setLastDialError(err error) {
|
||||
p._lastDialError.Store(err)
|
||||
}
|
||||
|
||||
func (p *ConnPool) lastDialError() error {
|
||||
return p._lastDialError.Load().(error)
|
||||
}
|
||||
|
||||
func (p *ConnPool) PopFree() *Conn {
|
||||
select {
|
||||
case p.queue <- struct{}{}:
|
||||
default:
|
||||
timer := timers.Get().(*time.Timer)
|
||||
timer.Reset(p.poolTimeout)
|
||||
timer.Reset(p.opt.PoolTimeout)
|
||||
|
||||
select {
|
||||
case p.queue <- struct{}{}:
|
||||
@ -158,7 +193,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
|
||||
case p.queue <- struct{}{}:
|
||||
default:
|
||||
timer := timers.Get().(*time.Timer)
|
||||
timer.Reset(p.poolTimeout)
|
||||
timer.Reset(p.opt.PoolTimeout)
|
||||
|
||||
select {
|
||||
case p.queue <- struct{}{}:
|
||||
@ -182,7 +217,7 @@ func (p *ConnPool) Get() (*Conn, bool, error) {
|
||||
break
|
||||
}
|
||||
|
||||
if cn.IsStale(p.idleTimeout) {
|
||||
if cn.IsStale(p.opt.IdleTimeout) {
|
||||
p.CloseConn(cn)
|
||||
continue
|
||||
}
|
||||
@ -232,8 +267,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
|
||||
}
|
||||
|
||||
func (p *ConnPool) closeConn(cn *Conn) error {
|
||||
if p.OnClose != nil {
|
||||
_ = p.OnClose(cn)
|
||||
if p.opt.OnClose != nil {
|
||||
_ = p.opt.OnClose(cn)
|
||||
}
|
||||
return cn.Close()
|
||||
}
|
||||
@ -265,11 +300,11 @@ func (p *ConnPool) Stats() *Stats {
|
||||
}
|
||||
|
||||
func (p *ConnPool) closed() bool {
|
||||
return atomic.LoadInt32(&p._closed) == 1
|
||||
return atomic.LoadUint32(&p._closed) == 1
|
||||
}
|
||||
|
||||
func (p *ConnPool) Close() error {
|
||||
if !atomic.CompareAndSwapInt32(&p._closed, 0, 1) {
|
||||
if !atomic.CompareAndSwapUint32(&p._closed, 0, 1) {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
@ -299,7 +334,7 @@ func (p *ConnPool) reapStaleConn() bool {
|
||||
}
|
||||
|
||||
cn := p.freeConns[0]
|
||||
if !cn.IsStale(p.idleTimeout) {
|
||||
if !cn.IsStale(p.opt.IdleTimeout) {
|
||||
return false
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user