diff --git a/commands.go b/commands.go index c0358001..e9fd0f2e 100644 --- a/commands.go +++ b/commands.go @@ -253,6 +253,7 @@ var ( _ Cmdable = (*Tx)(nil) _ Cmdable = (*Ring)(nil) _ Cmdable = (*ClusterClient)(nil) + _ Cmdable = (*Pipeline)(nil) ) type cmdable func(ctx context.Context, cmd Cmder) error diff --git a/pipeline.go b/pipeline.go index 1c114205..dbbced50 100644 --- a/pipeline.go +++ b/pipeline.go @@ -7,7 +7,7 @@ import ( type pipelineExecer func(context.Context, []Cmder) error -// Pipeliner is an mechanism to realise Redis Pipeline technique. +// Pipeliner is a mechanism to realise Redis Pipeline technique. // // Pipelining is a technique to extremely speed up processing by packing // operations to batches, send them at once to Redis and read a replies in a @@ -23,21 +23,24 @@ type pipelineExecer func(context.Context, []Cmder) error type Pipeliner interface { StatefulCmdable - // Len is to obtain the number of commands in the pipeline that have not yet been executed. + // Len obtains the number of commands in the pipeline that have not yet been executed. Len() int // Do is an API for executing any command. // If a certain Redis command is not yet supported, you can use Do to execute it. Do(ctx context.Context, args ...interface{}) *Cmd - // Process is to put the commands to be executed into the pipeline buffer. + // Process puts the commands to be executed into the pipeline buffer. Process(ctx context.Context, cmd Cmder) error - // Discard is to discard all commands in the cache that have not yet been executed. + // Discard discards all commands in the pipeline buffer that have not yet been executed. Discard() - // Exec is to send all the commands buffered in the pipeline to the redis-server. + // Exec sends all the commands buffered in the pipeline to the redis server. Exec(ctx context.Context) ([]Cmder, error) + + // Cmds returns the list of queued commands. + Cmds() []Cmder } var _ Pipeliner = (*Pipeline)(nil) @@ -119,3 +122,7 @@ func (c *Pipeline) TxPipelined(ctx context.Context, fn func(Pipeliner) error) ([ func (c *Pipeline) TxPipeline() Pipeliner { return c } + +func (c *Pipeline) Cmds() []Cmder { + return c.cmds +} diff --git a/pipeline_test.go b/pipeline_test.go index 7f734472..d32ab35b 100644 --- a/pipeline_test.go +++ b/pipeline_test.go @@ -36,6 +36,30 @@ var _ = Describe("pipelining", func() { Expect(get.Val()).To(Equal("")) }) + It("exports queued commands", func() { + p := client.Pipeline() + cmds := p.Cmds() + Expect(cmds).To(BeEmpty()) + + p.Set(ctx, "foo", "bar", 0) + p.Get(ctx, "foo") + cmds = p.Cmds() + Expect(cmds).To(HaveLen(p.Len())) + Expect(cmds[0].Name()).To(Equal("set")) + Expect(cmds[1].Name()).To(Equal("get")) + + cmds, err := p.Exec(ctx) + Expect(err).NotTo(HaveOccurred()) + Expect(cmds).To(HaveLen(2)) + Expect(cmds[0].Name()).To(Equal("set")) + Expect(cmds[0].(*redis.StatusCmd).Val()).To(Equal("OK")) + Expect(cmds[1].Name()).To(Equal("get")) + Expect(cmds[1].(*redis.StringCmd).Val()).To(Equal("bar")) + + cmds = p.Cmds() + Expect(cmds).To(BeEmpty()) + }) + assertPipeline := func() { It("returns no errors when there are no commands", func() { _, err := pipe.Exec(ctx)