functional_test.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. package sarama
  2. import (
  3. "log"
  4. "net"
  5. "os"
  6. "strconv"
  7. "strings"
  8. "testing"
  9. "time"
  10. )
  11. const (
  12. VagrantKafkaPeers = "192.168.100.67:6667,192.168.100.67:6668,192.168.100.67:6669,192.168.100.67:6670,192.168.100.67:6671"
  13. VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
  14. )
  15. var (
  16. kafkaIsAvailable, kafkaShouldBeAvailable bool
  17. kafkaBrokers []string
  18. )
  19. func init() {
  20. kafkaPeers := os.Getenv("KAFKA_PEERS")
  21. if kafkaPeers == "" {
  22. kafkaPeers = VagrantKafkaPeers
  23. }
  24. kafkaBrokers = strings.Split(kafkaPeers, ",")
  25. if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
  26. if err = c.Close(); err == nil {
  27. kafkaIsAvailable = true
  28. }
  29. }
  30. kafkaShouldBeAvailable = os.Getenv("CI") != ""
  31. if os.Getenv("DEBUG") == "true" {
  32. Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
  33. }
  34. }
  35. func checkKafkaAvailability(t *testing.T) {
  36. if !kafkaIsAvailable {
  37. if kafkaShouldBeAvailable {
  38. t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  39. } else {
  40. t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
  41. }
  42. }
  43. }
  44. func checkKafkaVersion(t *testing.T, requiredVersion string) {
  45. kafkaVersion := os.Getenv("KAFKA_VERSION")
  46. if kafkaVersion == "" {
  47. t.Logf("No KAFKA_VERSION set. This tests requires Kafka version %s or higher. Continuing...", requiredVersion)
  48. } else {
  49. available := parseKafkaVersion(kafkaVersion)
  50. required := parseKafkaVersion(requiredVersion)
  51. if !available.satisfies(required) {
  52. t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
  53. }
  54. }
  55. }
  56. type kafkaVersion []int
  57. func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
  58. var ov int
  59. for index, v := range kv {
  60. if len(other) <= index {
  61. ov = 0
  62. } else {
  63. ov = other[index]
  64. }
  65. if v < ov {
  66. return false
  67. }
  68. }
  69. return true
  70. }
  71. func parseKafkaVersion(version string) kafkaVersion {
  72. numbers := strings.Split(version, ".")
  73. result := make(kafkaVersion, 0, len(numbers))
  74. for _, number := range numbers {
  75. nr, _ := strconv.Atoi(number)
  76. result = append(result, nr)
  77. }
  78. return result
  79. }