mirror of
				https://github.com/redis/go-redis.git
				synced 2025-10-29 05:29:22 +03:00 
			
		
		
		
	
		
			
				
	
	
		
			440 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			440 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package redis_test
 | |
| 
 | |
| import (
 | |
| 	"io"
 | |
| 	"net"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/go-redis/redis"
 | |
| 
 | |
| 	. "github.com/onsi/ginkgo"
 | |
| 	. "github.com/onsi/gomega"
 | |
| )
 | |
| 
 | |
| var _ = Describe("PubSub", func() {
 | |
| 	var client *redis.Client
 | |
| 
 | |
| 	BeforeEach(func() {
 | |
| 		opt := redisOptions()
 | |
| 		opt.MinIdleConns = 0
 | |
| 		opt.MaxConnAge = 0
 | |
| 		client = redis.NewClient(opt)
 | |
| 		Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
 | |
| 	})
 | |
| 
 | |
| 	AfterEach(func() {
 | |
| 		Expect(client.Close()).NotTo(HaveOccurred())
 | |
| 	})
 | |
| 
 | |
| 	It("should support pattern matching", func() {
 | |
| 		pubsub := client.PSubscribe("mychannel*")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Subscription)
 | |
| 			Expect(subscr.Kind).To(Equal("psubscribe"))
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel*"))
 | |
| 			Expect(subscr.Count).To(Equal(1))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err.(net.Error).Timeout()).To(Equal(true))
 | |
| 			Expect(msgi).To(BeNil())
 | |
| 		}
 | |
| 
 | |
| 		n, err := client.Publish("mychannel1", "hello").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(n).To(Equal(int64(1)))
 | |
| 
 | |
| 		Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Message)
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel1"))
 | |
| 			Expect(subscr.Pattern).To(Equal("mychannel*"))
 | |
| 			Expect(subscr.Payload).To(Equal("hello"))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Subscription)
 | |
| 			Expect(subscr.Kind).To(Equal("punsubscribe"))
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel*"))
 | |
| 			Expect(subscr.Count).To(Equal(0))
 | |
| 		}
 | |
| 
 | |
| 		stats := client.PoolStats()
 | |
| 		Expect(stats.Misses).To(Equal(uint32(2)))
 | |
| 	})
 | |
| 
 | |
| 	It("should pub/sub channels", func() {
 | |
| 		channels, err := client.PubSubChannels("mychannel*").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(channels).To(BeEmpty())
 | |
| 
 | |
| 		pubsub := client.Subscribe("mychannel", "mychannel2")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		channels, err = client.PubSubChannels("mychannel*").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
 | |
| 
 | |
| 		channels, err = client.PubSubChannels("").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(channels).To(BeEmpty())
 | |
| 
 | |
| 		channels, err = client.PubSubChannels("*").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(len(channels)).To(BeNumerically(">=", 2))
 | |
| 	})
 | |
| 
 | |
| 	It("should return the numbers of subscribers", func() {
 | |
| 		pubsub := client.Subscribe("mychannel", "mychannel2")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(channels).To(Equal(map[string]int64{
 | |
| 			"mychannel":  1,
 | |
| 			"mychannel2": 1,
 | |
| 			"mychannel3": 0,
 | |
| 		}))
 | |
| 	})
 | |
| 
 | |
| 	It("should return the numbers of subscribers by pattern", func() {
 | |
| 		num, err := client.PubSubNumPat().Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(num).To(Equal(int64(0)))
 | |
| 
 | |
| 		pubsub := client.PSubscribe("*")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		num, err = client.PubSubNumPat().Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(num).To(Equal(int64(1)))
 | |
| 	})
 | |
| 
 | |
| 	It("should pub/sub", func() {
 | |
| 		pubsub := client.Subscribe("mychannel", "mychannel2")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Subscription)
 | |
| 			Expect(subscr.Kind).To(Equal("subscribe"))
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel"))
 | |
| 			Expect(subscr.Count).To(Equal(1))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Subscription)
 | |
| 			Expect(subscr.Kind).To(Equal("subscribe"))
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel2"))
 | |
| 			Expect(subscr.Count).To(Equal(2))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err.(net.Error).Timeout()).To(Equal(true))
 | |
| 			Expect(msgi).NotTo(HaveOccurred())
 | |
| 		}
 | |
| 
 | |
| 		n, err := client.Publish("mychannel", "hello").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(n).To(Equal(int64(1)))
 | |
| 
 | |
| 		n, err = client.Publish("mychannel2", "hello2").Result()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(n).To(Equal(int64(1)))
 | |
| 
 | |
| 		Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			msg := msgi.(*redis.Message)
 | |
| 			Expect(msg.Channel).To(Equal("mychannel"))
 | |
| 			Expect(msg.Payload).To(Equal("hello"))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			msg := msgi.(*redis.Message)
 | |
| 			Expect(msg.Channel).To(Equal("mychannel2"))
 | |
| 			Expect(msg.Payload).To(Equal("hello2"))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Subscription)
 | |
| 			Expect(subscr.Kind).To(Equal("unsubscribe"))
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel"))
 | |
| 			Expect(subscr.Count).To(Equal(1))
 | |
| 		}
 | |
| 
 | |
| 		{
 | |
| 			msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			subscr := msgi.(*redis.Subscription)
 | |
| 			Expect(subscr.Kind).To(Equal("unsubscribe"))
 | |
| 			Expect(subscr.Channel).To(Equal("mychannel2"))
 | |
| 			Expect(subscr.Count).To(Equal(0))
 | |
| 		}
 | |
| 
 | |
| 		stats := client.PoolStats()
 | |
| 		Expect(stats.Misses).To(Equal(uint32(2)))
 | |
| 	})
 | |
| 
 | |
| 	It("should ping/pong", func() {
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		_, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		err = pubsub.Ping("")
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		pong := msgi.(*redis.Pong)
 | |
| 		Expect(pong.Payload).To(Equal(""))
 | |
| 	})
 | |
| 
 | |
| 	It("should ping/pong with payload", func() {
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		_, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		err = pubsub.Ping("hello")
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		msgi, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		pong := msgi.(*redis.Pong)
 | |
| 		Expect(pong.Payload).To(Equal("hello"))
 | |
| 	})
 | |
| 
 | |
| 	It("should multi-ReceiveMessage", func() {
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		subscr, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(subscr).To(Equal(&redis.Subscription{
 | |
| 			Kind:    "subscribe",
 | |
| 			Channel: "mychannel",
 | |
| 			Count:   1,
 | |
| 		}))
 | |
| 
 | |
| 		err = client.Publish("mychannel", "hello").Err()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		err = client.Publish("mychannel", "world").Err()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		msg, err := pubsub.ReceiveMessage()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(msg.Channel).To(Equal("mychannel"))
 | |
| 		Expect(msg.Payload).To(Equal("hello"))
 | |
| 
 | |
| 		msg, err = pubsub.ReceiveMessage()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(msg.Channel).To(Equal("mychannel"))
 | |
| 		Expect(msg.Payload).To(Equal("world"))
 | |
| 	})
 | |
| 
 | |
| 	It("returns an error when subscribe fails", func() {
 | |
| 		pubsub := client.Subscribe()
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		pubsub.SetNetConn(&badConn{
 | |
| 			readErr:  io.EOF,
 | |
| 			writeErr: io.EOF,
 | |
| 		})
 | |
| 
 | |
| 		err := pubsub.Subscribe("mychannel")
 | |
| 		Expect(err).To(MatchError("EOF"))
 | |
| 
 | |
| 		err = pubsub.Subscribe("mychannel")
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 	})
 | |
| 
 | |
| 	expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
 | |
| 		pubsub.SetNetConn(&badConn{
 | |
| 			readErr:  io.EOF,
 | |
| 			writeErr: io.EOF,
 | |
| 		})
 | |
| 
 | |
| 		step := make(chan struct{}, 3)
 | |
| 
 | |
| 		go func() {
 | |
| 			defer GinkgoRecover()
 | |
| 
 | |
| 			Eventually(step).Should(Receive())
 | |
| 			err := client.Publish("mychannel", "hello").Err()
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 			step <- struct{}{}
 | |
| 		}()
 | |
| 
 | |
| 		_, err := pubsub.ReceiveMessage()
 | |
| 		Expect(err).To(Equal(io.EOF))
 | |
| 		step <- struct{}{}
 | |
| 
 | |
| 		msg, err := pubsub.ReceiveMessage()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(msg.Channel).To(Equal("mychannel"))
 | |
| 		Expect(msg.Payload).To(Equal("hello"))
 | |
| 
 | |
| 		Eventually(step).Should(Receive())
 | |
| 	}
 | |
| 
 | |
| 	It("Subscribe should reconnect on ReceiveMessage error", func() {
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		subscr, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(subscr).To(Equal(&redis.Subscription{
 | |
| 			Kind:    "subscribe",
 | |
| 			Channel: "mychannel",
 | |
| 			Count:   1,
 | |
| 		}))
 | |
| 
 | |
| 		expectReceiveMessageOnError(pubsub)
 | |
| 	})
 | |
| 
 | |
| 	It("PSubscribe should reconnect on ReceiveMessage error", func() {
 | |
| 		pubsub := client.PSubscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		subscr, err := pubsub.ReceiveTimeout(time.Second)
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(subscr).To(Equal(&redis.Subscription{
 | |
| 			Kind:    "psubscribe",
 | |
| 			Channel: "mychannel",
 | |
| 			Count:   1,
 | |
| 		}))
 | |
| 
 | |
| 		expectReceiveMessageOnError(pubsub)
 | |
| 	})
 | |
| 
 | |
| 	It("should return on Close", func() {
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		var wg sync.WaitGroup
 | |
| 		wg.Add(1)
 | |
| 		go func() {
 | |
| 			defer GinkgoRecover()
 | |
| 
 | |
| 			wg.Done()
 | |
| 			defer wg.Done()
 | |
| 
 | |
| 			_, err := pubsub.ReceiveMessage()
 | |
| 			Expect(err).To(HaveOccurred())
 | |
| 			Expect(err.Error()).To(SatisfyAny(
 | |
| 				Equal("redis: client is closed"),
 | |
| 				ContainSubstring("use of closed network connection"),
 | |
| 			))
 | |
| 		}()
 | |
| 
 | |
| 		wg.Wait()
 | |
| 		wg.Add(1)
 | |
| 
 | |
| 		Expect(pubsub.Close()).NotTo(HaveOccurred())
 | |
| 
 | |
| 		wg.Wait()
 | |
| 	})
 | |
| 
 | |
| 	It("should ReceiveMessage without a subscription", func() {
 | |
| 		timeout := 100 * time.Millisecond
 | |
| 
 | |
| 		pubsub := client.Subscribe()
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		var wg sync.WaitGroup
 | |
| 		wg.Add(1)
 | |
| 		go func() {
 | |
| 			defer GinkgoRecover()
 | |
| 			defer wg.Done()
 | |
| 
 | |
| 			time.Sleep(timeout)
 | |
| 
 | |
| 			err := pubsub.Subscribe("mychannel")
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 			time.Sleep(timeout)
 | |
| 
 | |
| 			err = client.Publish("mychannel", "hello").Err()
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 		}()
 | |
| 
 | |
| 		msg, err := pubsub.ReceiveMessage()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 		Expect(msg.Channel).To(Equal("mychannel"))
 | |
| 		Expect(msg.Payload).To(Equal("hello"))
 | |
| 
 | |
| 		wg.Wait()
 | |
| 	})
 | |
| 
 | |
| 	It("handles big message payload", func() {
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		ch := pubsub.Channel()
 | |
| 
 | |
| 		bigVal := bigVal()
 | |
| 		err := client.Publish("mychannel", bigVal).Err()
 | |
| 		Expect(err).NotTo(HaveOccurred())
 | |
| 
 | |
| 		var msg *redis.Message
 | |
| 		Eventually(ch).Should(Receive(&msg))
 | |
| 		Expect(msg.Channel).To(Equal("mychannel"))
 | |
| 		Expect(msg.Payload).To(Equal(string(bigVal)))
 | |
| 	})
 | |
| 
 | |
| 	It("supports concurrent Ping and Receive", func() {
 | |
| 		const N = 100
 | |
| 
 | |
| 		pubsub := client.Subscribe("mychannel")
 | |
| 		defer pubsub.Close()
 | |
| 
 | |
| 		done := make(chan struct{})
 | |
| 		go func() {
 | |
| 			defer GinkgoRecover()
 | |
| 
 | |
| 			for i := 0; i < N; i++ {
 | |
| 				_, err := pubsub.ReceiveTimeout(5 * time.Second)
 | |
| 				Expect(err).NotTo(HaveOccurred())
 | |
| 			}
 | |
| 			close(done)
 | |
| 		}()
 | |
| 
 | |
| 		for i := 0; i < N; i++ {
 | |
| 			err := pubsub.Ping()
 | |
| 			Expect(err).NotTo(HaveOccurred())
 | |
| 		}
 | |
| 
 | |
| 		select {
 | |
| 		case <-done:
 | |
| 		case <-time.After(30 * time.Second):
 | |
| 			Fail("timeout")
 | |
| 		}
 | |
| 	})
 | |
| })
 |