123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146 |
- package sarama
- import (
- "log"
- "math/rand"
- "net"
- "os"
- "strconv"
- "strings"
- "testing"
- "time"
- toxiproxy "github.com/Shopify/toxiproxy/client"
- )
// Default endpoints of the Vagrant-based functional test environment. init
// falls back to VagrantToxiproxy and VagrantKafkaPeers when the TOXIPROXY_ADDR
// and KAFKA_PEERS environment variables are unset.
const (
	// VagrantToxiproxy is the default URL of the Toxiproxy HTTP API.
	VagrantToxiproxy = "http://192.168.100.67:8474"

	// VagrantKafkaPeers is the default comma-separated list of Kafka broker
	// addresses.
	VagrantKafkaPeers = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"

	// VagrantZookeeperPeers is the default comma-separated list of Zookeeper
	// addresses.
	VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
)
- var (
- kafkaAvailable, kafkaRequired bool
- kafkaBrokers []string
- proxyClient *toxiproxy.Client
- Proxies map[string]*toxiproxy.Proxy
- )
- func init() {
- if os.Getenv("DEBUG") == "true" {
- Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
- }
- seed := time.Now().UTC().UnixNano()
- if tmp := os.Getenv("TEST_SEED"); tmp != "" {
- seed, _ = strconv.ParseInt(tmp, 0, 64)
- }
- Logger.Println("Using random seed:", seed)
- rand.Seed(seed)
- proxyAddr := os.Getenv("TOXIPROXY_ADDR")
- if proxyAddr == "" {
- proxyAddr = VagrantToxiproxy
- }
- proxyClient = toxiproxy.NewClient(proxyAddr)
- kafkaPeers := os.Getenv("KAFKA_PEERS")
- if kafkaPeers == "" {
- kafkaPeers = VagrantKafkaPeers
- }
- kafkaBrokers = strings.Split(kafkaPeers, ",")
- if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
- if err = c.Close(); err == nil {
- kafkaAvailable = true
- }
- }
- kafkaRequired = os.Getenv("CI") != ""
- }
- func checkKafkaAvailability(t testing.TB) {
- if !kafkaAvailable {
- if kafkaRequired {
- t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
- } else {
- t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
- }
- }
- }
- func checkKafkaVersion(t testing.TB, requiredVersion string) {
- kafkaVersion := os.Getenv("KAFKA_VERSION")
- if kafkaVersion == "" {
- t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
- } else {
- available := parseKafkaVersion(kafkaVersion)
- required := parseKafkaVersion(requiredVersion)
- if !available.satisfies(required) {
- t.Skipf("Kafka version %s is required for this test; you have %s. Skipping...", requiredVersion, kafkaVersion)
- }
- }
- }
- func resetProxies(t testing.TB) {
- if err := proxyClient.ResetState(); err != nil {
- t.Error(err)
- }
- Proxies = nil
- }
- func fetchProxies(t testing.TB) {
- var err error
- Proxies, err = proxyClient.Proxies()
- if err != nil {
- t.Fatal(err)
- }
- }
- func SaveProxy(t *testing.T, px string) {
- if err := Proxies[px].Save(); err != nil {
- t.Fatal(err)
- }
- }
- func setupFunctionalTest(t testing.TB) {
- checkKafkaAvailability(t)
- resetProxies(t)
- fetchProxies(t)
- }
- func teardownFunctionalTest(t testing.TB) {
- resetProxies(t)
- }
// kafkaVersion is a dotted version number split into its numeric components,
// most significant first (e.g. "0.10.1" is kafkaVersion{0, 10, 1}).
type kafkaVersion []int

// satisfies reports whether kv is greater than or equal to other.
//
// The versions are compared component by component; whichever version is
// shorter is padded with zeros, so 0.10 equals 0.10.0 and 0.10 does not
// satisfy 0.10.1. (Comparing only over the length of kv would wrongly accept
// the latter.)
func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
	n := len(kv)
	if len(other) > n {
		n = len(other)
	}

	for i := 0; i < n; i++ {
		var have, want int
		if i < len(kv) {
			have = kv[i]
		}
		if i < len(other) {
			want = other[i]
		}
		// The first differing component decides the ordering.
		if have != want {
			return have > want
		}
	}
	return true
}

// parseKafkaVersion splits a dotted version string into its numeric
// components.
//
// Whitespace around the string and around each component is ignored. A
// component that is not a plain integer contributes its leading decimal digits
// ("1-rc1" parses as 1); a component without leading digits parses as 0. The
// result always has one entry per dot-separated component.
func parseKafkaVersion(version string) kafkaVersion {
	fields := strings.Split(strings.TrimSpace(version), ".")
	result := make(kafkaVersion, 0, len(fields))

	for _, field := range fields {
		field = strings.TrimSpace(field)

		nr, err := strconv.Atoi(field)
		if err != nil {
			// Fall back to the numeric prefix so that suffixes such as "-rc1"
			// or "-SNAPSHOT" do not zero out the component.
			end := 0
			for end < len(field) && field[end] >= '0' && field[end] <= '9' {
				end++
			}
			if nr, err = strconv.Atoi(field[:end]); err != nil {
				nr = 0 // no usable digits
			}
		}
		result = append(result, nr)
	}
	return result
}
|