1
0
mirror of https://github.com/redis/go-redis.git synced 2025-12-02 06:22:31 +03:00
This commit is contained in:
Nedyalko Dyakov
2025-11-20 11:01:57 +02:00
parent 9b7f1be092
commit 212095540d
2 changed files with 144 additions and 61 deletions

View File

@@ -1037,7 +1037,6 @@ func NewClusterClient(opt *ClusterOptions) *ClusterClient {
pipeline: c.processPipeline, pipeline: c.processPipeline,
txPipeline: c.processTxPipeline, txPipeline: c.processTxPipeline,
}) })
return c return c
} }
@@ -1046,6 +1045,24 @@ func (c *ClusterClient) Options() *ClusterOptions {
return c.opt return c.opt
} }
// AddHook adds a hook to the client.
func (c *ClusterClient) AddHook(h Hook) {
// Add hook only to nodes, not to the cluster client itself.
// This prevents hooks from being called twice (once at cluster level, once at node level).
// The cluster client delegates all commands to nodes, so hooks on nodes will be called.
if err := c.ForEachShard(context.Background(), func(ctx context.Context, node *Client) error {
node.AddHook(h)
return nil
}); err != nil {
return
}
c.nodes.OnNewNode(func(rdb *Client) {
rdb.AddHook(h)
})
}
// ReloadState reloads cluster state. If available it calls ClusterSlots func // ReloadState reloads cluster state. If available it calls ClusterSlots func
// to get cluster slots information. // to get cluster slots information.
func (c *ClusterClient) ReloadState(ctx context.Context) { func (c *ClusterClient) ReloadState(ctx context.Context) {

View File

@@ -1188,7 +1188,9 @@ var _ = Describe("ClusterClient", func() {
var stack []string var stack []string
clusterHook := &hook{ // AddHook now only adds to nodes, not to cluster client itself
// This prevents hooks from being called twice
firstHook := &hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook { processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error { return func(ctx context.Context, cmd redis.Cmder) error {
select { select {
@@ -1198,20 +1200,20 @@ var _ = Describe("ClusterClient", func() {
} }
Expect(cmd.String()).To(Equal("ping: ")) Expect(cmd.String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcess") stack = append(stack, "hook1.BeforeProcess")
err := hook(ctx, cmd) err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG")) Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcess") stack = append(stack, "hook1.AfterProcess")
return err return err
} }
}, },
} }
client.AddHook(clusterHook) client.AddHook(firstHook)
nodeHook := &hook{ secondHook := &hook{
processHook: func(hook redis.ProcessHook) redis.ProcessHook { processHook: func(hook redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error { return func(ctx context.Context, cmd redis.Cmder) error {
select { select {
@@ -1221,30 +1223,27 @@ var _ = Describe("ClusterClient", func() {
} }
Expect(cmd.String()).To(Equal("ping: ")) Expect(cmd.String()).To(Equal("ping: "))
stack = append(stack, "shard.BeforeProcess") stack = append(stack, "hook2.BeforeProcess")
err := hook(ctx, cmd) err := hook(ctx, cmd)
Expect(cmd.String()).To(Equal("ping: PONG")) Expect(cmd.String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcess") stack = append(stack, "hook2.AfterProcess")
return err return err
} }
}, },
} }
client.AddHook(secondHook)
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
node.AddHook(nodeHook)
return nil
})
err = client.Ping(ctx).Err() err = client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Both hooks should be called in FIFO order on the node
Expect(stack).To(Equal([]string{ Expect(stack).To(Equal([]string{
"cluster.BeforeProcess", "hook1.BeforeProcess",
"shard.BeforeProcess", "hook2.BeforeProcess",
"shard.AfterProcess", "hook2.AfterProcess",
"cluster.AfterProcess", "hook1.AfterProcess",
})) }))
}) })
@@ -1259,43 +1258,41 @@ var _ = Describe("ClusterClient", func() {
var stack []string var stack []string
// AddHook now only adds to nodes, not to cluster client itself
client.AddHook(&hook{ client.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook { processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error { return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: ")) Expect(cmds[0].String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcessPipeline") stack = append(stack, "hook1.BeforeProcessPipeline")
err := hook(ctx, cmds) err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG")) Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcessPipeline") stack = append(stack, "hook1.AfterProcessPipeline")
return err return err
} }
}, },
}) })
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { client.AddHook(&hook{
node.AddHook(&hook{ processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook { return func(ctx context.Context, cmds []redis.Cmder) error {
return func(ctx context.Context, cmds []redis.Cmder) error { Expect(cmds).To(HaveLen(1))
Expect(cmds).To(HaveLen(1)) Expect(cmds[0].String()).To(Equal("ping: "))
Expect(cmds[0].String()).To(Equal("ping: ")) stack = append(stack, "hook2.BeforeProcessPipeline")
stack = append(stack, "shard.BeforeProcessPipeline")
err := hook(ctx, cmds) err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(1)) Expect(cmds).To(HaveLen(1))
Expect(cmds[0].String()).To(Equal("ping: PONG")) Expect(cmds[0].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline") stack = append(stack, "hook2.AfterProcessPipeline")
return err return err
} }
}, },
})
return nil
}) })
_, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error { _, err = client.Pipelined(ctx, func(pipe redis.Pipeliner) error {
@@ -1303,11 +1300,12 @@ var _ = Describe("ClusterClient", func() {
return nil return nil
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Both hooks should be called in FIFO order on the node
Expect(stack).To(Equal([]string{ Expect(stack).To(Equal([]string{
"cluster.BeforeProcessPipeline", "hook1.BeforeProcessPipeline",
"shard.BeforeProcessPipeline", "hook2.BeforeProcessPipeline",
"shard.AfterProcessPipeline", "hook2.AfterProcessPipeline",
"cluster.AfterProcessPipeline", "hook1.AfterProcessPipeline",
})) }))
}) })
@@ -1322,43 +1320,41 @@ var _ = Describe("ClusterClient", func() {
var stack []string var stack []string
// AddHook now only adds to nodes, not to cluster client itself
client.AddHook(&hook{ client.AddHook(&hook{
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook { processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
return func(ctx context.Context, cmds []redis.Cmder) error { return func(ctx context.Context, cmds []redis.Cmder) error {
Expect(cmds).To(HaveLen(3)) Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: ")) Expect(cmds[1].String()).To(Equal("ping: "))
stack = append(stack, "cluster.BeforeProcessPipeline") stack = append(stack, "hook1.BeforeProcessPipeline")
err := hook(ctx, cmds) err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(3)) Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG")) Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "cluster.AfterProcessPipeline") stack = append(stack, "hook1.AfterProcessPipeline")
return err return err
} }
}, },
}) })
_ = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error { client.AddHook(&hook{
node.AddHook(&hook{ processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook {
processPipelineHook: func(hook redis.ProcessPipelineHook) redis.ProcessPipelineHook { return func(ctx context.Context, cmds []redis.Cmder) error {
return func(ctx context.Context, cmds []redis.Cmder) error { Expect(cmds).To(HaveLen(3))
Expect(cmds).To(HaveLen(3)) Expect(cmds[1].String()).To(Equal("ping: "))
Expect(cmds[1].String()).To(Equal("ping: ")) stack = append(stack, "hook2.BeforeProcessPipeline")
stack = append(stack, "shard.BeforeProcessPipeline")
err := hook(ctx, cmds) err := hook(ctx, cmds)
Expect(cmds).To(HaveLen(3)) Expect(cmds).To(HaveLen(3))
Expect(cmds[1].String()).To(Equal("ping: PONG")) Expect(cmds[1].String()).To(Equal("ping: PONG"))
stack = append(stack, "shard.AfterProcessPipeline") stack = append(stack, "hook2.AfterProcessPipeline")
return err return err
} }
}, },
})
return nil
}) })
_, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error { _, err = client.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
@@ -1366,14 +1362,84 @@ var _ = Describe("ClusterClient", func() {
return nil return nil
}) })
Expect(err).NotTo(HaveOccurred()) Expect(err).NotTo(HaveOccurred())
// Both hooks should be called in FIFO order on the node
Expect(stack).To(Equal([]string{ Expect(stack).To(Equal([]string{
"cluster.BeforeProcessPipeline", "hook1.BeforeProcessPipeline",
"shard.BeforeProcessPipeline", "hook2.BeforeProcessPipeline",
"shard.AfterProcessPipeline", "hook2.AfterProcessPipeline",
"cluster.AfterProcessPipeline", "hook1.AfterProcessPipeline",
})) }))
}) })
It("passes hooks to cluster nodes", func() {
// Ensure cluster is initialized
err := client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
return node.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
// Track hook calls to detect if hooks are called multiple times
var mu sync.Mutex
var hookCallCount int
// Create a hook that counts how many times it's called
testHook := &hook{
processHook: func(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
// Only track PING commands to avoid noise from other operations
if cmd.Name() == "ping" {
mu.Lock()
hookCallCount++
mu.Unlock()
}
return next(ctx, cmd)
}
},
}
// Add hook to cluster client - this should propagate to all existing nodes
client.AddHook(testHook)
// Reset counter before test
mu.Lock()
hookCallCount = 0
mu.Unlock()
// Execute a single PING command through the cluster client
// This should call the hook ONLY ONCE, not twice (cluster + node)
err = client.Ping(ctx).Err()
Expect(err).NotTo(HaveOccurred())
mu.Lock()
clusterPingCalls := hookCallCount
mu.Unlock()
// Reset counter
mu.Lock()
hookCallCount = 0
mu.Unlock()
// Execute a PING command directly on a node
// This should call the hook ONLY ONCE
err = client.ForEachShard(ctx, func(ctx context.Context, node *redis.Client) error {
// Just test one node
return node.Ping(ctx).Err()
})
Expect(err).NotTo(HaveOccurred())
mu.Lock()
nodePingCalls := hookCallCount
mu.Unlock()
// Verify hook is called exactly once per command, not twice
// If hooks are called twice (cluster + node), this will fail
Expect(clusterPingCalls).To(Equal(1), "Hook should be called exactly once for cluster.Ping(), not twice")
Expect(nodePingCalls).To(BeNumerically(">=", 1), "Hook should be called at least once for node.Ping()")
})
It("should return correct replica for key", func() { It("should return correct replica for key", func() {
client, err := client.SlaveForKey(ctx, "test") client, err := client.SlaveForKey(ctx, "test")
Expect(err).ToNot(HaveOccurred()) Expect(err).ToNot(HaveOccurred())