mirror of
https://github.com/redis/go-redis.git
synced 2025-07-31 05:04:23 +03:00
Faster and simpler pool.
This commit is contained in:
@ -3,24 +3,34 @@ package pool
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gopkg.in/bsm/ratelimit.v1"
|
||||
)
|
||||
|
||||
var Logger = log.New(os.Stderr, "redis: ", log.LstdFlags)
|
||||
var Logger = log.New(ioutil.Discard, "redis: ", log.LstdFlags)
|
||||
|
||||
var (
|
||||
ErrClosed = errors.New("redis: client is closed")
|
||||
errConnClosed = errors.New("redis: connection is closed")
|
||||
ErrPoolTimeout = errors.New("redis: connection pool timeout")
|
||||
|
||||
errConnClosed = errors.New("connection is closed")
|
||||
errConnStale = errors.New("connection is stale")
|
||||
)
|
||||
|
||||
var timers = sync.Pool{
|
||||
New: func() interface{} {
|
||||
return time.NewTimer(0)
|
||||
},
|
||||
}
|
||||
|
||||
// PoolStats contains pool state information and accumulated stats.
|
||||
// TODO: remove Waits
|
||||
type PoolStats struct {
|
||||
Requests uint32 // number of times a connection was requested by the pool
|
||||
Hits uint32 // number of times free connection was found in the pool
|
||||
@ -32,10 +42,9 @@ type PoolStats struct {
|
||||
}
|
||||
|
||||
type Pooler interface {
|
||||
First() *Conn
|
||||
Get() (*Conn, error)
|
||||
Put(*Conn) error
|
||||
Replace(*Conn, error) error
|
||||
Remove(*Conn, error) error
|
||||
Len() int
|
||||
FreeLen() int
|
||||
Stats() *PoolStats
|
||||
@ -53,18 +62,23 @@ type ConnPool struct {
|
||||
poolTimeout time.Duration
|
||||
idleTimeout time.Duration
|
||||
|
||||
conns *connList
|
||||
freeConns *connStack
|
||||
stats PoolStats
|
||||
queue chan struct{}
|
||||
|
||||
_closed int32
|
||||
connsMu sync.Mutex
|
||||
conns []*Conn
|
||||
|
||||
freeConnsMu sync.Mutex
|
||||
freeConns []*Conn
|
||||
|
||||
stats PoolStats
|
||||
|
||||
_closed int32 // atomic
|
||||
lastErr atomic.Value
|
||||
}
|
||||
|
||||
var _ Pooler = (*ConnPool)(nil)
|
||||
|
||||
func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout time.Duration) *ConnPool {
|
||||
func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout, idleCheckFrequency time.Duration) *ConnPool {
|
||||
p := &ConnPool{
|
||||
_dial: dial,
|
||||
DialLimiter: ratelimit.New(3*poolSize, time.Second),
|
||||
@ -72,57 +86,19 @@ func NewConnPool(dial dialer, poolSize int, poolTimeout, idleTimeout time.Durati
|
||||
poolTimeout: poolTimeout,
|
||||
idleTimeout: idleTimeout,
|
||||
|
||||
conns: newConnList(poolSize),
|
||||
freeConns: newConnStack(poolSize),
|
||||
queue: make(chan struct{}, poolSize),
|
||||
conns: make([]*Conn, 0, poolSize),
|
||||
freeConns: make([]*Conn, 0, poolSize),
|
||||
}
|
||||
if idleTimeout > 0 {
|
||||
go p.reaper(getIdleCheckFrequency())
|
||||
for i := 0; i < poolSize; i++ {
|
||||
p.queue <- struct{}{}
|
||||
}
|
||||
if idleTimeout > 0 && idleCheckFrequency > 0 {
|
||||
go p.reaper(idleCheckFrequency)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (p *ConnPool) Add(cn *Conn) bool {
|
||||
if !p.conns.Reserve() {
|
||||
return false
|
||||
}
|
||||
p.conns.Add(cn)
|
||||
p.Put(cn)
|
||||
return true
|
||||
}
|
||||
|
||||
// First returns first non-idle connection from the pool or nil if
|
||||
// there are no connections.
|
||||
func (p *ConnPool) First() *Conn {
|
||||
for {
|
||||
cn := p.freeConns.Pop()
|
||||
if cn != nil && cn.IsStale(p.idleTimeout) {
|
||||
var err error
|
||||
cn, err = p.replace(cn)
|
||||
if err != nil {
|
||||
Logger.Printf("pool.replace failed: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return cn
|
||||
}
|
||||
}
|
||||
|
||||
// wait waits for free non-idle connection. It returns nil on timeout.
|
||||
func (p *ConnPool) wait(timeout time.Duration) *Conn {
|
||||
for {
|
||||
cn := p.freeConns.PopWithTimeout(timeout)
|
||||
if cn != nil && cn.IsStale(p.idleTimeout) {
|
||||
var err error
|
||||
cn, err = p.replace(cn)
|
||||
if err != nil {
|
||||
Logger.Printf("pool.replace failed: %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
return cn
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ConnPool) dial() (net.Conn, error) {
|
||||
if p.DialLimiter != nil && p.DialLimiter.Limit() {
|
||||
err := fmt.Errorf(
|
||||
@ -148,6 +124,42 @@ func (p *ConnPool) NewConn() (*Conn, error) {
|
||||
return NewConn(netConn), nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) PopFree() *Conn {
|
||||
timer := timers.Get().(*time.Timer)
|
||||
if !timer.Reset(p.poolTimeout) {
|
||||
<-timer.C
|
||||
}
|
||||
|
||||
select {
|
||||
case <-p.queue:
|
||||
timers.Put(timer)
|
||||
case <-timer.C:
|
||||
timers.Put(timer)
|
||||
atomic.AddUint32(&p.stats.Timeouts, 1)
|
||||
return nil
|
||||
}
|
||||
|
||||
p.freeConnsMu.Lock()
|
||||
cn := p.popFree()
|
||||
p.freeConnsMu.Unlock()
|
||||
|
||||
if cn == nil {
|
||||
p.queue <- struct{}{}
|
||||
}
|
||||
return cn
|
||||
}
|
||||
|
||||
func (p *ConnPool) popFree() *Conn {
|
||||
if len(p.freeConns) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
idx := len(p.freeConns) - 1
|
||||
cn := p.freeConns[idx]
|
||||
p.freeConns = p.freeConns[:idx]
|
||||
return cn
|
||||
}
|
||||
|
||||
// Get returns existed connection from the pool or creates a new one.
|
||||
func (p *ConnPool) Get() (*Conn, error) {
|
||||
if p.Closed() {
|
||||
@ -156,31 +168,46 @@ func (p *ConnPool) Get() (*Conn, error) {
|
||||
|
||||
atomic.AddUint32(&p.stats.Requests, 1)
|
||||
|
||||
// Fetch first non-idle connection, if available.
|
||||
if cn := p.First(); cn != nil {
|
||||
timer := timers.Get().(*time.Timer)
|
||||
if !timer.Reset(p.poolTimeout) {
|
||||
<-timer.C
|
||||
}
|
||||
|
||||
select {
|
||||
case <-p.queue:
|
||||
timers.Put(timer)
|
||||
case <-timer.C:
|
||||
timers.Put(timer)
|
||||
atomic.AddUint32(&p.stats.Timeouts, 1)
|
||||
return nil, ErrPoolTimeout
|
||||
}
|
||||
|
||||
p.freeConnsMu.Lock()
|
||||
cn := p.popFree()
|
||||
p.freeConnsMu.Unlock()
|
||||
|
||||
if cn != nil {
|
||||
atomic.AddUint32(&p.stats.Hits, 1)
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
// Try to create a new one.
|
||||
if p.conns.Reserve() {
|
||||
cn, err := p.NewConn()
|
||||
if err != nil {
|
||||
p.conns.CancelReservation()
|
||||
return nil, err
|
||||
if !cn.IsStale(p.idleTimeout) {
|
||||
return cn, nil
|
||||
}
|
||||
p.conns.Add(cn)
|
||||
return cn, nil
|
||||
_ = cn.Close()
|
||||
}
|
||||
|
||||
// Otherwise, wait for the available connection.
|
||||
atomic.AddUint32(&p.stats.Waits, 1)
|
||||
if cn := p.wait(p.poolTimeout); cn != nil {
|
||||
return cn, nil
|
||||
newcn, err := p.NewConn()
|
||||
if err != nil {
|
||||
p.queue <- struct{}{}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
atomic.AddUint32(&p.stats.Timeouts, 1)
|
||||
return nil, ErrPoolTimeout
|
||||
p.connsMu.Lock()
|
||||
if cn != nil {
|
||||
p.remove(cn, errConnStale)
|
||||
}
|
||||
p.conns = append(p.conns, newcn)
|
||||
p.connsMu.Unlock()
|
||||
|
||||
return newcn, nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) Put(cn *Conn) error {
|
||||
@ -188,71 +215,54 @@ func (p *ConnPool) Put(cn *Conn) error {
|
||||
b, _ := cn.Rd.Peek(cn.Rd.Buffered())
|
||||
err := fmt.Errorf("connection has unread data: %q", b)
|
||||
Logger.Print(err)
|
||||
return p.Replace(cn, err)
|
||||
return p.Remove(cn, err)
|
||||
}
|
||||
p.freeConns.Push(cn)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) replace(cn *Conn) (*Conn, error) {
|
||||
_ = cn.Close()
|
||||
|
||||
idx := cn.SetIndex(-1)
|
||||
if idx == -1 {
|
||||
return nil, errConnClosed
|
||||
}
|
||||
|
||||
netConn, err := p.dial()
|
||||
if err != nil {
|
||||
p.conns.Remove(idx)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cn = NewConn(netConn)
|
||||
cn.SetIndex(idx)
|
||||
p.conns.Replace(cn)
|
||||
|
||||
return cn, nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) Replace(cn *Conn, reason error) error {
|
||||
p.storeLastErr(reason.Error())
|
||||
|
||||
// Replace existing connection with new one and unblock waiter.
|
||||
newcn, err := p.replace(cn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
p.freeConns.Push(newcn)
|
||||
p.freeConnsMu.Lock()
|
||||
p.freeConns = append(p.freeConns, cn)
|
||||
p.freeConnsMu.Unlock()
|
||||
p.queue <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) Remove(cn *Conn, reason error) error {
|
||||
_ = cn.Close()
|
||||
|
||||
idx := cn.SetIndex(-1)
|
||||
if idx == -1 {
|
||||
return errConnClosed
|
||||
}
|
||||
|
||||
p.storeLastErr(reason.Error())
|
||||
p.conns.Remove(idx)
|
||||
p.connsMu.Lock()
|
||||
p.remove(cn, reason)
|
||||
p.connsMu.Unlock()
|
||||
p.queue <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *ConnPool) remove(cn *Conn, reason error) {
|
||||
p.storeLastErr(reason.Error())
|
||||
for i, c := range p.conns {
|
||||
if c == cn {
|
||||
p.conns = append(p.conns[:i], p.conns[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Len returns total number of connections.
|
||||
func (p *ConnPool) Len() int {
|
||||
return p.conns.Len()
|
||||
p.connsMu.Lock()
|
||||
l := len(p.conns)
|
||||
p.connsMu.Unlock()
|
||||
return l
|
||||
}
|
||||
|
||||
// FreeLen returns number of free connections.
|
||||
func (p *ConnPool) FreeLen() int {
|
||||
return p.freeConns.Len()
|
||||
p.freeConnsMu.Lock()
|
||||
l := len(p.freeConns)
|
||||
p.freeConnsMu.Unlock()
|
||||
return l
|
||||
}
|
||||
|
||||
func (p *ConnPool) Stats() *PoolStats {
|
||||
stats := p.stats
|
||||
stats := PoolStats{}
|
||||
stats.Requests = atomic.LoadUint32(&p.stats.Requests)
|
||||
stats.Hits = atomic.LoadUint32(&p.stats.Hits)
|
||||
stats.Waits = atomic.LoadUint32(&p.stats.Waits)
|
||||
stats.Timeouts = atomic.LoadUint32(&p.stats.Timeouts)
|
||||
stats.TotalConns = uint32(p.Len())
|
||||
@ -269,16 +279,10 @@ func (p *ConnPool) Close() (retErr error) {
|
||||
return ErrClosed
|
||||
}
|
||||
|
||||
// Wait for app to free connections, but don't close them immediately.
|
||||
for i := 0; i < p.Len()-p.FreeLen(); i++ {
|
||||
if cn := p.wait(3 * time.Second); cn == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
p.connsMu.Lock()
|
||||
|
||||
// Close all connections.
|
||||
cns := p.conns.Reset()
|
||||
for _, cn := range cns {
|
||||
for _, cn := range p.conns {
|
||||
if cn == nil {
|
||||
continue
|
||||
}
|
||||
@ -286,6 +290,12 @@ func (p *ConnPool) Close() (retErr error) {
|
||||
retErr = err
|
||||
}
|
||||
}
|
||||
p.conns = nil
|
||||
p.connsMu.Unlock()
|
||||
|
||||
p.freeConnsMu.Lock()
|
||||
p.freeConns = nil
|
||||
p.freeConnsMu.Unlock()
|
||||
|
||||
return retErr
|
||||
}
|
||||
@ -298,16 +308,32 @@ func (p *ConnPool) closeConn(cn *Conn) error {
|
||||
}
|
||||
|
||||
func (p *ConnPool) ReapStaleConns() (n int, err error) {
|
||||
for {
|
||||
cn := p.freeConns.ShiftStale(p.idleTimeout)
|
||||
if cn == nil {
|
||||
<-p.queue
|
||||
p.freeConnsMu.Lock()
|
||||
|
||||
if len(p.freeConns) == 0 {
|
||||
p.freeConnsMu.Unlock()
|
||||
p.queue <- struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
var idx int
|
||||
var cn *Conn
|
||||
for idx, cn = range p.freeConns {
|
||||
if !cn.IsStale(p.idleTimeout) {
|
||||
break
|
||||
}
|
||||
if err = p.Remove(cn, errors.New("connection is stale")); err != nil {
|
||||
return
|
||||
}
|
||||
p.connsMu.Lock()
|
||||
p.remove(cn, errConnStale)
|
||||
p.connsMu.Unlock()
|
||||
n++
|
||||
}
|
||||
if idx > 0 {
|
||||
p.freeConns = append(p.freeConns[:0], p.freeConns[idx:]...)
|
||||
}
|
||||
|
||||
p.freeConnsMu.Unlock()
|
||||
p.queue <- struct{}{}
|
||||
return
|
||||
}
|
||||
|
||||
@ -322,9 +348,13 @@ func (p *ConnPool) reaper(frequency time.Duration) {
|
||||
n, err := p.ReapStaleConns()
|
||||
if err != nil {
|
||||
Logger.Printf("ReapStaleConns failed: %s", err)
|
||||
} else if n > 0 {
|
||||
Logger.Printf("removed %d stale connections", n)
|
||||
continue
|
||||
}
|
||||
s := p.Stats()
|
||||
Logger.Printf(
|
||||
"reaper: removed %d stale conns (TotalConns=%d FreeConns=%d Requests=%d Hits=%d Timeouts=%d)",
|
||||
n, s.TotalConns, s.FreeConns, s.Requests, s.Hits, s.Timeouts,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
|
Reference in New Issue
Block a user