From e0853aba634dd9fb50a55919c2442ffe7d382013 Mon Sep 17 00:00:00 2001 From: Hristo Temelski Date: Tue, 9 Sep 2025 18:10:17 +0300 Subject: [PATCH] Added batch process method to the pipeline (#3510) * Added batch process method to the pipeline * Added Process and BatchProcess tests * Fix test matching --- pipeline.go | 12 ++++++++++-- pipeline_test.go | 19 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/pipeline.go b/pipeline.go index dbbced50..567bf121 100644 --- a/pipeline.go +++ b/pipeline.go @@ -30,9 +30,12 @@ type Pipeliner interface { // If a certain Redis command is not yet supported, you can use Do to execute it. Do(ctx context.Context, args ...interface{}) *Cmd - // Process puts the commands to be executed into the pipeline buffer. + // Process queues the cmd for later execution. Process(ctx context.Context, cmd Cmder) error + // BatchProcess adds multiple commands to be executed into the pipeline buffer. + BatchProcess(ctx context.Context, cmd ...Cmder) error + // Discard discards all commands in the pipeline buffer that have not yet been executed. Discard() @@ -79,7 +82,12 @@ func (c *Pipeline) Do(ctx context.Context, args ...interface{}) *Cmd { // Process queues the cmd for later execution. func (c *Pipeline) Process(ctx context.Context, cmd Cmder) error { - c.cmds = append(c.cmds, cmd) + return c.BatchProcess(ctx, cmd) +} + +// BatchProcess queues multiple cmds for later execution. +func (c *Pipeline) BatchProcess(ctx context.Context, cmd ...Cmder) error { + c.cmds = append(c.cmds, cmd...) return nil } diff --git a/pipeline_test.go b/pipeline_test.go index d32ab35b..15eacb3d 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -114,6 +114,25 @@ var _ = Describe("pipelining", func() { err := pipe.Do(ctx).Err() Expect(err).To(Equal(errors.New("redis: please enter the command to be executed"))) }) + + It("should process", func() { + err := pipe.Process(ctx, redis.NewCmd(ctx, "asking")) + Expect(err).To(BeNil()) + Expect(pipe.Cmds()).To(HaveLen(1)) + }) + + It("should batchProcess", func() { + err := pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking")) + Expect(err).To(BeNil()) + Expect(pipe.Cmds()).To(HaveLen(1)) + + pipe.Discard() + Expect(pipe.Cmds()).To(HaveLen(0)) + + err = pipe.BatchProcess(ctx, redis.NewCmd(ctx, "asking"), redis.NewCmd(ctx, "set", "key", "value")) + Expect(err).To(BeNil()) + Expect(pipe.Cmds()).To(HaveLen(2)) + }) } Describe("Pipeline", func() {