1
0
mirror of https://github.com/redis/go-redis.git synced 2025-07-31 05:04:23 +03:00

Add PubSub.ReceiveMessage.

This commit is contained in:
Vladimir Mihailenco
2015-09-06 13:50:16 +03:00
parent 1e9f53a8e7
commit 9987f2abaa
6 changed files with 239 additions and 58 deletions

View File

@ -12,10 +12,12 @@ import (
var _ = Describe("PubSub", func() {
var client *redis.Client
readTimeout := 3 * time.Second
BeforeEach(func() {
client = redis.NewClient(&redis.Options{
Addr: redisAddr,
Addr: redisAddr,
ReadTimeout: readTimeout,
})
Expect(client.FlushDb().Err()).NotTo(HaveOccurred())
})
@ -227,4 +229,51 @@ var _ = Describe("PubSub", func() {
Expect(pong.Payload).To(Equal("hello"))
})
It("should ReceiveMessage", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
go func() {
defer GinkgoRecover()
time.Sleep(readTimeout + 100*time.Millisecond)
n, err := client.Publish("mychannel", "hello").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(1)))
}()
msg, err := pubsub.ReceiveMessage()
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("hello"))
})
It("should reconnect on ReceiveMessage error", func() {
pubsub, err := client.Subscribe("mychannel")
Expect(err).NotTo(HaveOccurred())
defer pubsub.Close()
cn, err := pubsub.Pool().Get()
Expect(err).NotTo(HaveOccurred())
cn.SetNetConn(&badConn{
readErr: errTimeout,
writeErr: errTimeout,
})
go func() {
defer GinkgoRecover()
time.Sleep(100 * time.Millisecond)
n, err := client.Publish("mychannel", "hello").Result()
Expect(err).NotTo(HaveOccurred())
Expect(n).To(Equal(int64(2)))
}()
msg, err := pubsub.ReceiveMessage()
Expect(err).NotTo(HaveOccurred())
Expect(msg.Channel).To(Equal("mychannel"))
Expect(msg.Payload).To(Equal("hello"))
})
})