mirror of
https://github.com/redis/go-redis.git
synced 2025-10-20 09:52:25 +03:00
few small improvements
This commit is contained in:
@@ -1,6 +1,7 @@
|
|||||||
package auth
|
package auth
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/redis/go-redis/v9/internal/pool"
|
"github.com/redis/go-redis/v9/internal/pool"
|
||||||
@@ -47,12 +48,14 @@ func (c *ConnReAuthCredentialsListener) OnNext(credentials Credentials) {
|
|||||||
// this is important because the connection pool may be in the process of reconnecting the connection
|
// this is important because the connection pool may be in the process of reconnecting the connection
|
||||||
// and we don't want to interfere with that process
|
// and we don't want to interfere with that process
|
||||||
// but we also don't want to block for too long, so incorporate a timeout
|
// but we also don't want to block for too long, so incorporate a timeout
|
||||||
|
compandswap:
|
||||||
for !c.conn.Usable.CompareAndSwap(true, false) {
|
for !c.conn.Usable.CompareAndSwap(true, false) {
|
||||||
select {
|
select {
|
||||||
case <-timeout:
|
case <-timeout:
|
||||||
err = pool.ErrConnUnusableTimeout
|
err = pool.ErrConnUnusableTimeout
|
||||||
break
|
break compandswap
|
||||||
default:
|
default:
|
||||||
|
runtime.Gosched()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -61,9 +64,7 @@ func (c *ConnReAuthCredentialsListener) OnNext(credentials Credentials) {
|
|||||||
}
|
}
|
||||||
// we set the usable flag, so restore it back to usable after we're done
|
// we set the usable flag, so restore it back to usable after we're done
|
||||||
defer c.conn.SetUsable(true)
|
defer c.conn.SetUsable(true)
|
||||||
|
if err = c.reAuth(c.conn, credentials); err != nil {
|
||||||
err = c.reAuth(c.conn, credentials)
|
|
||||||
if err != nil {
|
|
||||||
c.OnError(err)
|
c.OnError(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -94,6 +95,5 @@ func NewConnReAuthCredentialsListener(conn *pool.Conn, reAuth func(conn *pool.Co
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Ensure ConnReAuthCredentialsListener implements the CredentialsListener interface.
|
// Ensure ConnReAuthCredentialsListener implements the CredentialsListener interface.
|
||||||
var _ CredentialsListener = (*ConnReAuthCredentialsListener)(nil)
|
var _ CredentialsListener = (*ConnReAuthCredentialsListener)(nil)
|
||||||
|
@@ -87,7 +87,7 @@ type Conn struct {
|
|||||||
// Stores *HandoffState to ensure atomic updates of all handoff-related fields
|
// Stores *HandoffState to ensure atomic updates of all handoff-related fields
|
||||||
handoffStateAtomic atomic.Value // stores *HandoffState
|
handoffStateAtomic atomic.Value // stores *HandoffState
|
||||||
|
|
||||||
onClose func() error
|
onClose func() error
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConn(netConn net.Conn) *Conn {
|
func NewConn(netConn net.Conn) *Conn {
|
||||||
@@ -456,15 +456,16 @@ func (cn *Conn) MarkQueuedForHandoff() error {
|
|||||||
const maxRetries = 50
|
const maxRetries = 50
|
||||||
const baseDelay = time.Microsecond
|
const baseDelay = time.Microsecond
|
||||||
|
|
||||||
|
connAquired := false
|
||||||
for attempt := 0; attempt < maxRetries; attempt++ {
|
for attempt := 0; attempt < maxRetries; attempt++ {
|
||||||
// first we need to mark the connection as not usable
|
// first we need to mark the connection as not usable
|
||||||
// to prevent the pool from returning it to the caller
|
// to prevent the pool from returning it to the caller
|
||||||
if !cn.Usable.CompareAndSwap(true, false) {
|
if !connAquired && !cn.Usable.CompareAndSwap(true, false) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
connAquired = true
|
||||||
|
|
||||||
currentState := cn.getHandoffState()
|
currentState := cn.getHandoffState()
|
||||||
|
|
||||||
// Check if marked for handoff
|
// Check if marked for handoff
|
||||||
if !currentState.ShouldHandoff {
|
if !currentState.ShouldHandoff {
|
||||||
return errors.New("connection was not marked for handoff")
|
return errors.New("connection was not marked for handoff")
|
||||||
@@ -479,6 +480,9 @@ func (cn *Conn) MarkQueuedForHandoff() error {
|
|||||||
|
|
||||||
// Atomic compare-and-swap to update state
|
// Atomic compare-and-swap to update state
|
||||||
if cn.handoffStateAtomic.CompareAndSwap(currentState, newState) {
|
if cn.handoffStateAtomic.CompareAndSwap(currentState, newState) {
|
||||||
|
if connAquired {
|
||||||
|
cn.Usable.Store(true)
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
4
redis.go
4
redis.go
@@ -322,7 +322,7 @@ func (c *baseClient) connReAuthCredentialsListener(poolCn *pool.Conn) (auth.Cred
|
|||||||
// Design decision: The main case we expect the connection to be in a non-usable state is when it is being reconnected
|
// Design decision: The main case we expect the connection to be in a non-usable state is when it is being reconnected
|
||||||
// during a handoff from maintnotifications.
|
// during a handoff from maintnotifications.
|
||||||
// Setting the checkUsableTimeout to the handoff timeout if maintnotifications are enabled
|
// Setting the checkUsableTimeout to the handoff timeout if maintnotifications are enabled
|
||||||
// the default timeout if no maintnotifications are disabled is going to be 1 second.
|
// the default timeout if no maintnotifications are disabled is going to PoolTimeout.
|
||||||
//
|
//
|
||||||
// Note: Due to the auto by default mode of MaintNotificationsConfig
|
// Note: Due to the auto by default mode of MaintNotificationsConfig
|
||||||
// the timeout for the first connection will probably be the value of MaintNotificationsConfig.HandoffTimeout
|
// the timeout for the first connection will probably be the value of MaintNotificationsConfig.HandoffTimeout
|
||||||
@@ -330,6 +330,8 @@ func (c *baseClient) connReAuthCredentialsListener(poolCn *pool.Conn) (auth.Cred
|
|||||||
// of enabling maintnotifications later.
|
// of enabling maintnotifications later.
|
||||||
if c.opt.MaintNotificationsConfig != nil && c.opt.MaintNotificationsConfig.Mode != maintnotifications.ModeDisabled {
|
if c.opt.MaintNotificationsConfig != nil && c.opt.MaintNotificationsConfig.Mode != maintnotifications.ModeDisabled {
|
||||||
newCredListener.SetCheckUsableTimeout(c.opt.MaintNotificationsConfig.HandoffTimeout)
|
newCredListener.SetCheckUsableTimeout(c.opt.MaintNotificationsConfig.HandoffTimeout)
|
||||||
|
} else {
|
||||||
|
newCredListener.SetCheckUsableTimeout(c.opt.PoolTimeout)
|
||||||
}
|
}
|
||||||
c.credListeners[poolCn] = newCredListener
|
c.credListeners[poolCn] = newCredListener
|
||||||
return newCredListener, func() {
|
return newCredListener, func() {
|
||||||
|
Reference in New Issue
Block a user