functional_test.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package sarama
  2. import (
  3. "log"
  4. "math/rand"
  5. "net"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "testing"
  10. "time"
  11. toxiproxy "github.com/Shopify/toxiproxy/client"
  12. )
  13. const (
  14. VagrantToxiproxy = "http://192.168.100.67:8474"
  15. VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
  16. VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
  17. )
  18. var (
  19. kafkaAvailable, kafkaRequired bool
  20. kafkaBrokers []string
  21. proxyClient *toxiproxy.Client
  22. Proxies map[string]*toxiproxy.Proxy
  23. )
  24. func init() {
  25. if os.Getenv("DEBUG") == "true" {
  26. Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  27. }
  28. seed := time.Now().UTC().UnixNano()
  29. if tmp := os.Getenv("TEST_SEED"); tmp != "" {
  30. seed, _ = strconv.ParseInt(tmp, 0, 64)
  31. }
  32. Logger.Println("Using random seed:", seed)
  33. rand.Seed(seed)
  34. proxyAddr := os.Getenv("TOXIPROXY_ADDR")
  35. if proxyAddr == "" {
  36. proxyAddr = VagrantToxiproxy
  37. }
  38. proxyClient = toxiproxy.NewClient(proxyAddr)
  39. kafkaPeers := os.Getenv("KAFKA_PEERS")
  40. if kafkaPeers == "" {
  41. kafkaPeers = VagrantKafkaPeers
  42. }
  43. kafkaBrokers = strings.Split(kafkaPeers, ",")
  44. if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
  45. if err = c.Close(); err == nil {
  46. kafkaAvailable = true
  47. }
  48. }
  49. kafkaRequired = os.Getenv("CI") != ""
  50. }
  51. func checkKafkaAvailability(t testing.TB) {
  52. if !kafkaAvailable {
  53. if kafkaRequired {
  54. t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  55. } else {
  56. t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  57. }
  58. }
  59. }
  60. func checkKafkaVersion(t testing.TB, requiredVersion string) {
  61. kafkaVersion := os.Getenv("KAFKA_VERSION")
  62. if kafkaVersion == "" {
  63. t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
  64. } else {
  65. available := parseKafkaVersion(kafkaVersion)
  66. required := parseKafkaVersion(requiredVersion)
  67. if !available.satisfies(required) {
  68. t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
  69. }
  70. }
  71. }
  72. func resetProxies(t testing.TB) {
  73. if err := proxyClient.ResetState(); err != nil {
  74. t.Error(err)
  75. }
  76. Proxies = nil
  77. }
  78. func fetchProxies(t testing.TB) {
  79. var err error
  80. Proxies, err = proxyClient.Proxies()
  81. if err != nil {
  82. t.Fatal(err)
  83. }
  84. }
  85. func SaveProxy(t *testing.T, px string) {
  86. if err := Proxies[px].Save(); err != nil {
  87. t.Fatal(err)
  88. }
  89. }
  90. func setupFunctionalTest(t testing.TB) {
  91. checkKafkaAvailability(t)
  92. resetProxies(t)
  93. fetchProxies(t)
  94. }
  95. func teardownFunctionalTest(t testing.TB) {
  96. resetProxies(t)
  97. }
  98. type kafkaVersion []int
  99. func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
  100. var ov int
  101. for index, v := range kv {
  102. if len(other) <= index {
  103. ov = 0
  104. } else {
  105. ov = other[index]
  106. }
  107. if v < ov {
  108. return false
  109. } else if v > ov {
  110. return true
  111. }
  112. }
  113. return true
  114. }
  115. func parseKafkaVersion(version string) kafkaVersion {
  116. numbers := strings.Split(version, ".")
  117. result := make(kafkaVersion, 0, len(numbers))
  118. for _, number := range numbers {
  119. nr, _ := strconv.Atoi(number)
  120. result = append(result, nr)
  121. }
  122. return result
  123. }