// ring_test.go

  1. package redis_test
  2. import (
  3. "context"
  4. "crypto/rand"
  5. "fmt"
  6. "net"
  7. "strconv"
  8. "sync"
  9. "time"
  10. "github.com/go-redis/redis/v7"
  11. . "github.com/onsi/ginkgo"
  12. . "github.com/onsi/gomega"
  13. )
  14. var _ = Describe("Redis Ring", func() {
  15. const heartbeat = 100 * time.Millisecond
  16. var ring *redis.Ring
  17. setRingKeys := func() {
  18. for i := 0; i < 100; i++ {
  19. err := ring.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  20. Expect(err).NotTo(HaveOccurred())
  21. }
  22. }
  23. BeforeEach(func() {
  24. opt := redisRingOptions()
  25. opt.HeartbeatFrequency = heartbeat
  26. ring = redis.NewRing(opt)
  27. err := ring.ForEachShard(func(cl *redis.Client) error {
  28. return cl.FlushDB().Err()
  29. })
  30. Expect(err).NotTo(HaveOccurred())
  31. })
  32. AfterEach(func() {
  33. Expect(ring.Close()).NotTo(HaveOccurred())
  34. })
  35. It("supports WithContext", func() {
  36. c, cancel := context.WithCancel(context.Background())
  37. cancel()
  38. err := ring.WithContext(c).Ping().Err()
  39. Expect(err).To(MatchError("context canceled"))
  40. })
  41. It("distributes keys", func() {
  42. setRingKeys()
  43. // Both shards should have some keys now.
  44. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
  45. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  46. })
  47. It("distributes keys when using EVAL", func() {
  48. script := redis.NewScript(`
  49. local r = redis.call('SET', KEYS[1], ARGV[1])
  50. return r
  51. `)
  52. var key string
  53. for i := 0; i < 100; i++ {
  54. key = fmt.Sprintf("key%d", i)
  55. err := script.Run(ring, []string{key}, "value").Err()
  56. Expect(err).NotTo(HaveOccurred())
  57. }
  58. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=57"))
  59. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  60. })
  61. It("uses single shard when one of the shards is down", func() {
  62. // Stop ringShard2.
  63. Expect(ringShard2.Close()).NotTo(HaveOccurred())
  64. Eventually(func() int {
  65. return ring.Len()
  66. }, "30s").Should(Equal(1))
  67. setRingKeys()
  68. // RingShard1 should have all keys.
  69. Expect(ringShard1.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
  70. // Start ringShard2.
  71. var err error
  72. ringShard2, err = startRedis(ringShard2Port)
  73. Expect(err).NotTo(HaveOccurred())
  74. Eventually(func() int {
  75. return ring.Len()
  76. }, "30s").Should(Equal(2))
  77. setRingKeys()
  78. // RingShard2 should have its keys.
  79. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=43"))
  80. })
  81. It("supports hash tags", func() {
  82. for i := 0; i < 100; i++ {
  83. err := ring.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  84. Expect(err).NotTo(HaveOccurred())
  85. }
  86. Expect(ringShard1.Info("keyspace").Val()).ToNot(ContainSubstring("keys="))
  87. Expect(ringShard2.Info("keyspace").Val()).To(ContainSubstring("keys=100"))
  88. })
  89. Describe("pipeline", func() {
  90. It("distributes keys", func() {
  91. pipe := ring.Pipeline()
  92. for i := 0; i < 100; i++ {
  93. err := pipe.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  94. Expect(err).NotTo(HaveOccurred())
  95. }
  96. cmds, err := pipe.Exec()
  97. Expect(err).NotTo(HaveOccurred())
  98. Expect(cmds).To(HaveLen(100))
  99. Expect(pipe.Close()).NotTo(HaveOccurred())
  100. for _, cmd := range cmds {
  101. Expect(cmd.Err()).NotTo(HaveOccurred())
  102. Expect(cmd.(*redis.StatusCmd).Val()).To(Equal("OK"))
  103. }
  104. // Both shards should have some keys now.
  105. Expect(ringShard1.Info().Val()).To(ContainSubstring("keys=57"))
  106. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=43"))
  107. })
  108. It("is consistent with ring", func() {
  109. var keys []string
  110. for i := 0; i < 100; i++ {
  111. key := make([]byte, 64)
  112. _, err := rand.Read(key)
  113. Expect(err).NotTo(HaveOccurred())
  114. keys = append(keys, string(key))
  115. }
  116. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  117. for _, key := range keys {
  118. pipe.Set(key, "value", 0).Err()
  119. }
  120. return nil
  121. })
  122. Expect(err).NotTo(HaveOccurred())
  123. for _, key := range keys {
  124. val, err := ring.Get(key).Result()
  125. Expect(err).NotTo(HaveOccurred())
  126. Expect(val).To(Equal("value"))
  127. }
  128. })
  129. It("supports hash tags", func() {
  130. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  131. for i := 0; i < 100; i++ {
  132. pipe.Set(fmt.Sprintf("key%d{tag}", i), "value", 0).Err()
  133. }
  134. return nil
  135. })
  136. Expect(err).NotTo(HaveOccurred())
  137. Expect(ringShard1.Info().Val()).ToNot(ContainSubstring("keys="))
  138. Expect(ringShard2.Info().Val()).To(ContainSubstring("keys=100"))
  139. })
  140. })
  141. Describe("shard passwords", func() {
  142. It("can be initialized with a single password, used for all shards", func() {
  143. opts := redisRingOptions()
  144. opts.Password = "password"
  145. ring = redis.NewRing(opts)
  146. err := ring.Ping().Err()
  147. Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
  148. })
  149. It("can be initialized with a passwords map, one for each shard", func() {
  150. opts := redisRingOptions()
  151. opts.Passwords = map[string]string{
  152. "ringShardOne": "password1",
  153. "ringShardTwo": "password2",
  154. }
  155. ring = redis.NewRing(opts)
  156. err := ring.Ping().Err()
  157. Expect(err).To(MatchError("ERR Client sent AUTH, but no password is set"))
  158. })
  159. })
  160. })
  161. var _ = Describe("empty Redis Ring", func() {
  162. var ring *redis.Ring
  163. BeforeEach(func() {
  164. ring = redis.NewRing(&redis.RingOptions{})
  165. })
  166. AfterEach(func() {
  167. Expect(ring.Close()).NotTo(HaveOccurred())
  168. })
  169. It("returns an error", func() {
  170. err := ring.Ping().Err()
  171. Expect(err).To(MatchError("redis: all ring shards are down"))
  172. })
  173. It("pipeline returns an error", func() {
  174. _, err := ring.Pipelined(func(pipe redis.Pipeliner) error {
  175. pipe.Ping()
  176. return nil
  177. })
  178. Expect(err).To(MatchError("redis: all ring shards are down"))
  179. })
  180. })
  181. var _ = Describe("Ring watch", func() {
  182. const heartbeat = 100 * time.Millisecond
  183. var ring *redis.Ring
  184. BeforeEach(func() {
  185. opt := redisRingOptions()
  186. opt.HeartbeatFrequency = heartbeat
  187. ring = redis.NewRing(opt)
  188. err := ring.ForEachShard(func(cl *redis.Client) error {
  189. return cl.FlushDB().Err()
  190. })
  191. Expect(err).NotTo(HaveOccurred())
  192. })
  193. AfterEach(func() {
  194. Expect(ring.Close()).NotTo(HaveOccurred())
  195. })
  196. It("should Watch", func() {
  197. var incr func(string) error
  198. // Transactionally increments key using GET and SET commands.
  199. incr = func(key string) error {
  200. err := ring.Watch(func(tx *redis.Tx) error {
  201. n, err := tx.Get(key).Int64()
  202. if err != nil && err != redis.Nil {
  203. return err
  204. }
  205. _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
  206. pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
  207. return nil
  208. })
  209. return err
  210. }, key)
  211. if err == redis.TxFailedErr {
  212. return incr(key)
  213. }
  214. return err
  215. }
  216. var wg sync.WaitGroup
  217. for i := 0; i < 100; i++ {
  218. wg.Add(1)
  219. go func() {
  220. defer GinkgoRecover()
  221. defer wg.Done()
  222. err := incr("key")
  223. Expect(err).NotTo(HaveOccurred())
  224. }()
  225. }
  226. wg.Wait()
  227. n, err := ring.Get("key").Int64()
  228. Expect(err).NotTo(HaveOccurred())
  229. Expect(n).To(Equal(int64(100)))
  230. })
  231. It("should discard", func() {
  232. err := ring.Watch(func(tx *redis.Tx) error {
  233. cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  234. pipe.Set("key1", "hello1", 0)
  235. pipe.Discard()
  236. pipe.Set("key2", "hello2", 0)
  237. return nil
  238. })
  239. Expect(err).NotTo(HaveOccurred())
  240. Expect(cmds).To(HaveLen(1))
  241. return err
  242. }, "key1", "key2")
  243. Expect(err).NotTo(HaveOccurred())
  244. get := ring.Get("key1")
  245. Expect(get.Err()).To(Equal(redis.Nil))
  246. Expect(get.Val()).To(Equal(""))
  247. get = ring.Get("key2")
  248. Expect(get.Err()).NotTo(HaveOccurred())
  249. Expect(get.Val()).To(Equal("hello2"))
  250. })
  251. It("returns no error when there are no commands", func() {
  252. err := ring.Watch(func(tx *redis.Tx) error {
  253. _, err := tx.TxPipelined(func(redis.Pipeliner) error { return nil })
  254. return err
  255. }, "key")
  256. Expect(err).NotTo(HaveOccurred())
  257. v, err := ring.Ping().Result()
  258. Expect(err).NotTo(HaveOccurred())
  259. Expect(v).To(Equal("PONG"))
  260. })
  261. It("should exec bulks", func() {
  262. const N = 20000
  263. err := ring.Watch(func(tx *redis.Tx) error {
  264. cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  265. for i := 0; i < N; i++ {
  266. pipe.Incr("key")
  267. }
  268. return nil
  269. })
  270. Expect(err).NotTo(HaveOccurred())
  271. Expect(len(cmds)).To(Equal(N))
  272. for _, cmd := range cmds {
  273. Expect(cmd.Err()).NotTo(HaveOccurred())
  274. }
  275. return err
  276. }, "key")
  277. Expect(err).NotTo(HaveOccurred())
  278. num, err := ring.Get("key").Int64()
  279. Expect(err).NotTo(HaveOccurred())
  280. Expect(num).To(Equal(int64(N)))
  281. })
  282. It("should Watch/Unwatch", func() {
  283. var C, N int
  284. err := ring.Set("key", "0", 0).Err()
  285. Expect(err).NotTo(HaveOccurred())
  286. perform(C, func(id int) {
  287. for i := 0; i < N; i++ {
  288. err := ring.Watch(func(tx *redis.Tx) error {
  289. val, err := tx.Get("key").Result()
  290. Expect(err).NotTo(HaveOccurred())
  291. Expect(val).NotTo(Equal(redis.Nil))
  292. num, err := strconv.ParseInt(val, 10, 64)
  293. Expect(err).NotTo(HaveOccurred())
  294. cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  295. pipe.Set("key", strconv.FormatInt(num+1, 10), 0)
  296. return nil
  297. })
  298. Expect(cmds).To(HaveLen(1))
  299. return err
  300. }, "key")
  301. if err == redis.TxFailedErr {
  302. i--
  303. continue
  304. }
  305. Expect(err).NotTo(HaveOccurred())
  306. }
  307. })
  308. val, err := ring.Get("key").Int64()
  309. Expect(err).NotTo(HaveOccurred())
  310. Expect(val).To(Equal(int64(C * N)))
  311. })
  312. It("should close Tx without closing the client", func() {
  313. err := ring.Watch(func(tx *redis.Tx) error {
  314. _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  315. pipe.Ping()
  316. return nil
  317. })
  318. return err
  319. }, "key")
  320. Expect(err).NotTo(HaveOccurred())
  321. Expect(ring.Ping().Err()).NotTo(HaveOccurred())
  322. })
  323. It("respects max size on multi", func() {
  324. perform(1000, func(id int) {
  325. var ping *redis.StatusCmd
  326. err := ring.Watch(func(tx *redis.Tx) error {
  327. cmds, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  328. ping = pipe.Ping()
  329. return nil
  330. })
  331. Expect(err).NotTo(HaveOccurred())
  332. Expect(cmds).To(HaveLen(1))
  333. return err
  334. }, "key")
  335. Expect(err).NotTo(HaveOccurred())
  336. Expect(ping.Err()).NotTo(HaveOccurred())
  337. Expect(ping.Val()).To(Equal("PONG"))
  338. })
  339. ring.ForEachShard(func(cl *redis.Client) error {
  340. defer GinkgoRecover()
  341. pool := cl.Pool()
  342. Expect(pool.Len()).To(BeNumerically("<=", 10))
  343. Expect(pool.IdleLen()).To(BeNumerically("<=", 10))
  344. Expect(pool.Len()).To(Equal(pool.IdleLen()))
  345. return nil
  346. })
  347. })
  348. })
  349. var _ = Describe("Ring Tx timeout", func() {
  350. const heartbeat = 100 * time.Millisecond
  351. var ring *redis.Ring
  352. AfterEach(func() {
  353. _ = ring.Close()
  354. })
  355. testTimeout := func() {
  356. It("Tx timeouts", func() {
  357. err := ring.Watch(func(tx *redis.Tx) error {
  358. return tx.Ping().Err()
  359. }, "foo")
  360. Expect(err).To(HaveOccurred())
  361. Expect(err.(net.Error).Timeout()).To(BeTrue())
  362. })
  363. It("Tx Pipeline timeouts", func() {
  364. err := ring.Watch(func(tx *redis.Tx) error {
  365. _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  366. pipe.Ping()
  367. return nil
  368. })
  369. return err
  370. }, "foo")
  371. Expect(err).To(HaveOccurred())
  372. Expect(err.(net.Error).Timeout()).To(BeTrue())
  373. })
  374. }
  375. const pause = 5 * time.Second
  376. Context("read/write timeout", func() {
  377. BeforeEach(func() {
  378. opt := redisRingOptions()
  379. opt.ReadTimeout = 250 * time.Millisecond
  380. opt.WriteTimeout = 250 * time.Millisecond
  381. opt.HeartbeatFrequency = heartbeat
  382. ring = redis.NewRing(opt)
  383. err := ring.ForEachShard(func(client *redis.Client) error {
  384. return client.ClientPause(pause).Err()
  385. })
  386. Expect(err).NotTo(HaveOccurred())
  387. })
  388. AfterEach(func() {
  389. _ = ring.ForEachShard(func(client *redis.Client) error {
  390. defer GinkgoRecover()
  391. Eventually(func() error {
  392. return client.Ping().Err()
  393. }, 2*pause).ShouldNot(HaveOccurred())
  394. return nil
  395. })
  396. })
  397. testTimeout()
  398. })
  399. })