From b6fdeca648a903381216cd85e9e481d881d2ee5b Mon Sep 17 00:00:00 2001 From: Vladimir Mihailenco Date: Tue, 12 Mar 2019 12:48:32 +0200 Subject: [PATCH] Add PubSub.ChannelSize --- pubsub.go | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/pubsub.go b/pubsub.go index e1855119..f2a03bc3 100644 --- a/pubsub.go +++ b/pubsub.go @@ -400,14 +400,30 @@ func (c *PubSub) ReceiveMessage() (*Message, error) { // // If the Go channel is full for 30 seconds the message is dropped. func (c *PubSub) Channel() <-chan *Message { - c.chOnce.Do(c.initChannel) + return c.channel(100) +} + +// ChannelSize is like Channel, but creates a Go channel +// with specified buffer size. +func (c *PubSub) ChannelSize(size int) <-chan *Message { + return c.channel(size) +} + +func (c *PubSub) channel(size int) <-chan *Message { + c.chOnce.Do(func() { + c.initChannel(size) + }) + if cap(c.ch) != size { + err := fmt.Errorf("redis: PubSub.Channel is called with different buffer size") + panic(err) + } return c.ch } -func (c *PubSub) initChannel() { +func (c *PubSub) initChannel(size int) { const timeout = 30 * time.Second - c.ch = make(chan *Message, 100) + c.ch = make(chan *Message, size) c.ping = make(chan struct{}, 1) go func() {