functional_test.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. package sarama
  2. import (
  3. "log"
  4. "math/rand"
  5. "net"
  6. "os"
  7. "strconv"
  8. "strings"
  9. "testing"
  10. "time"
  11. toxiproxy "github.com/Shopify/toxiproxy/client"
  12. )
  13. const (
  14. VagrantToxiproxy = "http://192.168.100.67:8474"
  15. VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
  16. VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
  17. )
  18. var (
  19. kafkaIsAvailable, kafkaShouldBeAvailable bool
  20. kafkaBrokers []string
  21. proxy *toxiproxy.Client
  22. )
  23. func init() {
  24. if os.Getenv("DEBUG") == "true" {
  25. Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  26. }
  27. seed := time.Now().UTC().UnixNano()
  28. if tmp := os.Getenv("TEST_SEED"); tmp != "" {
  29. seed, _ = strconv.ParseInt(tmp, 0, 64)
  30. }
  31. Logger.Println("Using random seed:", seed)
  32. rand.Seed(seed)
  33. proxyAddr := os.Getenv("TOXIPROXY_ADDR")
  34. if proxyAddr == "" {
  35. proxyAddr = VagrantToxiproxy
  36. }
  37. proxy = toxiproxy.NewClient(proxyAddr)
  38. kafkaPeers := os.Getenv("KAFKA_PEERS")
  39. if kafkaPeers == "" {
  40. kafkaPeers = VagrantKafkaPeers
  41. }
  42. kafkaBrokers = strings.Split(kafkaPeers, ",")
  43. if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
  44. if err = c.Close(); err == nil {
  45. kafkaIsAvailable = true
  46. }
  47. }
  48. kafkaShouldBeAvailable = os.Getenv("CI") != ""
  49. }
  50. func checkKafkaAvailability(t testing.TB) {
  51. if !kafkaIsAvailable {
  52. if kafkaShouldBeAvailable {
  53. t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  54. } else {
  55. t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  56. }
  57. }
  58. }
  59. func checkKafkaVersion(t testing.TB, requiredVersion string) {
  60. kafkaVersion := os.Getenv("KAFKA_VERSION")
  61. if kafkaVersion == "" {
  62. t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
  63. } else {
  64. available := parseKafkaVersion(kafkaVersion)
  65. required := parseKafkaVersion(requiredVersion)
  66. if !available.satisfies(required) {
  67. t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
  68. }
  69. }
  70. }
  71. type kafkaVersion []int
  72. func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
  73. var ov int
  74. for index, v := range kv {
  75. if len(other) <= index {
  76. ov = 0
  77. } else {
  78. ov = other[index]
  79. }
  80. if v < ov {
  81. return false
  82. }
  83. }
  84. return true
  85. }
  86. func parseKafkaVersion(version string) kafkaVersion {
  87. numbers := strings.Split(version, ".")
  88. result := make(kafkaVersion, 0, len(numbers))
  89. for _, number := range numbers {
  90. nr, _ := strconv.Atoi(number)
  91. result = append(result, nr)
  92. }
  93. return result
  94. }