1
0
mirror of https://github.com/redis/go-redis.git synced 2025-08-08 23:42:06 +03:00

fix: remove conn reaper from the pool and uptrace option names

This commit is contained in:
Vladimir Mihailenco
2022-07-28 15:11:35 +03:00
parent ae6c6deaf4
commit f6a8adc50c
14 changed files with 270 additions and 485 deletions

View File

@@ -57,13 +57,13 @@ type Options struct {
Dialer func(context.Context) (net.Conn, error)
OnClose func(*Conn) error
PoolFIFO bool
PoolSize int
MinIdleConns int
MaxConnAge time.Duration
PoolTimeout time.Duration
IdleTimeout time.Duration
IdleCheckFrequency time.Duration
PoolFIFO bool
PoolSize int
PoolTimeout time.Duration
MinIdleConns int
MaxIdleConns int
ConnMaxIdleTime time.Duration
ConnMaxLifetime time.Duration
}
type lastDialErrorWrap struct {
@@ -71,17 +71,17 @@ type lastDialErrorWrap struct {
}
type ConnPool struct {
opt *Options
cfg *Options
dialErrorsNum uint32 // atomic
lastDialError atomic.Value
queue chan struct{}
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
connsMu sync.Mutex
conns []*Conn
idleConns []*Conn
poolSize int
idleConnsLen int
@@ -95,7 +95,7 @@ var _ Pooler = (*ConnPool)(nil)
func NewConnPool(opt *Options) *ConnPool {
p := &ConnPool{
opt: opt,
cfg: opt,
queue: make(chan struct{}, opt.PoolSize),
conns: make([]*Conn, 0, opt.PoolSize),
@@ -107,18 +107,14 @@ func NewConnPool(opt *Options) *ConnPool {
p.checkMinIdleConns()
p.connsMu.Unlock()
if opt.IdleTimeout > 0 && opt.IdleCheckFrequency > 0 {
go p.reaper(opt.IdleCheckFrequency)
}
return p
}
func (p *ConnPool) checkMinIdleConns() {
if p.opt.MinIdleConns == 0 {
if p.cfg.MinIdleConns == 0 {
return
}
for p.poolSize < p.opt.PoolSize && p.idleConnsLen < p.opt.MinIdleConns {
for p.poolSize < p.cfg.PoolSize && p.idleConnsLen < p.cfg.MinIdleConns {
p.poolSize++
p.idleConnsLen++
@@ -176,7 +172,7 @@ func (p *ConnPool) newConn(ctx context.Context, pooled bool) (*Conn, error) {
p.conns = append(p.conns, cn)
if pooled {
// If pool is full remove the cn on next Put.
if p.poolSize >= p.opt.PoolSize {
if p.poolSize >= p.cfg.PoolSize {
cn.pooled = false
} else {
p.poolSize++
@@ -191,14 +187,14 @@ func (p *ConnPool) dialConn(ctx context.Context, pooled bool) (*Conn, error) {
return nil, ErrClosed
}
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.opt.PoolSize) {
if atomic.LoadUint32(&p.dialErrorsNum) >= uint32(p.cfg.PoolSize) {
return nil, p.getLastDialError()
}
netConn, err := p.opt.Dialer(ctx)
netConn, err := p.cfg.Dialer(ctx)
if err != nil {
p.setLastDialError(err)
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.opt.PoolSize) {
if atomic.AddUint32(&p.dialErrorsNum, 1) == uint32(p.cfg.PoolSize) {
go p.tryDial()
}
return nil, err
@@ -215,7 +211,7 @@ func (p *ConnPool) tryDial() {
return
}
conn, err := p.opt.Dialer(context.Background())
conn, err := p.cfg.Dialer(context.Background())
if err != nil {
p.setLastDialError(err)
time.Sleep(time.Second)
@@ -263,7 +259,7 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
break
}
if p.isStaleConn(cn) {
if !p.isHealthyConn(cn) {
_ = p.CloseConn(cn)
continue
}
@@ -283,10 +279,6 @@ func (p *ConnPool) Get(ctx context.Context) (*Conn, error) {
return newcn, nil
}
func (p *ConnPool) getTurn() {
p.queue <- struct{}{}
}
func (p *ConnPool) waitTurn(ctx context.Context) error {
select {
case <-ctx.Done():
@@ -301,7 +293,7 @@ func (p *ConnPool) waitTurn(ctx context.Context) error {
}
timer := timers.Get().(*time.Timer)
timer.Reset(p.opt.PoolTimeout)
timer.Reset(p.cfg.PoolTimeout)
select {
case <-ctx.Done():
@@ -337,7 +329,7 @@ func (p *ConnPool) popIdle() (*Conn, error) {
}
var cn *Conn
if p.opt.PoolFIFO {
if p.cfg.PoolFIFO {
cn = p.idleConns[0]
copy(p.idleConns, p.idleConns[1:])
p.idleConns = p.idleConns[:n-1]
@@ -363,11 +355,25 @@ func (p *ConnPool) Put(ctx context.Context, cn *Conn) {
return
}
var shouldCloseConn bool
p.connsMu.Lock()
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
if p.cfg.MaxIdleConns == 0 || p.idleConnsLen < p.cfg.MaxIdleConns {
p.idleConns = append(p.idleConns, cn)
p.idleConnsLen++
} else {
p.removeConn(cn)
shouldCloseConn = true
}
p.connsMu.Unlock()
p.freeTurn()
if shouldCloseConn {
_ = p.closeConn(cn)
}
}
func (p *ConnPool) Remove(ctx context.Context, cn *Conn, reason error) {
@@ -383,8 +389,8 @@ func (p *ConnPool) CloseConn(cn *Conn) error {
func (p *ConnPool) removeConnWithLock(cn *Conn) {
p.connsMu.Lock()
defer p.connsMu.Unlock()
p.removeConn(cn)
p.connsMu.Unlock()
}
func (p *ConnPool) removeConn(cn *Conn) {
@@ -395,14 +401,14 @@ func (p *ConnPool) removeConn(cn *Conn) {
p.poolSize--
p.checkMinIdleConns()
}
return
break
}
}
}
func (p *ConnPool) closeConn(cn *Conn) error {
if p.opt.OnClose != nil {
_ = p.opt.OnClose(cn)
if p.cfg.OnClose != nil {
_ = p.cfg.OnClose(cn)
}
return cn.Close()
}
@@ -477,81 +483,21 @@ func (p *ConnPool) Close() error {
return firstErr
}
func (p *ConnPool) reaper(frequency time.Duration) {
ticker := time.NewTicker(frequency)
defer ticker.Stop()
for {
select {
case <-ticker.C:
// It is possible that ticker and closedCh arrive together,
// and select pseudo-randomly pick ticker case, we double
// check here to prevent being executed after closed.
if p.closed() {
return
}
_, err := p.ReapStaleConns()
if err != nil {
internal.Logger.Printf(context.Background(), "ReapStaleConns failed: %s", err)
continue
}
case <-p.closedCh:
return
}
}
}
func (p *ConnPool) ReapStaleConns() (int, error) {
var n int
for {
p.getTurn()
p.connsMu.Lock()
cn := p.reapStaleConn()
p.connsMu.Unlock()
p.freeTurn()
if cn != nil {
_ = p.closeConn(cn)
n++
} else {
break
}
}
atomic.AddUint32(&p.stats.StaleConns, uint32(n))
return n, nil
}
func (p *ConnPool) reapStaleConn() *Conn {
if len(p.idleConns) == 0 {
return nil
}
cn := p.idleConns[0]
if !p.isStaleConn(cn) {
return nil
}
p.idleConns = append(p.idleConns[:0], p.idleConns[1:]...)
p.idleConnsLen--
p.removeConn(cn)
return cn
}
func (p *ConnPool) isStaleConn(cn *Conn) bool {
if p.opt.IdleTimeout == 0 && p.opt.MaxConnAge == 0 {
return connCheck(cn.netConn) != nil
}
func (p *ConnPool) isHealthyConn(cn *Conn) bool {
now := time.Now()
if p.opt.IdleTimeout > 0 && now.Sub(cn.UsedAt()) >= p.opt.IdleTimeout {
return true
if p.cfg.ConnMaxLifetime > 0 && now.Sub(cn.createdAt) >= p.cfg.ConnMaxLifetime {
return false
}
if p.opt.MaxConnAge > 0 && now.Sub(cn.createdAt) >= p.opt.MaxConnAge {
return true
if p.cfg.ConnMaxIdleTime > 0 && now.Sub(cn.UsedAt()) >= p.cfg.ConnMaxIdleTime {
atomic.AddUint32(&p.stats.IdleConns, 1)
return false
}
return connCheck(cn.netConn) != nil
if connCheck(cn.netConn) != nil {
return false
}
cn.SetUsedAt(now)
return true
}