pubsub_test.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package redis_test
  2. import (
  3. "io"
  4. "net"
  5. "sync"
  6. "time"
  7. "github.com/go-redis/redis/v7"
  8. . "github.com/onsi/ginkgo"
  9. . "github.com/onsi/gomega"
  10. )
  11. var _ = Describe("PubSub", func() {
  12. var client *redis.Client
  13. BeforeEach(func() {
  14. opt := redisOptions()
  15. opt.MinIdleConns = 0
  16. opt.MaxConnAge = 0
  17. client = redis.NewClient(opt)
  18. Expect(client.FlushDB().Err()).NotTo(HaveOccurred())
  19. })
  20. AfterEach(func() {
  21. Expect(client.Close()).NotTo(HaveOccurred())
  22. })
  23. It("implements Stringer", func() {
  24. pubsub := client.PSubscribe("mychannel*")
  25. defer pubsub.Close()
  26. Expect(pubsub.String()).To(Equal("PubSub(mychannel*)"))
  27. })
  28. It("should support pattern matching", func() {
  29. pubsub := client.PSubscribe("mychannel*")
  30. defer pubsub.Close()
  31. {
  32. msgi, err := pubsub.ReceiveTimeout(time.Second)
  33. Expect(err).NotTo(HaveOccurred())
  34. subscr := msgi.(*redis.Subscription)
  35. Expect(subscr.Kind).To(Equal("psubscribe"))
  36. Expect(subscr.Channel).To(Equal("mychannel*"))
  37. Expect(subscr.Count).To(Equal(1))
  38. }
  39. {
  40. msgi, err := pubsub.ReceiveTimeout(time.Second)
  41. Expect(err.(net.Error).Timeout()).To(Equal(true))
  42. Expect(msgi).To(BeNil())
  43. }
  44. n, err := client.Publish("mychannel1", "hello").Result()
  45. Expect(err).NotTo(HaveOccurred())
  46. Expect(n).To(Equal(int64(1)))
  47. Expect(pubsub.PUnsubscribe("mychannel*")).NotTo(HaveOccurred())
  48. {
  49. msgi, err := pubsub.ReceiveTimeout(time.Second)
  50. Expect(err).NotTo(HaveOccurred())
  51. subscr := msgi.(*redis.Message)
  52. Expect(subscr.Channel).To(Equal("mychannel1"))
  53. Expect(subscr.Pattern).To(Equal("mychannel*"))
  54. Expect(subscr.Payload).To(Equal("hello"))
  55. }
  56. {
  57. msgi, err := pubsub.ReceiveTimeout(time.Second)
  58. Expect(err).NotTo(HaveOccurred())
  59. subscr := msgi.(*redis.Subscription)
  60. Expect(subscr.Kind).To(Equal("punsubscribe"))
  61. Expect(subscr.Channel).To(Equal("mychannel*"))
  62. Expect(subscr.Count).To(Equal(0))
  63. }
  64. stats := client.PoolStats()
  65. Expect(stats.Misses).To(Equal(uint32(1)))
  66. })
  67. It("should pub/sub channels", func() {
  68. channels, err := client.PubSubChannels("mychannel*").Result()
  69. Expect(err).NotTo(HaveOccurred())
  70. Expect(channels).To(BeEmpty())
  71. pubsub := client.Subscribe("mychannel", "mychannel2")
  72. defer pubsub.Close()
  73. channels, err = client.PubSubChannels("mychannel*").Result()
  74. Expect(err).NotTo(HaveOccurred())
  75. Expect(channels).To(ConsistOf([]string{"mychannel", "mychannel2"}))
  76. channels, err = client.PubSubChannels("").Result()
  77. Expect(err).NotTo(HaveOccurred())
  78. Expect(channels).To(BeEmpty())
  79. channels, err = client.PubSubChannels("*").Result()
  80. Expect(err).NotTo(HaveOccurred())
  81. Expect(len(channels)).To(BeNumerically(">=", 2))
  82. })
  83. It("should return the numbers of subscribers", func() {
  84. pubsub := client.Subscribe("mychannel", "mychannel2")
  85. defer pubsub.Close()
  86. channels, err := client.PubSubNumSub("mychannel", "mychannel2", "mychannel3").Result()
  87. Expect(err).NotTo(HaveOccurred())
  88. Expect(channels).To(Equal(map[string]int64{
  89. "mychannel": 1,
  90. "mychannel2": 1,
  91. "mychannel3": 0,
  92. }))
  93. })
  94. It("should return the numbers of subscribers by pattern", func() {
  95. num, err := client.PubSubNumPat().Result()
  96. Expect(err).NotTo(HaveOccurred())
  97. Expect(num).To(Equal(int64(0)))
  98. pubsub := client.PSubscribe("*")
  99. defer pubsub.Close()
  100. num, err = client.PubSubNumPat().Result()
  101. Expect(err).NotTo(HaveOccurred())
  102. Expect(num).To(Equal(int64(1)))
  103. })
  104. It("should pub/sub", func() {
  105. pubsub := client.Subscribe("mychannel", "mychannel2")
  106. defer pubsub.Close()
  107. {
  108. msgi, err := pubsub.ReceiveTimeout(time.Second)
  109. Expect(err).NotTo(HaveOccurred())
  110. subscr := msgi.(*redis.Subscription)
  111. Expect(subscr.Kind).To(Equal("subscribe"))
  112. Expect(subscr.Channel).To(Equal("mychannel"))
  113. Expect(subscr.Count).To(Equal(1))
  114. }
  115. {
  116. msgi, err := pubsub.ReceiveTimeout(time.Second)
  117. Expect(err).NotTo(HaveOccurred())
  118. subscr := msgi.(*redis.Subscription)
  119. Expect(subscr.Kind).To(Equal("subscribe"))
  120. Expect(subscr.Channel).To(Equal("mychannel2"))
  121. Expect(subscr.Count).To(Equal(2))
  122. }
  123. {
  124. msgi, err := pubsub.ReceiveTimeout(time.Second)
  125. Expect(err.(net.Error).Timeout()).To(Equal(true))
  126. Expect(msgi).NotTo(HaveOccurred())
  127. }
  128. n, err := client.Publish("mychannel", "hello").Result()
  129. Expect(err).NotTo(HaveOccurred())
  130. Expect(n).To(Equal(int64(1)))
  131. n, err = client.Publish("mychannel2", "hello2").Result()
  132. Expect(err).NotTo(HaveOccurred())
  133. Expect(n).To(Equal(int64(1)))
  134. Expect(pubsub.Unsubscribe("mychannel", "mychannel2")).NotTo(HaveOccurred())
  135. {
  136. msgi, err := pubsub.ReceiveTimeout(time.Second)
  137. Expect(err).NotTo(HaveOccurred())
  138. msg := msgi.(*redis.Message)
  139. Expect(msg.Channel).To(Equal("mychannel"))
  140. Expect(msg.Payload).To(Equal("hello"))
  141. }
  142. {
  143. msgi, err := pubsub.ReceiveTimeout(time.Second)
  144. Expect(err).NotTo(HaveOccurred())
  145. msg := msgi.(*redis.Message)
  146. Expect(msg.Channel).To(Equal("mychannel2"))
  147. Expect(msg.Payload).To(Equal("hello2"))
  148. }
  149. {
  150. msgi, err := pubsub.ReceiveTimeout(time.Second)
  151. Expect(err).NotTo(HaveOccurred())
  152. subscr := msgi.(*redis.Subscription)
  153. Expect(subscr.Kind).To(Equal("unsubscribe"))
  154. Expect(subscr.Channel).To(Equal("mychannel"))
  155. Expect(subscr.Count).To(Equal(1))
  156. }
  157. {
  158. msgi, err := pubsub.ReceiveTimeout(time.Second)
  159. Expect(err).NotTo(HaveOccurred())
  160. subscr := msgi.(*redis.Subscription)
  161. Expect(subscr.Kind).To(Equal("unsubscribe"))
  162. Expect(subscr.Channel).To(Equal("mychannel2"))
  163. Expect(subscr.Count).To(Equal(0))
  164. }
  165. stats := client.PoolStats()
  166. Expect(stats.Misses).To(Equal(uint32(1)))
  167. })
  168. It("should ping/pong", func() {
  169. pubsub := client.Subscribe("mychannel")
  170. defer pubsub.Close()
  171. _, err := pubsub.ReceiveTimeout(time.Second)
  172. Expect(err).NotTo(HaveOccurred())
  173. err = pubsub.Ping("")
  174. Expect(err).NotTo(HaveOccurred())
  175. msgi, err := pubsub.ReceiveTimeout(time.Second)
  176. Expect(err).NotTo(HaveOccurred())
  177. pong := msgi.(*redis.Pong)
  178. Expect(pong.Payload).To(Equal(""))
  179. })
  180. It("should ping/pong with payload", func() {
  181. pubsub := client.Subscribe("mychannel")
  182. defer pubsub.Close()
  183. _, err := pubsub.ReceiveTimeout(time.Second)
  184. Expect(err).NotTo(HaveOccurred())
  185. err = pubsub.Ping("hello")
  186. Expect(err).NotTo(HaveOccurred())
  187. msgi, err := pubsub.ReceiveTimeout(time.Second)
  188. Expect(err).NotTo(HaveOccurred())
  189. pong := msgi.(*redis.Pong)
  190. Expect(pong.Payload).To(Equal("hello"))
  191. })
  192. It("should multi-ReceiveMessage", func() {
  193. pubsub := client.Subscribe("mychannel")
  194. defer pubsub.Close()
  195. subscr, err := pubsub.ReceiveTimeout(time.Second)
  196. Expect(err).NotTo(HaveOccurred())
  197. Expect(subscr).To(Equal(&redis.Subscription{
  198. Kind: "subscribe",
  199. Channel: "mychannel",
  200. Count: 1,
  201. }))
  202. err = client.Publish("mychannel", "hello").Err()
  203. Expect(err).NotTo(HaveOccurred())
  204. err = client.Publish("mychannel", "world").Err()
  205. Expect(err).NotTo(HaveOccurred())
  206. msg, err := pubsub.ReceiveMessage()
  207. Expect(err).NotTo(HaveOccurred())
  208. Expect(msg.Channel).To(Equal("mychannel"))
  209. Expect(msg.Payload).To(Equal("hello"))
  210. msg, err = pubsub.ReceiveMessage()
  211. Expect(err).NotTo(HaveOccurred())
  212. Expect(msg.Channel).To(Equal("mychannel"))
  213. Expect(msg.Payload).To(Equal("world"))
  214. })
  215. It("returns an error when subscribe fails", func() {
  216. pubsub := client.Subscribe()
  217. defer pubsub.Close()
  218. pubsub.SetNetConn(&badConn{
  219. readErr: io.EOF,
  220. writeErr: io.EOF,
  221. })
  222. err := pubsub.Subscribe("mychannel")
  223. Expect(err).To(MatchError("EOF"))
  224. err = pubsub.Subscribe("mychannel")
  225. Expect(err).NotTo(HaveOccurred())
  226. })
  227. expectReceiveMessageOnError := func(pubsub *redis.PubSub) {
  228. pubsub.SetNetConn(&badConn{
  229. readErr: io.EOF,
  230. writeErr: io.EOF,
  231. })
  232. step := make(chan struct{}, 3)
  233. go func() {
  234. defer GinkgoRecover()
  235. Eventually(step).Should(Receive())
  236. err := client.Publish("mychannel", "hello").Err()
  237. Expect(err).NotTo(HaveOccurred())
  238. step <- struct{}{}
  239. }()
  240. _, err := pubsub.ReceiveMessage()
  241. Expect(err).To(Equal(io.EOF))
  242. step <- struct{}{}
  243. msg, err := pubsub.ReceiveMessage()
  244. Expect(err).NotTo(HaveOccurred())
  245. Expect(msg.Channel).To(Equal("mychannel"))
  246. Expect(msg.Payload).To(Equal("hello"))
  247. Eventually(step).Should(Receive())
  248. }
  249. It("Subscribe should reconnect on ReceiveMessage error", func() {
  250. pubsub := client.Subscribe("mychannel")
  251. defer pubsub.Close()
  252. subscr, err := pubsub.ReceiveTimeout(time.Second)
  253. Expect(err).NotTo(HaveOccurred())
  254. Expect(subscr).To(Equal(&redis.Subscription{
  255. Kind: "subscribe",
  256. Channel: "mychannel",
  257. Count: 1,
  258. }))
  259. expectReceiveMessageOnError(pubsub)
  260. })
  261. It("PSubscribe should reconnect on ReceiveMessage error", func() {
  262. pubsub := client.PSubscribe("mychannel")
  263. defer pubsub.Close()
  264. subscr, err := pubsub.ReceiveTimeout(time.Second)
  265. Expect(err).NotTo(HaveOccurred())
  266. Expect(subscr).To(Equal(&redis.Subscription{
  267. Kind: "psubscribe",
  268. Channel: "mychannel",
  269. Count: 1,
  270. }))
  271. expectReceiveMessageOnError(pubsub)
  272. })
  273. It("should return on Close", func() {
  274. pubsub := client.Subscribe("mychannel")
  275. defer pubsub.Close()
  276. var wg sync.WaitGroup
  277. wg.Add(1)
  278. go func() {
  279. defer GinkgoRecover()
  280. wg.Done()
  281. defer wg.Done()
  282. _, err := pubsub.ReceiveMessage()
  283. Expect(err).To(HaveOccurred())
  284. Expect(err.Error()).To(SatisfyAny(
  285. Equal("redis: client is closed"),
  286. ContainSubstring("use of closed network connection"),
  287. ))
  288. }()
  289. wg.Wait()
  290. wg.Add(1)
  291. Expect(pubsub.Close()).NotTo(HaveOccurred())
  292. wg.Wait()
  293. })
  294. It("should ReceiveMessage without a subscription", func() {
  295. timeout := 100 * time.Millisecond
  296. pubsub := client.Subscribe()
  297. defer pubsub.Close()
  298. var wg sync.WaitGroup
  299. wg.Add(1)
  300. go func() {
  301. defer GinkgoRecover()
  302. defer wg.Done()
  303. time.Sleep(timeout)
  304. err := pubsub.Subscribe("mychannel")
  305. Expect(err).NotTo(HaveOccurred())
  306. time.Sleep(timeout)
  307. err = client.Publish("mychannel", "hello").Err()
  308. Expect(err).NotTo(HaveOccurred())
  309. }()
  310. msg, err := pubsub.ReceiveMessage()
  311. Expect(err).NotTo(HaveOccurred())
  312. Expect(msg.Channel).To(Equal("mychannel"))
  313. Expect(msg.Payload).To(Equal("hello"))
  314. wg.Wait()
  315. })
  316. It("handles big message payload", func() {
  317. pubsub := client.Subscribe("mychannel")
  318. defer pubsub.Close()
  319. ch := pubsub.Channel()
  320. bigVal := bigVal()
  321. err := client.Publish("mychannel", bigVal).Err()
  322. Expect(err).NotTo(HaveOccurred())
  323. var msg *redis.Message
  324. Eventually(ch).Should(Receive(&msg))
  325. Expect(msg.Channel).To(Equal("mychannel"))
  326. Expect(msg.Payload).To(Equal(string(bigVal)))
  327. })
  328. It("supports concurrent Ping and Receive", func() {
  329. const N = 100
  330. pubsub := client.Subscribe("mychannel")
  331. defer pubsub.Close()
  332. done := make(chan struct{})
  333. go func() {
  334. defer GinkgoRecover()
  335. for i := 0; i < N; i++ {
  336. _, err := pubsub.ReceiveTimeout(5 * time.Second)
  337. Expect(err).NotTo(HaveOccurred())
  338. }
  339. close(done)
  340. }()
  341. for i := 0; i < N; i++ {
  342. err := pubsub.Ping()
  343. Expect(err).NotTo(HaveOccurred())
  344. }
  345. select {
  346. case <-done:
  347. case <-time.After(30 * time.Second):
  348. Fail("timeout")
  349. }
  350. })
  351. })