mirror of
https://github.com/redis/go-redis.git
synced 2025-12-25 00:01:00 +03:00
feat(options): Clean failing timeout implementation (#3472)
* Fix hard code of failing timeout 1. if not set failing time limit, default is 15 seconds. * feat: Complete configurable FailingTimeoutSeconds implementation --------- Co-authored-by: Shino Wu <shino_wu@trendmicro.com>
This commit is contained in:
@@ -231,6 +231,11 @@ type Options struct {
|
||||
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
|
||||
// When unstable mode is enabled, the client will use RESP3 protocol and only be able to use RawResult
|
||||
UnstableResp3 bool
|
||||
|
||||
// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
|
||||
// When a node is marked as failing, it will be avoided for this duration.
|
||||
// Default is 15 seconds.
|
||||
FailingTimeoutSeconds int
|
||||
}
|
||||
|
||||
func (opt *Options) init() {
|
||||
|
||||
@@ -124,6 +124,11 @@ type ClusterOptions struct {
|
||||
|
||||
// UnstableResp3 enables Unstable mode for Redis Search module with RESP3.
|
||||
UnstableResp3 bool
|
||||
|
||||
// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
|
||||
// When a node is marked as failing, it will be avoided for this duration.
|
||||
// Default is 15 seconds.
|
||||
FailingTimeoutSeconds int
|
||||
}
|
||||
|
||||
func (opt *ClusterOptions) init() {
|
||||
@@ -180,6 +185,10 @@ func (opt *ClusterOptions) init() {
|
||||
if opt.NewClient == nil {
|
||||
opt.NewClient = NewClient
|
||||
}
|
||||
|
||||
if opt.FailingTimeoutSeconds == 0 {
|
||||
opt.FailingTimeoutSeconds = 15
|
||||
}
|
||||
}
|
||||
|
||||
// ParseClusterURL parses a URL into ClusterOptions that can be used to connect to Redis.
|
||||
@@ -284,6 +293,7 @@ func setupClusterQueryParams(u *url.URL, o *ClusterOptions) (*ClusterOptions, er
|
||||
o.PoolTimeout = q.duration("pool_timeout")
|
||||
o.ConnMaxLifetime = q.duration("conn_max_lifetime")
|
||||
o.ConnMaxIdleTime = q.duration("conn_max_idle_time")
|
||||
o.FailingTimeoutSeconds = q.int("failing_timeout_seconds")
|
||||
|
||||
if q.err != nil {
|
||||
return nil, q.err
|
||||
@@ -330,20 +340,21 @@ func (opt *ClusterOptions) clientOptions() *Options {
|
||||
WriteTimeout: opt.WriteTimeout,
|
||||
ContextTimeoutEnabled: opt.ContextTimeoutEnabled,
|
||||
|
||||
PoolFIFO: opt.PoolFIFO,
|
||||
PoolSize: opt.PoolSize,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxIdleConns: opt.MaxIdleConns,
|
||||
MaxActiveConns: opt.MaxActiveConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
ReadBufferSize: opt.ReadBufferSize,
|
||||
WriteBufferSize: opt.WriteBufferSize,
|
||||
DisableIdentity: opt.DisableIdentity,
|
||||
DisableIndentity: opt.DisableIdentity,
|
||||
IdentitySuffix: opt.IdentitySuffix,
|
||||
TLSConfig: opt.TLSConfig,
|
||||
PoolFIFO: opt.PoolFIFO,
|
||||
PoolSize: opt.PoolSize,
|
||||
PoolTimeout: opt.PoolTimeout,
|
||||
MinIdleConns: opt.MinIdleConns,
|
||||
MaxIdleConns: opt.MaxIdleConns,
|
||||
MaxActiveConns: opt.MaxActiveConns,
|
||||
ConnMaxIdleTime: opt.ConnMaxIdleTime,
|
||||
ConnMaxLifetime: opt.ConnMaxLifetime,
|
||||
ReadBufferSize: opt.ReadBufferSize,
|
||||
WriteBufferSize: opt.WriteBufferSize,
|
||||
DisableIdentity: opt.DisableIdentity,
|
||||
DisableIndentity: opt.DisableIdentity,
|
||||
IdentitySuffix: opt.IdentitySuffix,
|
||||
FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
|
||||
TLSConfig: opt.TLSConfig,
|
||||
// If ClusterSlots is populated, then we probably have an artificial
|
||||
// cluster whose nodes are not in clustering mode (otherwise there isn't
|
||||
// much use for ClusterSlots config). This means we cannot execute the
|
||||
@@ -432,7 +443,7 @@ func (n *clusterNode) MarkAsFailing() {
|
||||
}
|
||||
|
||||
func (n *clusterNode) Failing() bool {
|
||||
const timeout = 15 // 15 seconds
|
||||
timeout := int64(n.Client.opt.FailingTimeoutSeconds)
|
||||
|
||||
failing := atomic.LoadUint32(&n.failing)
|
||||
if failing == 0 {
|
||||
|
||||
@@ -1665,6 +1665,10 @@ var _ = Describe("ClusterClient ParseURL", func() {
|
||||
test: "UseDefault",
|
||||
url: "redis://localhost:123?conn_max_idle_time=",
|
||||
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, ConnMaxIdleTime: 0},
|
||||
}, {
|
||||
test: "FailingTimeoutSeconds",
|
||||
url: "redis://localhost:123?failing_timeout_seconds=25",
|
||||
o: &redis.ClusterOptions{Addrs: []string{"localhost:123"}, FailingTimeoutSeconds: 25},
|
||||
}, {
|
||||
test: "Protocol",
|
||||
url: "redis://localhost:123?protocol=2",
|
||||
@@ -1729,7 +1733,79 @@ var _ = Describe("ClusterClient ParseURL", func() {
|
||||
Expect(tc.o.ConnMaxLifetime).To(Equal(actual.ConnMaxLifetime))
|
||||
Expect(tc.o.ConnMaxIdleTime).To(Equal(actual.ConnMaxIdleTime))
|
||||
Expect(tc.o.PoolTimeout).To(Equal(actual.PoolTimeout))
|
||||
Expect(tc.o.FailingTimeoutSeconds).To(Equal(actual.FailingTimeoutSeconds))
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
var _ = Describe("ClusterClient FailingTimeoutSeconds", func() {
|
||||
var client *redis.ClusterClient
|
||||
|
||||
AfterEach(func() {
|
||||
if client != nil {
|
||||
_ = client.Close()
|
||||
}
|
||||
})
|
||||
|
||||
It("should use default failing timeout of 15 seconds", func() {
|
||||
opt := redisClusterOptions()
|
||||
client = cluster.newClusterClient(ctx, opt)
|
||||
|
||||
// Default should be 15 seconds
|
||||
Expect(opt.FailingTimeoutSeconds).To(Equal(15))
|
||||
})
|
||||
|
||||
It("should use custom failing timeout", func() {
|
||||
opt := redisClusterOptions()
|
||||
opt.FailingTimeoutSeconds = 30
|
||||
client = cluster.newClusterClient(ctx, opt)
|
||||
|
||||
// Should use custom value
|
||||
Expect(opt.FailingTimeoutSeconds).To(Equal(30))
|
||||
})
|
||||
|
||||
It("should parse failing_timeout_seconds from URL", func() {
|
||||
url := "redis://localhost:16600?failing_timeout_seconds=25"
|
||||
opt, err := redis.ParseClusterURL(url)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(opt.FailingTimeoutSeconds).To(Equal(25))
|
||||
})
|
||||
|
||||
It("should handle node failing timeout correctly", func() {
|
||||
opt := redisClusterOptions()
|
||||
opt.FailingTimeoutSeconds = 2 // Short timeout for testing
|
||||
client = cluster.newClusterClient(ctx, opt)
|
||||
|
||||
// Get a node and mark it as failing
|
||||
nodes, err := client.Nodes(ctx, "A")
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
Expect(len(nodes)).To(BeNumerically(">", 0))
|
||||
|
||||
node := nodes[0]
|
||||
|
||||
// Initially not failing
|
||||
Expect(node.Failing()).To(BeFalse())
|
||||
|
||||
// Mark as failing
|
||||
node.MarkAsFailing()
|
||||
Expect(node.Failing()).To(BeTrue())
|
||||
|
||||
// Should still be failing after 1 second (less than timeout)
|
||||
time.Sleep(1 * time.Second)
|
||||
Expect(node.Failing()).To(BeTrue())
|
||||
|
||||
// Should not be failing after timeout expires
|
||||
time.Sleep(2 * time.Second) // Total 3 seconds > 2 second timeout
|
||||
Expect(node.Failing()).To(BeFalse())
|
||||
})
|
||||
|
||||
It("should handle zero timeout by using default", func() {
|
||||
opt := redisClusterOptions()
|
||||
opt.FailingTimeoutSeconds = 0 // Should use default
|
||||
client = cluster.newClusterClient(ctx, opt)
|
||||
|
||||
// After initialization, should be set to default
|
||||
Expect(opt.FailingTimeoutSeconds).To(Equal(15))
|
||||
})
|
||||
})
|
||||
|
||||
16
sentinel.go
16
sentinel.go
@@ -129,7 +129,13 @@ type FailoverOptions struct {
|
||||
DisableIdentity bool
|
||||
|
||||
IdentitySuffix string
|
||||
UnstableResp3 bool
|
||||
|
||||
// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
|
||||
// When a node is marked as failing, it will be avoided for this duration.
|
||||
// Only applies to failover cluster clients. Default is 15 seconds.
|
||||
FailingTimeoutSeconds int
|
||||
|
||||
UnstableResp3 bool
|
||||
}
|
||||
|
||||
func (opt *FailoverOptions) clientOptions() *Options {
|
||||
@@ -263,10 +269,10 @@ func (opt *FailoverOptions) clusterOptions() *ClusterOptions {
|
||||
|
||||
TLSConfig: opt.TLSConfig,
|
||||
|
||||
DisableIdentity: opt.DisableIdentity,
|
||||
DisableIndentity: opt.DisableIndentity,
|
||||
|
||||
IdentitySuffix: opt.IdentitySuffix,
|
||||
DisableIdentity: opt.DisableIdentity,
|
||||
DisableIndentity: opt.DisableIndentity,
|
||||
IdentitySuffix: opt.IdentitySuffix,
|
||||
FailingTimeoutSeconds: opt.FailingTimeoutSeconds,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
17
universal.go
17
universal.go
@@ -98,7 +98,13 @@ type UniversalOptions struct {
|
||||
DisableIdentity bool
|
||||
|
||||
IdentitySuffix string
|
||||
UnstableResp3 bool
|
||||
|
||||
// FailingTimeoutSeconds is the timeout in seconds for marking a cluster node as failing.
|
||||
// When a node is marked as failing, it will be avoided for this duration.
|
||||
// Only applies to cluster clients. Default is 15 seconds.
|
||||
FailingTimeoutSeconds int
|
||||
|
||||
UnstableResp3 bool
|
||||
|
||||
// IsClusterMode can be used when only one Addrs is provided (e.g. Elasticache supports setting up cluster mode with configuration endpoint).
|
||||
IsClusterMode bool
|
||||
@@ -149,10 +155,11 @@ func (o *UniversalOptions) Cluster() *ClusterOptions {
|
||||
|
||||
TLSConfig: o.TLSConfig,
|
||||
|
||||
DisableIdentity: o.DisableIdentity,
|
||||
DisableIndentity: o.DisableIndentity,
|
||||
IdentitySuffix: o.IdentitySuffix,
|
||||
UnstableResp3: o.UnstableResp3,
|
||||
DisableIdentity: o.DisableIdentity,
|
||||
DisableIndentity: o.DisableIndentity,
|
||||
IdentitySuffix: o.IdentitySuffix,
|
||||
FailingTimeoutSeconds: o.FailingTimeoutSeconds,
|
||||
UnstableResp3: o.UnstableResp3,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user