فهرست منبع

Finalize most of the toxiproxy test framework

- Create `setupFunctionalTest()` and `teardownFunctionalTest()` that handle
  testing for kafka presence, resetting toxiproxy, etc, and use them everywhere.
- Rename `kafkaIsAvailable` to `kafkaAvailable` and `kafkaShouldBeAvailable` to
  `kafkaRequired`.
- Implement a couple of helper methods for managing proxies.
- Use toxiproxy for `TestFuncConnectionFailure` so that we know there won't ever
  be anything else listening on that port.
Evan Huus 10 سال پیش
والد
کامیت
f0fc7e9247
4فایلهای تغییر یافته به همراه66 افزوده شده و 17 حذف شده
  1. 11 3
      functional_client_test.go
  2. 4 2
      functional_consumer_test.go
  3. 8 4
      functional_producer_test.go
  4. 43 8
      functional_test.go

+ 11 - 3
functional_client_test.go

@@ -7,17 +7,24 @@ import (
 )
 )
 
 
 func TestFuncConnectionFailure(t *testing.T) {
 func TestFuncConnectionFailure(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	Proxies["kafka1"].Enabled = false
+	SaveProxy(t, "kafka1")
+
 	config := NewConfig()
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Metadata.Retry.Max = 1
 
 
-	_, err := NewClient([]string{"localhost:9000"}, config)
+	_, err := NewClient([]string{kafkaBrokers[0]}, config)
 	if err != ErrOutOfBrokers {
 	if err != ErrOutOfBrokers {
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
 	}
 }
 }
 
 
 func TestFuncClientMetadata(t *testing.T) {
 func TestFuncClientMetadata(t *testing.T) {
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	config := NewConfig()
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Metadata.Retry.Max = 1
@@ -60,7 +67,8 @@ func TestFuncClientMetadata(t *testing.T) {
 
 
 func TestFuncClientCoordinator(t *testing.T) {
 func TestFuncClientCoordinator(t *testing.T) {
 	checkKafkaVersion(t, "0.8.2")
 	checkKafkaVersion(t, "0.8.2")
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	client, err := NewClient(kafkaBrokers, nil)
 	client, err := NewClient(kafkaBrokers, nil)
 	if err != nil {
 	if err != nil {

+ 4 - 2
functional_consumer_test.go

@@ -6,7 +6,8 @@ import (
 )
 )
 
 
 func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
 func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	consumer, err := NewConsumer(kafkaBrokers, nil)
 	consumer, err := NewConsumer(kafkaBrokers, nil)
 	if err != nil {
 	if err != nil {
@@ -25,7 +26,8 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
 }
 }
 
 
 func TestConsumerHighWaterMarkOffset(t *testing.T) {
 func TestConsumerHighWaterMarkOffset(t *testing.T) {
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	p, err := NewSyncProducer(kafkaBrokers, nil)
 	p, err := NewSyncProducer(kafkaBrokers, nil)
 	if err != nil {
 	if err != nil {

+ 8 - 4
functional_producer_test.go

@@ -40,7 +40,8 @@ func TestFuncProducingFlushing(t *testing.T) {
 }
 }
 
 
 func TestFuncMultiPartitionProduce(t *testing.T) {
 func TestFuncMultiPartitionProduce(t *testing.T) {
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	config := NewConfig()
 	config := NewConfig()
 	config.ChannelBufferSize = 20
 	config.ChannelBufferSize = 20
@@ -72,7 +73,8 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 }
 }
 
 
 func TestFuncProducingToInvalidTopic(t *testing.T) {
 func TestFuncProducingToInvalidTopic(t *testing.T) {
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	producer, err := NewSyncProducer(kafkaBrokers, nil)
 	producer, err := NewSyncProducer(kafkaBrokers, nil)
 	if err != nil {
 	if err != nil {
@@ -91,7 +93,8 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
 }
 }
 
 
 func testProducingMessages(t *testing.T, config *Config) {
 func testProducingMessages(t *testing.T, config *Config) {
-	checkKafkaAvailability(t)
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
 
 
 	config.Producer.Return.Successes = true
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 	config.Consumer.Return.Errors = true
@@ -177,7 +180,8 @@ func BenchmarkProducerMediumSnappy(b *testing.B) {
 }
 }
 
 
 func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
 func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder) {
-	checkKafkaAvailability(b)
+	setupFunctionalTest(b)
+	defer teardownFunctionalTest(b)
 
 
 	producer, err := NewAsyncProducer(kafkaBrokers, conf)
 	producer, err := NewAsyncProducer(kafkaBrokers, conf)
 	if err != nil {
 	if err != nil {

+ 43 - 8
functional_test.go

@@ -20,9 +20,13 @@ const (
 )
 )
 
 
 var (
 var (
-	kafkaIsAvailable, kafkaShouldBeAvailable bool
-	kafkaBrokers                             []string
-	proxy                                    *toxiproxy.Client
+	kafkaAvailable, kafkaRequired bool
+	kafkaBrokers                  []string
+
+	proxyClient  *toxiproxy.Client
+	Proxies      map[string]*toxiproxy.Proxy
+	ZKProxies    = []string{"zk1", "zk2", "zk3", "zk4", "zk5"}
+	KafkaProxies = []string{"kafka1", "kafka2", "kafka3", "kafka4", "kafka5"}
 )
 )
 
 
 func init() {
 func init() {
@@ -41,7 +45,7 @@ func init() {
 	if proxyAddr == "" {
 	if proxyAddr == "" {
 		proxyAddr = VagrantToxiproxy
 		proxyAddr = VagrantToxiproxy
 	}
 	}
-	proxy = toxiproxy.NewClient(proxyAddr)
+	proxyClient = toxiproxy.NewClient(proxyAddr)
 
 
 	kafkaPeers := os.Getenv("KAFKA_PEERS")
 	kafkaPeers := os.Getenv("KAFKA_PEERS")
 	if kafkaPeers == "" {
 	if kafkaPeers == "" {
@@ -51,16 +55,16 @@ func init() {
 
 
 	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
 	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
 		if err = c.Close(); err == nil {
 		if err = c.Close(); err == nil {
-			kafkaIsAvailable = true
+			kafkaAvailable = true
 		}
 		}
 	}
 	}
 
 
-	kafkaShouldBeAvailable = os.Getenv("CI") != ""
+	kafkaRequired = os.Getenv("CI") != ""
 }
 }
 
 
 func checkKafkaAvailability(t testing.TB) {
 func checkKafkaAvailability(t testing.TB) {
-	if !kafkaIsAvailable {
-		if kafkaShouldBeAvailable {
+	if !kafkaAvailable {
+		if kafkaRequired {
 			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
 			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
 		} else {
 		} else {
 			t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
 			t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
@@ -81,6 +85,37 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) {
 	}
 	}
 }
 }
 
 
+func resetProxies(t testing.TB) {
+	if err := proxyClient.ResetState(); err != nil {
+		t.Error(err)
+	}
+	Proxies = nil
+}
+
+func fetchProxies(t testing.TB) {
+	var err error
+	Proxies, err = proxyClient.Proxies()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func SaveProxy(t *testing.T, px string) {
+	if err := Proxies[px].Save(); err != nil {
+		t.Fatal(err)
+	}
+}
+
+func setupFunctionalTest(t testing.TB) {
+	checkKafkaAvailability(t)
+	resetProxies(t)
+	fetchProxies(t)
+}
+
+func teardownFunctionalTest(t testing.TB) {
+	resetProxies(t)
+}
+
 type kafkaVersion []int
 type kafkaVersion []int
 
 
 func (kv kafkaVersion) satisfies(other kafkaVersion) bool {
 func (kv kafkaVersion) satisfies(other kafkaVersion) bool {