functional_test.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148
  1. package sarama
  2. import (
  3. "log"
  4. "math/rand"
  5. "net"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "testing"
  10. "time"
  11. toxiproxy "github.com/Shopify/toxiproxy/client"
  12. )
  13. const (
  14. VagrantToxiproxy = "http://192.168.100.67:8474"
  15. VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
  16. VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
  17. )
  18. var (
  19. kafkaAvailable, kafkaRequired bool
  20. kafkaBrokers []string
  21. proxyClient *toxiproxy.Client
  22. Proxies map[string]*toxiproxy.Proxy
  23. ZKProxies = []string{"zk1", "zk2", "zk3", "zk4", "zk5"}
  24. KafkaProxies = []string{"kafka1", "kafka2", "kafka3", "kafka4", "kafka5"}
  25. )
  26. func init() {
  27. if os.Getenv("DEBUG") == "true" {
  28. Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  29. }
  30. seed := time.Now().UTC().UnixNano()
  31. if tmp := os.Getenv("TEST_SEED"); tmp != "" {
  32. seed, _ = strconv.ParseInt(tmp, 0, 64)
  33. }
  34. Logger.Println("Using random seed:", seed)
  35. rand.Seed(seed)
  36. proxyAddr := os.Getenv("TOXIPROXY_ADDR")
  37. if proxyAddr == "" {
  38. proxyAddr = VagrantToxiproxy
  39. }
  40. proxyClient = toxiproxy.NewClient(proxyAddr)
  41. kafkaPeers := os.Getenv("KAFKA_PEERS")
  42. if kafkaPeers == "" {
  43. kafkaPeers = VagrantKafkaPeers
  44. }
  45. kafkaBrokers = strings.Split(kafkaPeers, ",")
  46. if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
  47. if err = c.Close(); err == nil {
  48. kafkaAvailable = true
  49. }
  50. }
  51. kafkaRequired = os.Getenv("CI") != ""
  52. }
  53. func checkKafkaAvailability(t testing.TB) {
  54. if !kafkaAvailable {
  55. if kafkaRequired {
  56. t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  57. } else {
  58. t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  59. }
  60. }
  61. }
  62. func checkKafkaVersion(t testing.TB, requiredVersion string) {
  63. kafkaVersion := os.Getenv("KAFKA_VERSION")
  64. if kafkaVersion == "" {
  65. t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
  66. } else {
  67. available := parseKafkaVersion(kafkaVersion)
  68. required := parseKafkaVersion(requiredVersion)
  69. if !available.satisfies(required) {
  70. t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
  71. }
  72. }
  73. }
  74. func resetProxies(t testing.TB) {
  75. if err := proxyClient.ResetState(); err != nil {
  76. t.Error(err)
  77. }
  78. Proxies = nil
  79. }
  80. func fetchProxies(t testing.TB) {
  81. var err error
  82. Proxies, err = proxyClient.Proxies()
  83. if err != nil {
  84. t.Fatal(err)
  85. }
  86. }
  87. func SaveProxy(t *testing.T, px string) {
  88. if err := Proxies[px].Save(); err != nil {
  89. t.Fatal(err)
  90. }
  91. }
  92. func setupFunctionalTest(t testing.TB) {
  93. checkKafkaAvailability(t)
  94. resetProxies(t)
  95. fetchProxies(t)
  96. }
  97. func teardownFunctionalTest(t testing.TB) {
  98. resetProxies(t)
  99. }
  100. type kafkaVersion []int
  101. func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
  102. var ov int
  103. for index, v := range kv {
  104. if len(other) <= index {
  105. ov = 0
  106. } else {
  107. ov = other[index]
  108. }
  109. if v < ov {
  110. return false
  111. } else if v > ov {
  112. return true
  113. }
  114. }
  115. return true
  116. }
  117. func parseKafkaVersion(version string) kafkaVersion {
  118. numbers := strings.Split(version, ".")
  119. result := make(kafkaVersion, 0, len(numbers))
  120. for _, number := range numbers {
  121. nr, _ := strconv.Atoi(number)
  122. result = append(result, nr)
  123. }
  124. return result
  125. }