mirror of
https://github.com/redis/go-redis.git
synced 2025-07-29 17:41:15 +03:00
Retry BadConnError
This commit is contained in:
@ -86,7 +86,7 @@ func BenchmarkPoolGetRemove(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
connPool.Remove(cn)
|
||||
connPool.Remove(cn, nil)
|
||||
}
|
||||
})
|
||||
})
|
||||
|
@ -39,7 +39,7 @@ type Pooler interface {
|
||||
|
||||
Get(context.Context) (*Conn, error)
|
||||
Put(*Conn)
|
||||
Remove(*Conn)
|
||||
Remove(*Conn, error)
|
||||
|
||||
Len() int
|
||||
IdleLen() int
|
||||
@ -311,7 +311,7 @@ func (p *ConnPool) popIdle() *Conn {
|
||||
|
||||
func (p *ConnPool) Put(cn *Conn) {
|
||||
if !cn.pooled {
|
||||
p.Remove(cn)
|
||||
p.Remove(cn, nil)
|
||||
return
|
||||
}
|
||||
|
||||
@ -322,7 +322,7 @@ func (p *ConnPool) Put(cn *Conn) {
|
||||
p.freeTurn()
|
||||
}
|
||||
|
||||
func (p *ConnPool) Remove(cn *Conn) {
|
||||
func (p *ConnPool) Remove(cn *Conn, reason error) {
|
||||
p.removeConnWithLock(cn)
|
||||
p.freeTurn()
|
||||
_ = p.closeConn(cn)
|
||||
|
@ -12,7 +12,17 @@ const (
|
||||
stateClosed = 2
|
||||
)
|
||||
|
||||
var ErrBadConn = fmt.Errorf("pg: Conn is in a bad state")
|
||||
type BadConnError struct {
|
||||
wrapped error
|
||||
}
|
||||
|
||||
func (e BadConnError) Error() string {
|
||||
return "pg: Conn is in a bad state"
|
||||
}
|
||||
|
||||
func (e BadConnError) Unwrap() error {
|
||||
return e.wrapped
|
||||
}
|
||||
|
||||
type SingleConnPool struct {
|
||||
pool Pooler
|
||||
@ -20,8 +30,8 @@ type SingleConnPool struct {
|
||||
state uint32 // atomic
|
||||
ch chan *Conn
|
||||
|
||||
level int32 // atomic
|
||||
_hasBadConn uint32 // atomic
|
||||
level int32 // atomic
|
||||
_badConnError atomic.Value
|
||||
}
|
||||
|
||||
var _ Pooler = (*SingleConnPool)(nil)
|
||||
@ -66,10 +76,10 @@ func (p *SingleConnPool) Get(c context.Context) (*Conn, error) {
|
||||
if atomic.CompareAndSwapUint32(&p.state, stateDefault, stateInited) {
|
||||
return cn, nil
|
||||
}
|
||||
p.pool.Remove(cn)
|
||||
p.pool.Remove(cn, ErrClosed)
|
||||
case stateInited:
|
||||
if p.hasBadConn() {
|
||||
return nil, ErrBadConn
|
||||
if err := p.badConnError(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
cn, ok := <-p.ch
|
||||
if !ok {
|
||||
@ -95,20 +105,20 @@ func (p *SingleConnPool) Put(cn *Conn) {
|
||||
}
|
||||
|
||||
func (p *SingleConnPool) freeConn(cn *Conn) {
|
||||
if p.hasBadConn() {
|
||||
p.pool.Remove(cn)
|
||||
if err := p.badConnError(); err != nil {
|
||||
p.pool.Remove(cn, err)
|
||||
} else {
|
||||
p.pool.Put(cn)
|
||||
}
|
||||
}
|
||||
|
||||
func (p *SingleConnPool) Remove(cn *Conn) {
|
||||
func (p *SingleConnPool) Remove(cn *Conn, reason error) {
|
||||
defer func() {
|
||||
if recover() != nil {
|
||||
p.pool.Remove(cn)
|
||||
p.pool.Remove(cn, ErrClosed)
|
||||
}
|
||||
}()
|
||||
atomic.StoreUint32(&p._hasBadConn, 1)
|
||||
p._badConnError.Store(BadConnError{wrapped: reason})
|
||||
p.ch <- cn
|
||||
}
|
||||
|
||||
@ -158,7 +168,7 @@ func (p *SingleConnPool) Close() error {
|
||||
}
|
||||
|
||||
func (p *SingleConnPool) Reset() error {
|
||||
if !atomic.CompareAndSwapUint32(&p._hasBadConn, 1, 0) {
|
||||
if p.badConnError() == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -167,7 +177,8 @@ func (p *SingleConnPool) Reset() error {
|
||||
if !ok {
|
||||
return ErrClosed
|
||||
}
|
||||
p.pool.Remove(cn)
|
||||
p.pool.Remove(cn, ErrClosed)
|
||||
p._badConnError.Store(nil)
|
||||
default:
|
||||
return fmt.Errorf("pg: SingleConnPool does not have a Conn")
|
||||
}
|
||||
@ -180,6 +191,9 @@ func (p *SingleConnPool) Reset() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *SingleConnPool) hasBadConn() bool {
|
||||
return atomic.LoadUint32(&p._hasBadConn) == 1
|
||||
func (p *SingleConnPool) badConnError() error {
|
||||
if v := p._badConnError.Load(); v != nil {
|
||||
return v.(BadConnError)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -58,13 +58,13 @@ func (p *StickyConnPool) putUpstream() {
|
||||
|
||||
func (p *StickyConnPool) Put(cn *Conn) {}
|
||||
|
||||
func (p *StickyConnPool) removeUpstream() {
|
||||
p.pool.Remove(p.cn)
|
||||
func (p *StickyConnPool) removeUpstream(reason error) {
|
||||
p.pool.Remove(p.cn, reason)
|
||||
p.cn = nil
|
||||
}
|
||||
|
||||
func (p *StickyConnPool) Remove(cn *Conn) {
|
||||
p.removeUpstream()
|
||||
func (p *StickyConnPool) Remove(cn *Conn, reason error) {
|
||||
p.removeUpstream(reason)
|
||||
}
|
||||
|
||||
func (p *StickyConnPool) Len() int {
|
||||
@ -104,7 +104,7 @@ func (p *StickyConnPool) Close() error {
|
||||
if p.reusable {
|
||||
p.putUpstream()
|
||||
} else {
|
||||
p.removeUpstream()
|
||||
p.removeUpstream(ErrClosed)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -65,7 +65,7 @@ var _ = Describe("ConnPool", func() {
|
||||
// ok
|
||||
}
|
||||
|
||||
connPool.Remove(cn)
|
||||
connPool.Remove(cn, nil)
|
||||
|
||||
// Check that Get is unblocked.
|
||||
select {
|
||||
@ -128,7 +128,7 @@ var _ = Describe("MinIdleConns", func() {
|
||||
|
||||
Context("after Remove", func() {
|
||||
BeforeEach(func() {
|
||||
connPool.Remove(cn)
|
||||
connPool.Remove(cn, nil)
|
||||
})
|
||||
|
||||
It("has idle connections", func() {
|
||||
@ -205,7 +205,7 @@ var _ = Describe("MinIdleConns", func() {
|
||||
BeforeEach(func() {
|
||||
perform(len(cns), func(i int) {
|
||||
mu.RLock()
|
||||
connPool.Remove(cns[i])
|
||||
connPool.Remove(cns[i], nil)
|
||||
mu.RUnlock()
|
||||
})
|
||||
|
||||
@ -355,7 +355,7 @@ var _ = Describe("conns reaper", func() {
|
||||
Expect(connPool.Len()).To(Equal(4))
|
||||
Expect(connPool.IdleLen()).To(Equal(0))
|
||||
|
||||
connPool.Remove(cn)
|
||||
connPool.Remove(cn, nil)
|
||||
|
||||
Expect(connPool.Len()).To(Equal(3))
|
||||
Expect(connPool.IdleLen()).To(Equal(0))
|
||||
@ -413,7 +413,7 @@ var _ = Describe("race", func() {
|
||||
cn, err := connPool.Get(c)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if err == nil {
|
||||
connPool.Remove(cn)
|
||||
connPool.Remove(cn, nil)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
Reference in New Issue
Block a user