functional_test.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package sarama
  2. import (
  3. "log"
  4. "net"
  5. "os"
  6. "strconv"
  7. "strings"
  8. "testing"
  9. "time"
  10. toxiproxy "github.com/Shopify/toxiproxy/client"
  11. )
  12. const (
  13. VagrantToxiproxy = "http://192.168.100.67:8474"
  14. VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
  15. VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
  16. )
  17. var (
  18. kafkaIsAvailable, kafkaShouldBeAvailable bool
  19. kafkaBrokers []string
  20. proxy *toxiproxy.Client
  21. )
  22. func init() {
  23. proxyAddr := os.Getenv("TOXIPROXY_ADDR")
  24. if proxyAddr == "" {
  25. proxyAddr = VagrantToxiproxy
  26. }
  27. proxy = toxiproxy.NewClient(proxyAddr)
  28. kafkaPeers := os.Getenv("KAFKA_PEERS")
  29. if kafkaPeers == "" {
  30. kafkaPeers = VagrantKafkaPeers
  31. }
  32. kafkaBrokers = strings.Split(kafkaPeers, ",")
  33. if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
  34. if err = c.Close(); err == nil {
  35. kafkaIsAvailable = true
  36. }
  37. }
  38. kafkaShouldBeAvailable = os.Getenv("CI") != ""
  39. if os.Getenv("DEBUG") == "true" {
  40. Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  41. }
  42. }
  43. func checkKafkaAvailability(t *testing.T) {
  44. if !kafkaIsAvailable {
  45. if kafkaShouldBeAvailable {
  46. t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  47. } else {
  48. t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  49. }
  50. }
  51. }
  52. func checkKafkaVersion(t *testing.T, requiredVersion string) {
  53. kafkaVersion := os.Getenv("KAFKA_VERSION")
  54. if kafkaVersion == "" {
  55. t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
  56. } else {
  57. available := parseKafkaVersion(kafkaVersion)
  58. required := parseKafkaVersion(requiredVersion)
  59. if !available.satisfies(required) {
  60. t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
  61. }
  62. }
  63. }
  64. type kafkaVersion []int
  65. func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
  66. var ov int
  67. for index, v := range kv {
  68. if len(other) <= index {
  69. ov = 0
  70. } else {
  71. ov = other[index]
  72. }
  73. if v < ov {
  74. return false
  75. }
  76. }
  77. return true
  78. }
  79. func parseKafkaVersion(version string) kafkaVersion {
  80. numbers := strings.Split(version, ".")
  81. result := make(kafkaVersion, 0, len(numbers))
  82. for _, number := range numbers {
  83. nr, _ := strconv.Atoi(number)
  84. result = append(result, nr)
  85. }
  86. return result
  87. }