cluster_test.go 24 KB


  1. package redis_test
  2. import (
  3. "context"
  4. "fmt"
  5. "net"
  6. "strconv"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/go-redis/redis/v7"
  11. "github.com/go-redis/redis/v7/internal/hashtag"
  12. . "github.com/onsi/ginkgo"
  13. . "github.com/onsi/gomega"
  14. )
  15. type clusterScenario struct {
  16. ports []string
  17. nodeIDs []string
  18. processes map[string]*redisProcess
  19. clients map[string]*redis.Client
  20. }
  21. func (s *clusterScenario) masters() []*redis.Client {
  22. result := make([]*redis.Client, 3)
  23. for pos, port := range s.ports[:3] {
  24. result[pos] = s.clients[port]
  25. }
  26. return result
  27. }
  28. func (s *clusterScenario) slaves() []*redis.Client {
  29. result := make([]*redis.Client, 3)
  30. for pos, port := range s.ports[3:] {
  31. result[pos] = s.clients[port]
  32. }
  33. return result
  34. }
  35. func (s *clusterScenario) addrs() []string {
  36. addrs := make([]string, len(s.ports))
  37. for i, port := range s.ports {
  38. addrs[i] = net.JoinHostPort("127.0.0.1", port)
  39. }
  40. return addrs
  41. }
  42. func (s *clusterScenario) clusterClientUnsafe(opt *redis.ClusterOptions) *redis.ClusterClient {
  43. opt.Addrs = s.addrs()
  44. return redis.NewClusterClient(opt)
  45. }
  46. func (s *clusterScenario) clusterClient(opt *redis.ClusterOptions) *redis.ClusterClient {
  47. client := s.clusterClientUnsafe(opt)
  48. err := eventually(func() error {
  49. if opt.ClusterSlots != nil {
  50. return nil
  51. }
  52. state, err := client.LoadState()
  53. if err != nil {
  54. return err
  55. }
  56. if !state.IsConsistent() {
  57. return fmt.Errorf("cluster state is not consistent")
  58. }
  59. return nil
  60. }, 30*time.Second)
  61. if err != nil {
  62. panic(err)
  63. }
  64. return client
  65. }
  66. func startCluster(scenario *clusterScenario) error {
  67. // Start processes and collect node ids
  68. for pos, port := range scenario.ports {
  69. process, err := startRedis(port, "--cluster-enabled", "yes")
  70. if err != nil {
  71. return err
  72. }
  73. client := redis.NewClient(&redis.Options{
  74. Addr: ":" + port,
  75. })
  76. info, err := client.ClusterNodes().Result()
  77. if err != nil {
  78. return err
  79. }
  80. scenario.processes[port] = process
  81. scenario.clients[port] = client
  82. scenario.nodeIDs[pos] = info[:40]
  83. }
  84. // Meet cluster nodes.
  85. for _, client := range scenario.clients {
  86. err := client.ClusterMeet("127.0.0.1", scenario.ports[0]).Err()
  87. if err != nil {
  88. return err
  89. }
  90. }
  91. // Bootstrap masters.
  92. slots := []int{0, 5000, 10000, 16384}
  93. for pos, master := range scenario.masters() {
  94. err := master.ClusterAddSlotsRange(slots[pos], slots[pos+1]-1).Err()
  95. if err != nil {
  96. return err
  97. }
  98. }
  99. // Bootstrap slaves.
  100. for idx, slave := range scenario.slaves() {
  101. masterID := scenario.nodeIDs[idx]
  102. // Wait until master is available
  103. err := eventually(func() error {
  104. s := slave.ClusterNodes().Val()
  105. wanted := masterID
  106. if !strings.Contains(s, wanted) {
  107. return fmt.Errorf("%q does not contain %q", s, wanted)
  108. }
  109. return nil
  110. }, 10*time.Second)
  111. if err != nil {
  112. return err
  113. }
  114. err = slave.ClusterReplicate(masterID).Err()
  115. if err != nil {
  116. return err
  117. }
  118. }
  119. // Wait until all nodes have consistent info.
  120. wanted := []redis.ClusterSlot{{
  121. Start: 0,
  122. End: 4999,
  123. Nodes: []redis.ClusterNode{{
  124. ID: "",
  125. Addr: "127.0.0.1:8220",
  126. }, {
  127. ID: "",
  128. Addr: "127.0.0.1:8223",
  129. }},
  130. }, {
  131. Start: 5000,
  132. End: 9999,
  133. Nodes: []redis.ClusterNode{{
  134. ID: "",
  135. Addr: "127.0.0.1:8221",
  136. }, {
  137. ID: "",
  138. Addr: "127.0.0.1:8224",
  139. }},
  140. }, {
  141. Start: 10000,
  142. End: 16383,
  143. Nodes: []redis.ClusterNode{{
  144. ID: "",
  145. Addr: "127.0.0.1:8222",
  146. }, {
  147. ID: "",
  148. Addr: "127.0.0.1:8225",
  149. }},
  150. }}
  151. for _, client := range scenario.clients {
  152. err := eventually(func() error {
  153. res, err := client.ClusterSlots().Result()
  154. if err != nil {
  155. return err
  156. }
  157. return assertSlotsEqual(res, wanted)
  158. }, 30*time.Second)
  159. if err != nil {
  160. return err
  161. }
  162. }
  163. return nil
  164. }
  165. func assertSlotsEqual(slots, wanted []redis.ClusterSlot) error {
  166. outerLoop:
  167. for _, s2 := range wanted {
  168. for _, s1 := range slots {
  169. if slotEqual(s1, s2) {
  170. continue outerLoop
  171. }
  172. }
  173. return fmt.Errorf("%v not found in %v", s2, slots)
  174. }
  175. return nil
  176. }
  177. func slotEqual(s1, s2 redis.ClusterSlot) bool {
  178. if s1.Start != s2.Start {
  179. return false
  180. }
  181. if s1.End != s2.End {
  182. return false
  183. }
  184. if len(s1.Nodes) != len(s2.Nodes) {
  185. return false
  186. }
  187. for i, n1 := range s1.Nodes {
  188. if n1.Addr != s2.Nodes[i].Addr {
  189. return false
  190. }
  191. }
  192. return true
  193. }
  194. func stopCluster(scenario *clusterScenario) error {
  195. for _, client := range scenario.clients {
  196. if err := client.Close(); err != nil {
  197. return err
  198. }
  199. }
  200. for _, process := range scenario.processes {
  201. if err := process.Close(); err != nil {
  202. return err
  203. }
  204. }
  205. return nil
  206. }
  207. //------------------------------------------------------------------------------
  208. var _ = Describe("ClusterClient", func() {
  209. var failover bool
  210. var opt *redis.ClusterOptions
  211. var client *redis.ClusterClient
  212. assertClusterClient := func() {
  213. It("supports WithContext", func() {
  214. c, cancel := context.WithCancel(context.Background())
  215. cancel()
  216. err := client.WithContext(c).Ping().Err()
  217. Expect(err).To(MatchError("context canceled"))
  218. })
  219. It("should GET/SET/DEL", func() {
  220. err := client.Get("A").Err()
  221. Expect(err).To(Equal(redis.Nil))
  222. err = client.Set("A", "VALUE", 0).Err()
  223. Expect(err).NotTo(HaveOccurred())
  224. Eventually(func() string {
  225. return client.Get("A").Val()
  226. }, 30*time.Second).Should(Equal("VALUE"))
  227. cnt, err := client.Del("A").Result()
  228. Expect(err).NotTo(HaveOccurred())
  229. Expect(cnt).To(Equal(int64(1)))
  230. })
  231. It("GET follows redirects", func() {
  232. err := client.Set("A", "VALUE", 0).Err()
  233. Expect(err).NotTo(HaveOccurred())
  234. if !failover {
  235. Eventually(func() int64 {
  236. nodes, err := client.Nodes("A")
  237. if err != nil {
  238. return 0
  239. }
  240. return nodes[1].Client.DBSize().Val()
  241. }, 30*time.Second).Should(Equal(int64(1)))
  242. Eventually(func() error {
  243. return client.SwapNodes("A")
  244. }, 30*time.Second).ShouldNot(HaveOccurred())
  245. }
  246. v, err := client.Get("A").Result()
  247. Expect(err).NotTo(HaveOccurred())
  248. Expect(v).To(Equal("VALUE"))
  249. })
  250. It("SET follows redirects", func() {
  251. if !failover {
  252. Eventually(func() error {
  253. return client.SwapNodes("A")
  254. }, 30*time.Second).ShouldNot(HaveOccurred())
  255. }
  256. err := client.Set("A", "VALUE", 0).Err()
  257. Expect(err).NotTo(HaveOccurred())
  258. v, err := client.Get("A").Result()
  259. Expect(err).NotTo(HaveOccurred())
  260. Expect(v).To(Equal("VALUE"))
  261. })
  262. It("distributes keys", func() {
  263. for i := 0; i < 100; i++ {
  264. err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  265. Expect(err).NotTo(HaveOccurred())
  266. }
  267. client.ForEachMaster(func(master *redis.Client) error {
  268. defer GinkgoRecover()
  269. Eventually(func() string {
  270. return master.Info("keyspace").Val()
  271. }, 30*time.Second).Should(Or(
  272. ContainSubstring("keys=31"),
  273. ContainSubstring("keys=29"),
  274. ContainSubstring("keys=40"),
  275. ))
  276. return nil
  277. })
  278. })
  279. It("distributes keys when using EVAL", func() {
  280. script := redis.NewScript(`
  281. local r = redis.call('SET', KEYS[1], ARGV[1])
  282. return r
  283. `)
  284. var key string
  285. for i := 0; i < 100; i++ {
  286. key = fmt.Sprintf("key%d", i)
  287. err := script.Run(client, []string{key}, "value").Err()
  288. Expect(err).NotTo(HaveOccurred())
  289. }
  290. client.ForEachMaster(func(master *redis.Client) error {
  291. defer GinkgoRecover()
  292. Eventually(func() string {
  293. return master.Info("keyspace").Val()
  294. }, 30*time.Second).Should(Or(
  295. ContainSubstring("keys=31"),
  296. ContainSubstring("keys=29"),
  297. ContainSubstring("keys=40"),
  298. ))
  299. return nil
  300. })
  301. })
  302. It("supports Watch", func() {
  303. var incr func(string) error
  304. // Transactionally increments key using GET and SET commands.
  305. incr = func(key string) error {
  306. err := client.Watch(func(tx *redis.Tx) error {
  307. n, err := tx.Get(key).Int64()
  308. if err != nil && err != redis.Nil {
  309. return err
  310. }
  311. _, err = tx.TxPipelined(func(pipe redis.Pipeliner) error {
  312. pipe.Set(key, strconv.FormatInt(n+1, 10), 0)
  313. return nil
  314. })
  315. return err
  316. }, key)
  317. if err == redis.TxFailedErr {
  318. return incr(key)
  319. }
  320. return err
  321. }
  322. var wg sync.WaitGroup
  323. for i := 0; i < 100; i++ {
  324. wg.Add(1)
  325. go func() {
  326. defer GinkgoRecover()
  327. defer wg.Done()
  328. err := incr("key")
  329. Expect(err).NotTo(HaveOccurred())
  330. }()
  331. }
  332. wg.Wait()
  333. Eventually(func() string {
  334. return client.Get("key").Val()
  335. }, 30*time.Second).Should(Equal("100"))
  336. })
  337. Describe("pipelining", func() {
  338. var pipe *redis.Pipeline
  339. assertPipeline := func() {
  340. keys := []string{"A", "B", "C", "D", "E", "F", "G"}
  341. It("follows redirects", func() {
  342. if !failover {
  343. for _, key := range keys {
  344. Eventually(func() error {
  345. return client.SwapNodes(key)
  346. }, 30*time.Second).ShouldNot(HaveOccurred())
  347. }
  348. }
  349. for i, key := range keys {
  350. pipe.Set(key, key+"_value", 0)
  351. pipe.Expire(key, time.Duration(i+1)*time.Hour)
  352. }
  353. cmds, err := pipe.Exec()
  354. Expect(err).NotTo(HaveOccurred())
  355. Expect(cmds).To(HaveLen(14))
  356. _ = client.ForEachNode(func(node *redis.Client) error {
  357. defer GinkgoRecover()
  358. Eventually(func() int64 {
  359. return node.DBSize().Val()
  360. }, 30*time.Second).ShouldNot(BeZero())
  361. return nil
  362. })
  363. if !failover {
  364. for _, key := range keys {
  365. Eventually(func() error {
  366. return client.SwapNodes(key)
  367. }, 30*time.Second).ShouldNot(HaveOccurred())
  368. }
  369. }
  370. for _, key := range keys {
  371. pipe.Get(key)
  372. pipe.TTL(key)
  373. }
  374. cmds, err = pipe.Exec()
  375. Expect(err).NotTo(HaveOccurred())
  376. Expect(cmds).To(HaveLen(14))
  377. for i, key := range keys {
  378. get := cmds[i*2].(*redis.StringCmd)
  379. Expect(get.Val()).To(Equal(key + "_value"))
  380. ttl := cmds[(i*2)+1].(*redis.DurationCmd)
  381. dur := time.Duration(i+1) * time.Hour
  382. Expect(ttl.Val()).To(BeNumerically("~", dur, 30*time.Second))
  383. }
  384. })
  385. It("works with missing keys", func() {
  386. pipe.Set("A", "A_value", 0)
  387. pipe.Set("C", "C_value", 0)
  388. _, err := pipe.Exec()
  389. Expect(err).NotTo(HaveOccurred())
  390. a := pipe.Get("A")
  391. b := pipe.Get("B")
  392. c := pipe.Get("C")
  393. cmds, err := pipe.Exec()
  394. Expect(err).To(Equal(redis.Nil))
  395. Expect(cmds).To(HaveLen(3))
  396. Expect(a.Err()).NotTo(HaveOccurred())
  397. Expect(a.Val()).To(Equal("A_value"))
  398. Expect(b.Err()).To(Equal(redis.Nil))
  399. Expect(b.Val()).To(Equal(""))
  400. Expect(c.Err()).NotTo(HaveOccurred())
  401. Expect(c.Val()).To(Equal("C_value"))
  402. })
  403. }
  404. Describe("with Pipeline", func() {
  405. BeforeEach(func() {
  406. pipe = client.Pipeline().(*redis.Pipeline)
  407. })
  408. AfterEach(func() {
  409. Expect(pipe.Close()).NotTo(HaveOccurred())
  410. })
  411. assertPipeline()
  412. })
  413. Describe("with TxPipeline", func() {
  414. BeforeEach(func() {
  415. pipe = client.TxPipeline().(*redis.Pipeline)
  416. })
  417. AfterEach(func() {
  418. Expect(pipe.Close()).NotTo(HaveOccurred())
  419. })
  420. assertPipeline()
  421. })
  422. })
  423. It("supports PubSub", func() {
  424. pubsub := client.Subscribe("mychannel")
  425. defer pubsub.Close()
  426. Eventually(func() error {
  427. _, err := client.Publish("mychannel", "hello").Result()
  428. if err != nil {
  429. return err
  430. }
  431. msg, err := pubsub.ReceiveTimeout(time.Second)
  432. if err != nil {
  433. return err
  434. }
  435. _, ok := msg.(*redis.Message)
  436. if !ok {
  437. return fmt.Errorf("got %T, wanted *redis.Message", msg)
  438. }
  439. return nil
  440. }, 30*time.Second).ShouldNot(HaveOccurred())
  441. })
  442. It("supports PubSub.Ping without channels", func() {
  443. pubsub := client.Subscribe()
  444. defer pubsub.Close()
  445. err := pubsub.Ping()
  446. Expect(err).NotTo(HaveOccurred())
  447. })
  448. }
  449. Describe("ClusterClient", func() {
  450. BeforeEach(func() {
  451. opt = redisClusterOptions()
  452. client = cluster.clusterClient(opt)
  453. err := client.ForEachMaster(func(master *redis.Client) error {
  454. return master.FlushDB().Err()
  455. })
  456. Expect(err).NotTo(HaveOccurred())
  457. })
  458. AfterEach(func() {
  459. _ = client.ForEachMaster(func(master *redis.Client) error {
  460. return master.FlushDB().Err()
  461. })
  462. Expect(client.Close()).NotTo(HaveOccurred())
  463. })
  464. It("returns pool stats", func() {
  465. stats := client.PoolStats()
  466. Expect(stats).To(BeAssignableToTypeOf(&redis.PoolStats{}))
  467. })
  468. It("returns an error when there are no attempts left", func() {
  469. opt := redisClusterOptions()
  470. opt.MaxRedirects = -1
  471. client := cluster.clusterClient(opt)
  472. Eventually(func() error {
  473. return client.SwapNodes("A")
  474. }, 30*time.Second).ShouldNot(HaveOccurred())
  475. err := client.Get("A").Err()
  476. Expect(err).To(HaveOccurred())
  477. Expect(err.Error()).To(ContainSubstring("MOVED"))
  478. Expect(client.Close()).NotTo(HaveOccurred())
  479. })
  480. It("calls fn for every master node", func() {
  481. for i := 0; i < 10; i++ {
  482. Expect(client.Set(strconv.Itoa(i), "", 0).Err()).NotTo(HaveOccurred())
  483. }
  484. err := client.ForEachMaster(func(master *redis.Client) error {
  485. return master.FlushDB().Err()
  486. })
  487. Expect(err).NotTo(HaveOccurred())
  488. size, err := client.DBSize().Result()
  489. Expect(err).NotTo(HaveOccurred())
  490. Expect(size).To(Equal(int64(0)))
  491. })
  492. It("should CLUSTER SLOTS", func() {
  493. res, err := client.ClusterSlots().Result()
  494. Expect(err).NotTo(HaveOccurred())
  495. Expect(res).To(HaveLen(3))
  496. wanted := []redis.ClusterSlot{{
  497. Start: 0,
  498. End: 4999,
  499. Nodes: []redis.ClusterNode{{
  500. ID: "",
  501. Addr: "127.0.0.1:8220",
  502. }, {
  503. ID: "",
  504. Addr: "127.0.0.1:8223",
  505. }},
  506. }, {
  507. Start: 5000,
  508. End: 9999,
  509. Nodes: []redis.ClusterNode{{
  510. ID: "",
  511. Addr: "127.0.0.1:8221",
  512. }, {
  513. ID: "",
  514. Addr: "127.0.0.1:8224",
  515. }},
  516. }, {
  517. Start: 10000,
  518. End: 16383,
  519. Nodes: []redis.ClusterNode{{
  520. ID: "",
  521. Addr: "127.0.0.1:8222",
  522. }, {
  523. ID: "",
  524. Addr: "127.0.0.1:8225",
  525. }},
  526. }}
  527. Expect(assertSlotsEqual(res, wanted)).NotTo(HaveOccurred())
  528. })
  529. It("should CLUSTER NODES", func() {
  530. res, err := client.ClusterNodes().Result()
  531. Expect(err).NotTo(HaveOccurred())
  532. Expect(len(res)).To(BeNumerically(">", 400))
  533. })
  534. It("should CLUSTER INFO", func() {
  535. res, err := client.ClusterInfo().Result()
  536. Expect(err).NotTo(HaveOccurred())
  537. Expect(res).To(ContainSubstring("cluster_known_nodes:6"))
  538. })
  539. It("should CLUSTER KEYSLOT", func() {
  540. hashSlot, err := client.ClusterKeySlot("somekey").Result()
  541. Expect(err).NotTo(HaveOccurred())
  542. Expect(hashSlot).To(Equal(int64(hashtag.Slot("somekey"))))
  543. })
  544. It("should CLUSTER GETKEYSINSLOT", func() {
  545. keys, err := client.ClusterGetKeysInSlot(hashtag.Slot("somekey"), 1).Result()
  546. Expect(err).NotTo(HaveOccurred())
  547. Expect(len(keys)).To(Equal(0))
  548. })
  549. It("should CLUSTER COUNT-FAILURE-REPORTS", func() {
  550. n, err := client.ClusterCountFailureReports(cluster.nodeIDs[0]).Result()
  551. Expect(err).NotTo(HaveOccurred())
  552. Expect(n).To(Equal(int64(0)))
  553. })
  554. It("should CLUSTER COUNTKEYSINSLOT", func() {
  555. n, err := client.ClusterCountKeysInSlot(10).Result()
  556. Expect(err).NotTo(HaveOccurred())
  557. Expect(n).To(Equal(int64(0)))
  558. })
  559. It("should CLUSTER SAVECONFIG", func() {
  560. res, err := client.ClusterSaveConfig().Result()
  561. Expect(err).NotTo(HaveOccurred())
  562. Expect(res).To(Equal("OK"))
  563. })
  564. It("should CLUSTER SLAVES", func() {
  565. nodesList, err := client.ClusterSlaves(cluster.nodeIDs[0]).Result()
  566. Expect(err).NotTo(HaveOccurred())
  567. Expect(nodesList).Should(ContainElement(ContainSubstring("slave")))
  568. Expect(nodesList).Should(HaveLen(1))
  569. })
  570. It("should RANDOMKEY", func() {
  571. const nkeys = 100
  572. for i := 0; i < nkeys; i++ {
  573. err := client.Set(fmt.Sprintf("key%d", i), "value", 0).Err()
  574. Expect(err).NotTo(HaveOccurred())
  575. }
  576. var keys []string
  577. addKey := func(key string) {
  578. for _, k := range keys {
  579. if k == key {
  580. return
  581. }
  582. }
  583. keys = append(keys, key)
  584. }
  585. for i := 0; i < nkeys*10; i++ {
  586. key := client.RandomKey().Val()
  587. addKey(key)
  588. }
  589. Expect(len(keys)).To(BeNumerically("~", nkeys, nkeys/10))
  590. })
  591. assertClusterClient()
  592. })
  593. Describe("ClusterClient failover", func() {
  594. BeforeEach(func() {
  595. failover = true
  596. opt = redisClusterOptions()
  597. opt.MinRetryBackoff = 250 * time.Millisecond
  598. opt.MaxRetryBackoff = time.Second
  599. client = cluster.clusterClient(opt)
  600. err := client.ForEachMaster(func(master *redis.Client) error {
  601. return master.FlushDB().Err()
  602. })
  603. Expect(err).NotTo(HaveOccurred())
  604. err = client.ForEachSlave(func(slave *redis.Client) error {
  605. defer GinkgoRecover()
  606. Eventually(func() int64 {
  607. return slave.DBSize().Val()
  608. }, "30s").Should(Equal(int64(0)))
  609. return nil
  610. })
  611. Expect(err).NotTo(HaveOccurred())
  612. state, err := client.LoadState()
  613. Eventually(func() bool {
  614. state, err = client.LoadState()
  615. if err != nil {
  616. return false
  617. }
  618. return state.IsConsistent()
  619. }, "30s").Should(BeTrue())
  620. for _, slave := range state.Slaves {
  621. err = slave.Client.ClusterFailover().Err()
  622. Expect(err).NotTo(HaveOccurred())
  623. Eventually(func() bool {
  624. state, _ := client.LoadState()
  625. return state.IsConsistent()
  626. }, "30s").Should(BeTrue())
  627. }
  628. })
  629. AfterEach(func() {
  630. failover = false
  631. Expect(client.Close()).NotTo(HaveOccurred())
  632. })
  633. assertClusterClient()
  634. })
  635. Describe("ClusterClient with RouteByLatency", func() {
  636. BeforeEach(func() {
  637. opt = redisClusterOptions()
  638. opt.RouteByLatency = true
  639. client = cluster.clusterClient(opt)
  640. err := client.ForEachMaster(func(master *redis.Client) error {
  641. return master.FlushDB().Err()
  642. })
  643. Expect(err).NotTo(HaveOccurred())
  644. err = client.ForEachSlave(func(slave *redis.Client) error {
  645. Eventually(func() int64 {
  646. return client.DBSize().Val()
  647. }, 30*time.Second).Should(Equal(int64(0)))
  648. return nil
  649. })
  650. Expect(err).NotTo(HaveOccurred())
  651. })
  652. AfterEach(func() {
  653. err := client.ForEachSlave(func(slave *redis.Client) error {
  654. return slave.ReadWrite().Err()
  655. })
  656. Expect(err).NotTo(HaveOccurred())
  657. err = client.Close()
  658. Expect(err).NotTo(HaveOccurred())
  659. })
  660. assertClusterClient()
  661. })
  662. Describe("ClusterClient with ClusterSlots", func() {
  663. BeforeEach(func() {
  664. failover = true
  665. opt = redisClusterOptions()
  666. opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
  667. slots := []redis.ClusterSlot{{
  668. Start: 0,
  669. End: 4999,
  670. Nodes: []redis.ClusterNode{{
  671. Addr: ":" + ringShard1Port,
  672. }},
  673. }, {
  674. Start: 5000,
  675. End: 9999,
  676. Nodes: []redis.ClusterNode{{
  677. Addr: ":" + ringShard2Port,
  678. }},
  679. }, {
  680. Start: 10000,
  681. End: 16383,
  682. Nodes: []redis.ClusterNode{{
  683. Addr: ":" + ringShard3Port,
  684. }},
  685. }}
  686. return slots, nil
  687. }
  688. client = cluster.clusterClient(opt)
  689. err := client.ForEachMaster(func(master *redis.Client) error {
  690. return master.FlushDB().Err()
  691. })
  692. Expect(err).NotTo(HaveOccurred())
  693. err = client.ForEachSlave(func(slave *redis.Client) error {
  694. Eventually(func() int64 {
  695. return client.DBSize().Val()
  696. }, 30*time.Second).Should(Equal(int64(0)))
  697. return nil
  698. })
  699. Expect(err).NotTo(HaveOccurred())
  700. })
  701. AfterEach(func() {
  702. failover = false
  703. err := client.Close()
  704. Expect(err).NotTo(HaveOccurred())
  705. })
  706. assertClusterClient()
  707. })
  708. Describe("ClusterClient with RouteRandomly and ClusterSlots", func() {
  709. BeforeEach(func() {
  710. failover = true
  711. opt = redisClusterOptions()
  712. opt.RouteRandomly = true
  713. opt.ClusterSlots = func() ([]redis.ClusterSlot, error) {
  714. slots := []redis.ClusterSlot{{
  715. Start: 0,
  716. End: 4999,
  717. Nodes: []redis.ClusterNode{{
  718. Addr: ":" + ringShard1Port,
  719. }},
  720. }, {
  721. Start: 5000,
  722. End: 9999,
  723. Nodes: []redis.ClusterNode{{
  724. Addr: ":" + ringShard2Port,
  725. }},
  726. }, {
  727. Start: 10000,
  728. End: 16383,
  729. Nodes: []redis.ClusterNode{{
  730. Addr: ":" + ringShard3Port,
  731. }},
  732. }}
  733. return slots, nil
  734. }
  735. client = cluster.clusterClient(opt)
  736. err := client.ForEachMaster(func(master *redis.Client) error {
  737. return master.FlushDB().Err()
  738. })
  739. Expect(err).NotTo(HaveOccurred())
  740. err = client.ForEachSlave(func(slave *redis.Client) error {
  741. Eventually(func() int64 {
  742. return client.DBSize().Val()
  743. }, 30*time.Second).Should(Equal(int64(0)))
  744. return nil
  745. })
  746. Expect(err).NotTo(HaveOccurred())
  747. })
  748. AfterEach(func() {
  749. failover = false
  750. err := client.Close()
  751. Expect(err).NotTo(HaveOccurred())
  752. })
  753. assertClusterClient()
  754. })
  755. })
  756. var _ = Describe("ClusterClient without nodes", func() {
  757. var client *redis.ClusterClient
  758. BeforeEach(func() {
  759. client = redis.NewClusterClient(&redis.ClusterOptions{})
  760. })
  761. AfterEach(func() {
  762. Expect(client.Close()).NotTo(HaveOccurred())
  763. })
  764. It("Ping returns an error", func() {
  765. err := client.Ping().Err()
  766. Expect(err).To(MatchError("redis: cluster has no nodes"))
  767. })
  768. It("pipeline returns an error", func() {
  769. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  770. pipe.Ping()
  771. return nil
  772. })
  773. Expect(err).To(MatchError("redis: cluster has no nodes"))
  774. })
  775. })
  776. var _ = Describe("ClusterClient without valid nodes", func() {
  777. var client *redis.ClusterClient
  778. BeforeEach(func() {
  779. client = redis.NewClusterClient(&redis.ClusterOptions{
  780. Addrs: []string{redisAddr},
  781. })
  782. })
  783. AfterEach(func() {
  784. Expect(client.Close()).NotTo(HaveOccurred())
  785. })
  786. It("returns an error", func() {
  787. err := client.Ping().Err()
  788. Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  789. })
  790. It("pipeline returns an error", func() {
  791. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  792. pipe.Ping()
  793. return nil
  794. })
  795. Expect(err).To(MatchError("ERR This instance has cluster support disabled"))
  796. })
  797. })
  798. var _ = Describe("ClusterClient with unavailable Cluster", func() {
  799. var client *redis.ClusterClient
  800. BeforeEach(func() {
  801. for _, node := range cluster.clients {
  802. err := node.ClientPause(5 * time.Second).Err()
  803. Expect(err).NotTo(HaveOccurred())
  804. }
  805. opt := redisClusterOptions()
  806. opt.ReadTimeout = 250 * time.Millisecond
  807. opt.WriteTimeout = 250 * time.Millisecond
  808. opt.MaxRedirects = 1
  809. client = cluster.clusterClientUnsafe(opt)
  810. })
  811. AfterEach(func() {
  812. Expect(client.Close()).NotTo(HaveOccurred())
  813. })
  814. It("recovers when Cluster recovers", func() {
  815. err := client.Ping().Err()
  816. Expect(err).To(HaveOccurred())
  817. Eventually(func() error {
  818. return client.Ping().Err()
  819. }, "30s").ShouldNot(HaveOccurred())
  820. })
  821. })
  822. var _ = Describe("ClusterClient timeout", func() {
  823. var client *redis.ClusterClient
  824. AfterEach(func() {
  825. _ = client.Close()
  826. })
  827. testTimeout := func() {
  828. It("Ping timeouts", func() {
  829. err := client.Ping().Err()
  830. Expect(err).To(HaveOccurred())
  831. Expect(err.(net.Error).Timeout()).To(BeTrue())
  832. })
  833. It("Pipeline timeouts", func() {
  834. _, err := client.Pipelined(func(pipe redis.Pipeliner) error {
  835. pipe.Ping()
  836. return nil
  837. })
  838. Expect(err).To(HaveOccurred())
  839. Expect(err.(net.Error).Timeout()).To(BeTrue())
  840. })
  841. It("Tx timeouts", func() {
  842. err := client.Watch(func(tx *redis.Tx) error {
  843. return tx.Ping().Err()
  844. }, "foo")
  845. Expect(err).To(HaveOccurred())
  846. Expect(err.(net.Error).Timeout()).To(BeTrue())
  847. })
  848. It("Tx Pipeline timeouts", func() {
  849. err := client.Watch(func(tx *redis.Tx) error {
  850. _, err := tx.TxPipelined(func(pipe redis.Pipeliner) error {
  851. pipe.Ping()
  852. return nil
  853. })
  854. return err
  855. }, "foo")
  856. Expect(err).To(HaveOccurred())
  857. Expect(err.(net.Error).Timeout()).To(BeTrue())
  858. })
  859. }
  860. const pause = 5 * time.Second
  861. Context("read/write timeout", func() {
  862. BeforeEach(func() {
  863. opt := redisClusterOptions()
  864. opt.ReadTimeout = 250 * time.Millisecond
  865. opt.WriteTimeout = 250 * time.Millisecond
  866. opt.MaxRedirects = 1
  867. client = cluster.clusterClient(opt)
  868. err := client.ForEachNode(func(client *redis.Client) error {
  869. return client.ClientPause(pause).Err()
  870. })
  871. Expect(err).NotTo(HaveOccurred())
  872. })
  873. AfterEach(func() {
  874. _ = client.ForEachNode(func(client *redis.Client) error {
  875. defer GinkgoRecover()
  876. Eventually(func() error {
  877. return client.Ping().Err()
  878. }, 2*pause).ShouldNot(HaveOccurred())
  879. return nil
  880. })
  881. })
  882. testTimeout()
  883. })
  884. })