Forráskód Böngészése

Merge pull request #1695 from zendesk/ktsanaktsidis/docker_compose_tests

Use docker-compose to run the functional tests
Vlad Gorodetsky 5 éve
szülő
commit
ad70aac2ad

+ 1 - 13
.github/workflows/ci.yml

@@ -14,13 +14,8 @@ jobs:
         platform: [ubuntu-latest]
 
     env:
-      KAFKA_PEERS: localhost:9091,localhost:9092,localhost:9093,localhost:9094,localhost:9095
-      TOXIPROXY_ADDR: http://localhost:8474
-      KAFKA_INSTALL_ROOT: /home/runner/kafka
-      KAFKA_HOSTNAME: localhost
       DEBUG: true
       KAFKA_VERSION: ${{ matrix.kafka-version }}
-      KAFKA_SCALA_VERSION: 2.12
 
     steps:
     - uses: actions/checkout@v1
@@ -48,16 +43,9 @@ jobs:
       run: |
         curl -sSfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh | sh -s -- -b $(go env GOPATH)/bin v1.27.0
         export REPOSITORY_ROOT=${GITHUB_WORKSPACE}
-        vagrant/install_cluster.sh
-        vagrant/boot_cluster.sh
-        vagrant/create_topics.sh
-        vagrant/run_java_producer.sh
 
     - name: Run test suite
-      run: make test
+      run: make test_functional
 
     - name: Run linter
       run: make lint
-
-    - name: Teardown
-      run: vagrant/halt_cluster.sh

+ 3 - 0
.gitignore

@@ -25,3 +25,6 @@ _testmain.go
 
 coverage.txt
 profile.out
+
+simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
+

+ 5 - 1
Makefile

@@ -21,7 +21,11 @@ fmt:
 	gofmt -s -l -w $(FILES) $(TESTS)
 
 lint:
-	golangci-lint run
+	GOFLAGS="-tags=functional" golangci-lint run
 
 test:
 	$(GOTEST) ./...
+
+.PHONY: test_functional
+test_functional:
+	$(GOTEST) -tags=functional ./...

+ 134 - 0
docker-compose.yml

@@ -0,0 +1,134 @@
+version: '3.7'
+services:
+  zookeeper-1:
+    image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '1'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  zookeeper-2:
+    image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '2'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  zookeeper-3:
+    image: 'confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '3'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  kafka-1:
+    image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '1'
+      KAFKA_BROKER_RACK: '1'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-2:
+    image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '2'
+      KAFKA_BROKER_RACK: '2'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-3:
+    image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '3'
+      KAFKA_BROKER_RACK: '3'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-4:
+    image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '4'
+      KAFKA_BROKER_RACK: '4'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-5:
+    image: 'confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-5.5.0}'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '5'
+      KAFKA_BROKER_RACK: '5'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  toxiproxy:
+    image: 'shopify/toxiproxy:2.1.4'
+    ports:
+      # The tests themselves actually start the proies on these ports
+      - '29091:29091'
+      - '29092:29092'
+      - '29093:29093'
+      - '29094:29094'
+      - '29095:29095'
+      # This is the toxiproxy API port
+      - '8474:8474'

+ 6 - 4
functional_client_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	Proxies["kafka1"].Enabled = false
+	FunctionalTestEnv.Proxies["kafka1"].Enabled = false
 	SaveProxy(t, "kafka1")
 
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 
-	_, err := NewClient([]string{kafkaBrokers[0]}, config)
+	_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
 	if err != ErrOutOfBrokers {
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
@@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) {
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Metadata.Retry.Backoff = 10 * time.Millisecond
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 3 - 3
functional_consumer_group_test.go

@@ -1,4 +1,4 @@
-// +build go1.9
+//+build functional
 
 package sarama
 
@@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string {
 }
 
 func testFuncConsumerGroupFuzzySeed(topic string) error {
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		return err
 	}
@@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM
 	config.Consumer.Offsets.Initial = OffsetOldest
 	config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
 
-	group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
+	group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config)
 	if err != nil {
 		t.Fatal(err)
 		return nil

+ 8 - 6
functional_consumer_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	consumer, err := NewConsumer(kafkaBrokers, nil)
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	p, err := NewSyncProducer(kafkaBrokers, nil)
+	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	c, err := NewConsumer(kafkaBrokers, nil)
+	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
 	config.Consumer.IsolationLevel = ReadCommitted
 	config.Version = V0_11_0_0
 
-	consumer, err := NewConsumer(kafkaBrokers, config)
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 				prodCfg.Net.MaxOpenRequests = 1
 			}
 
-			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
+			p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
 			if err != nil {
 				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
 				continue
@@ -251,7 +253,7 @@ consumerVersionLoop:
 		// message.
 		consCfg := NewConfig()
 		consCfg.Version = consVer
-		c, err := NewConsumer(kafkaBrokers, consCfg)
+		c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
 		if err != nil {
 			t.Fatal(err)
 		}

+ 3 - 1
functional_offset_manager_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 10 - 8
functional_producer_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -60,7 +62,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
 	config.Producer.Return.Successes = true
-	producer, err := NewSyncProducer(kafkaBrokers, config)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -88,7 +90,7 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	producer, err := NewSyncProducer(kafkaBrokers, nil)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -120,7 +122,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	config.Net.MaxOpenRequests = 1
 	config.Version = V0_11_0_0
 
-	producer, err := NewSyncProducer(kafkaBrokers, config)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -138,7 +140,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 
 	// break the brokers.
-	for proxyName, proxy := range Proxies {
+	for proxyName, proxy := range FunctionalTestEnv.Proxies {
 		if !strings.Contains(proxyName, "kafka") {
 			continue
 		}
@@ -159,7 +161,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 
 	// Now bring the proxy back up
-	for proxyName, proxy := range Proxies {
+	for proxyName, proxy := range FunctionalTestEnv.Proxies {
 		if !strings.Contains(proxyName, "kafka") {
 			continue
 		}
@@ -186,7 +188,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	defer teardownFunctionalTest(t)
 
 	// Configure some latency in order to properly validate the request latency metric
-	for _, proxy := range Proxies {
+	for _, proxy := range FunctionalTestEnv.Proxies {
 		if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
 			t.Fatal("Unable to configure latency toxicity", err)
 		}
@@ -195,7 +197,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -387,7 +389,7 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder)
 		}()
 	}
 
-	producer, err := NewAsyncProducer(kafkaBrokers, conf)
+	producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf)
 	if err != nil {
 		b.Fatal(err)
 	}

+ 351 - 47
functional_test.go

@@ -1,10 +1,18 @@
+//+build functional
+
 package sarama
 
 import (
+	"context"
+	"fmt"
+	"io"
 	"log"
-	"math/rand"
 	"net"
+	"net/http"
+	"net/url"
 	"os"
+	"os/exec"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"testing"
@@ -14,66 +22,373 @@ import (
 )
 
 const (
-	VagrantToxiproxy      = "http://192.168.100.67:8474"
-	VagrantKafkaPeers     = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
-	VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
+	uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
 )
 
 var (
-	kafkaAvailable, kafkaRequired bool
-	kafkaBrokers                  []string
+	testTopicDetails = map[string]*TopicDetail{
+		"test.1": {
+			NumPartitions:     1,
+			ReplicationFactor: 3,
+		},
+		"test.4": {
+			NumPartitions:     4,
+			ReplicationFactor: 3,
+		},
+		"test.64": {
+			NumPartitions:     64,
+			ReplicationFactor: 3,
+		},
+		"uncommitted-topic-test-4": {
+			NumPartitions:     1,
+			ReplicationFactor: 3,
+		},
+	}
 
-	proxyClient *toxiproxy.Client
-	Proxies     map[string]*toxiproxy.Proxy
+	FunctionalTestEnv *testEnvironment
 )
 
-func init() {
+func TestMain(m *testing.M) {
+	// Functional tests for Sarama
+	//
+	// You can either set TOXIPROXY_ADDR, which points at a toxiproxy address
+	// already set up with 21801-21805 bound to zookeeper and 29091-29095
+	// bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try
+	// and use Docker to bring up a 5-node zookeeper cluster & 5-node kafka
+	// cluster, with toxiproxy configured as above.
+	//
+	// In either case, the following topics will be deleted (if they exist) and
+	// then created/pre-seeded with data for the functional test run:
+	//     * uncomitted-topic-test-4
+	//     * test.1
+	//     * test.4
+	//     * test.64
+	os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+	ctx := context.Background()
+	var env testEnvironment
+
 	if os.Getenv("DEBUG") == "true" {
 		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
 	}
 
-	seed := time.Now().UTC().UnixNano()
-	if tmp := os.Getenv("TEST_SEED"); tmp != "" {
-		seed, _ = strconv.ParseInt(tmp, 0, 64)
+	usingExisting, err := existingEnvironment(ctx, &env)
+	if err != nil {
+		panic(err)
+	}
+	if !usingExisting {
+		err := prepareDockerTestEnvironment(ctx, &env)
+		if err != nil {
+			_ = tearDownDockerTestEnvironment(ctx, &env)
+			panic(err)
+		}
+		defer tearDownDockerTestEnvironment(ctx, &env) // nolint:errcheck
+	}
+	if err := prepareTestTopics(ctx, &env); err != nil {
+		panic(err)
+	}
+	FunctionalTestEnv = &env
+	return m.Run()
+}
+
+type testEnvironment struct {
+	ToxiproxyClient  *toxiproxy.Client
+	Proxies          map[string]*toxiproxy.Proxy
+	KafkaBrokerAddrs []string
+	KafkaVersion     string
+}
+
+func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
+	Logger.Println("bringing up docker-based test environment")
+
+	// Always (try to) tear down first.
+	if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
+		return fmt.Errorf("failed to tear down existing env: %w", err)
 	}
-	Logger.Println("Using random seed:", seed)
-	rand.Seed(seed)
 
-	proxyAddr := os.Getenv("TOXIPROXY_ADDR")
-	if proxyAddr == "" {
-		proxyAddr = VagrantToxiproxy
+	if version, ok := os.LookupEnv("KAFKA_VERSION"); ok {
+		env.KafkaVersion = version
+	} else {
+		// We have cp-5.5.0 as the default in the docker-compose file, so that's kafka 2.5.0.
+		env.KafkaVersion = "2.5.0"
 	}
-	proxyClient = toxiproxy.NewClient(proxyAddr)
 
-	kafkaPeers := os.Getenv("KAFKA_PEERS")
-	if kafkaPeers == "" {
-		kafkaPeers = VagrantKafkaPeers
+	// the mapping of confluent platform docker image versions -> kafka versions can be
+	// found here: https://docs.confluent.io/current/installation/versions-interoperability.html
+	var confluentPlatformVersion string
+	switch env.KafkaVersion {
+	case "2.5.0":
+		confluentPlatformVersion = "5.5.0"
+	case "2.4.1":
+		confluentPlatformVersion = "5.4.2"
+	default:
+		return fmt.Errorf("don't know what confluent platform version to use for kafka %s", env.KafkaVersion)
+	}
+
+	c := exec.Command("docker-compose", "up", "-d")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	c.Env = append(os.Environ(), fmt.Sprintf("CONFLUENT_PLATFORM_VERSION=%s", confluentPlatformVersion))
+	err := c.Run()
+	if err != nil {
+		return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err)
+	}
+
+	// Set up toxiproxy Proxies
+	env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
+	env.Proxies = map[string]*toxiproxy.Proxy{}
+	for i := 1; i <= 5; i++ {
+		proxyName := fmt.Sprintf("kafka%d", i)
+		proxy, err := env.ToxiproxyClient.CreateProxy(
+			proxyName,
+			fmt.Sprintf("0.0.0.0:%d", 29090+i),
+			fmt.Sprintf("kafka-%d:%d", i, 29090+i),
+		)
+		if err != nil {
+			return fmt.Errorf("failed to create toxiproxy: %w", err)
+		}
+		env.Proxies[proxyName] = proxy
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090+i))
 	}
-	kafkaBrokers = strings.Split(kafkaPeers, ",")
 
-	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
-		if err = c.Close(); err == nil {
-			kafkaAvailable = true
+	// Wait for the kafka broker to come up
+	allBrokersUp := false
+	for i := 0; i < 45 && !allBrokersUp; i++ {
+		Logger.Println("waiting for kafka brokers to come up")
+		time.Sleep(1 * time.Second)
+		config := NewConfig()
+		config.Version, err = ParseKafkaVersion(env.KafkaVersion)
+		if err != nil {
+			return err
 		}
+		config.Net.DialTimeout = 1 * time.Second
+		config.Net.ReadTimeout = 1 * time.Second
+		config.Net.WriteTimeout = 1 * time.Second
+		config.ClientID = "sarama-tests"
+		brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
+	retryLoop:
+		for j, addr := range env.KafkaBrokerAddrs {
+			client, err := NewClient([]string{addr}, config)
+			if err != nil {
+				continue
+			}
+			err = client.RefreshMetadata()
+			if err != nil {
+				continue
+			}
+			brokers := client.Brokers()
+			if len(brokers) < 5 {
+				continue
+			}
+			for _, broker := range brokers {
+				err := broker.Open(client.Config())
+				if err != nil {
+					continue retryLoop
+				}
+				connected, err := broker.Connected()
+				if err != nil || !connected {
+					continue retryLoop
+				}
+			}
+			brokersOk[j] = true
+		}
+		allBrokersUp = true
+		for _, u := range brokersOk {
+			allBrokersUp = allBrokersUp && u
+		}
+	}
+	if !allBrokersUp {
+		return fmt.Errorf("timed out waiting for broker to come up")
 	}
 
-	kafkaRequired = os.Getenv("CI") != ""
+	return nil
 }
 
-func checkKafkaAvailability(t testing.TB) {
-	if !kafkaAvailable {
-		if kafkaRequired {
-			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
-		} else {
-			t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
+func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
+	toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
+	if !ok {
+		return false, nil
+	}
+	toxiproxyURL, err := url.Parse(toxiproxyAddr)
+	if err != nil {
+		return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
+	}
+	toxiproxyHost := toxiproxyURL.Hostname()
+
+	env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
+	for i := 1; i <= 5; i++ {
+		proxyName := fmt.Sprintf("kafka%d", i)
+		proxy, err := env.ToxiproxyClient.Proxy(proxyName)
+		if err != nil {
+			return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
+		}
+		env.Proxies[proxyName] = proxy
+		// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
+		_, proxyPort, err := net.SplitHostPort(proxy.Listen)
+		if err != nil {
+			return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
 		}
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
+	}
+
+	env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
+	if !ok {
+		return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
 	}
+	return true, nil
+}
+
+func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
+	c := exec.Command("docker-compose", "down", "--volumes")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	downErr := c.Run()
+
+	c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	rmErr := c.Run()
+	if downErr != nil {
+		return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr)
+	}
+	if rmErr != nil {
+		return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr)
+	}
+	return nil
+}
+
+func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
+	Logger.Println("creating test topics")
+	var testTopicNames []string
+	for topic := range testTopicDetails {
+		testTopicNames = append(testTopicNames, topic)
+	}
+
+	Logger.Println("Creating topics")
+	config := NewConfig()
+	config.Metadata.Retry.Max = 5
+	config.Metadata.Retry.Backoff = 10 * time.Second
+	config.ClientID = "sarama-tests"
+	var err error
+	config.Version, err = ParseKafkaVersion(env.KafkaVersion)
+	if err != nil {
+		return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
+	}
+
+	client, err := NewClient(env.KafkaBrokerAddrs, config)
+	if err != nil {
+		return fmt.Errorf("failed to connect to kafka: %w", err)
+	}
+	defer client.Close()
+
+	controller, err := client.Controller()
+	if err != nil {
+		return fmt.Errorf("failed to connect to kafka controller: %w", err)
+	}
+	defer controller.Close()
+
+	// Start by deleting the test topics (if they already exist)
+	deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
+		Topics:  testTopicNames,
+		Timeout: 30 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete test topics: %w", err)
+	}
+	for topic, topicErr := range deleteRes.TopicErrorCodes {
+		if !isTopicNotExistsErrorOrOk(topicErr) {
+			return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
+		}
+	}
+
+	// wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
+	// synchronously
+	var topicsOk bool
+	for i := 0; i < 20 && !topicsOk; i++ {
+		time.Sleep(1 * time.Second)
+		md, err := controller.GetMetadata(&MetadataRequest{
+			Topics: testTopicNames,
+		})
+		if err != nil {
+			return fmt.Errorf("failed to get metadata for test topics: %w", err)
+		}
+
+		topicsOk = true
+		for _, topicsMd := range md.Topics {
+			if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
+				topicsOk = false
+			}
+		}
+	}
+	if !topicsOk {
+		return fmt.Errorf("timed out waiting for test topics to be gone")
+	}
+
+	// now create the topics empty
+	createRes, err := controller.CreateTopics(&CreateTopicsRequest{
+		TopicDetails: testTopicDetails,
+		Timeout:      30 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create test topics: %w", err)
+	}
+	for topic, topicErr := range createRes.TopicErrors {
+		if !isTopicExistsErrorOrOk(topicErr.Err) {
+			return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
+		}
+	}
+
+	// This is kind of gross, but we don't actually have support for doing transactional publishing
+	// with sarama, so we need to use a java-based tool to publish uncomitted messages to
+	// the uncommitted-topic-test-4 topic
+	jarName := filepath.Base(uncomittedMsgJar)
+	if _, err := os.Stat(jarName); err != nil {
+		Logger.Printf("Downloading %s\n", uncomittedMsgJar)
+		req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
+		if err != nil {
+			return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err)
+		}
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
+		}
+		defer res.Body.Close()
+		jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
+		if err != nil {
+			return fmt.Errorf("failed opening the uncomitted msg jar: %w", err)
+		}
+		defer jarFile.Close()
+
+		_, err = io.Copy(jarFile, res.Body)
+		if err != nil {
+			return fmt.Errorf("failed writing the uncomitted msg jar: %w", err)
+		}
+	}
+
+	c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	err = c.Run()
+	if err != nil {
+		return fmt.Errorf("failed running uncomitted msg jar: %w", err)
+	}
+	return nil
+}
+
+func isTopicNotExistsErrorOrOk(err KError) bool {
+	return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError
+}
+
+func isTopicExistsErrorOrOk(err KError) bool {
+	return err == ErrTopicAlreadyExists || err == ErrNoError
 }
 
 func checkKafkaVersion(t testing.TB, requiredVersion string) {
-	kafkaVersion := os.Getenv("KAFKA_VERSION")
+	kafkaVersion := FunctionalTestEnv.KafkaVersion
 	if kafkaVersion == "" {
-		t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
+		t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
 	} else {
 		available := parseKafkaVersion(kafkaVersion)
 		required := parseKafkaVersion(requiredVersion)
@@ -84,30 +399,19 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) {
 }
 
 func resetProxies(t testing.TB) {
-	if err := proxyClient.ResetState(); err != nil {
+	if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil {
 		t.Error(err)
 	}
-	Proxies = nil
-}
-
-func fetchProxies(t testing.TB) {
-	var err error
-	Proxies, err = proxyClient.Proxies()
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func SaveProxy(t *testing.T, px string) {
-	if err := Proxies[px].Save(); err != nil {
+	if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
 		t.Fatal(err)
 	}
 }
 
 func setupFunctionalTest(t testing.TB) {
-	checkKafkaAvailability(t)
 	resetProxies(t)
-	fetchProxies(t)
 }
 
 func teardownFunctionalTest(t testing.TB) {

+ 0 - 9
vagrant/create_topics.sh

@@ -1,9 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-cd ${KAFKA_INSTALL_ROOT}/kafka-9092
-bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181
-bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181
-bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64  --zookeeper localhost:2181
-bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic uncommitted-topic-test-4  --zookeeper localhost:2181

+ 0 - 25
vagrant/halt_cluster.sh

@@ -1,25 +0,0 @@
-#!/bin/bash
-
-# If the functional tests failed (or some other task) then
-# we might want to look into the broker logs
-if [ "$TRAVIS_TEST_RESULT" = "1" ]; then
-    echo "Dumping Kafka brokers server.log:"
-    for i in 1 2 3 4 5; do
-        KAFKA_PORT=`expr $i + 9090`
-        sed -e "s/^/kafka-${KAFKA_PORT} /" ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/logs/server.log{.*,}
-    done
-fi
-
-set -ex
-
-for i in 1 2 3 4 5; do
-    KAFKA_PORT=`expr $i + 9090`
-    cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/kafka-server-stop.sh
-done
-
-for i in 1 2 3 4 5; do
-    KAFKA_PORT=`expr $i + 9090`
-    cd ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} && bin/zookeeper-server-stop.sh
-done
-
-killall toxiproxy

+ 0 - 86
vagrant/install_cluster.sh

@@ -1,86 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-TOXIPROXY_VERSION=2.1.4
-
-mkdir -p ${KAFKA_INSTALL_ROOT}
-if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then
-    wget --quiet https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
-fi
-if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then
-    wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}
-    chmod +x ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}
-fi
-rm -f ${KAFKA_INSTALL_ROOT}/toxiproxy
-ln -s ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ${KAFKA_INSTALL_ROOT}/toxiproxy
-
-for i in 1 2 3 4 5; do
-    ZK_PORT=$((i + 2180))
-    ZK_PORT_REAL=$((i + 21800))
-    KAFKA_PORT=$((i + 9090))
-    KAFKA_PORT_REAL=$((i + 29090))
-
-    # unpack kafka
-    mkdir -p ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}
-    tar xzf ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz -C ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT} --strip-components 1
-
-    # broker configuration
-    mkdir -p "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data"
-
-    # Append to default server.properties with a small number of customisations
-    printf "\n\n" >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties"
-    cat << EOF >> "${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/server.properties"
-############################# Sarama Test Cluster #############################
-
-broker.id=${KAFKA_PORT}
-broker.rack=${i}
-
-# Listen on "real" port
-listeners=PLAINTEXT://:${KAFKA_PORT_REAL}
-# Advertise Toxiproxy port
-advertised.listeners=PLAINTEXT://${KAFKA_HOSTNAME}:${KAFKA_PORT}
-
-# Connect to Zookeeper via Toxiproxy port
-zookeeper.connect=127.0.0.1:${ZK_PORT}
-
-# Data directory
-log.dirs="${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/data"
-
-# Create new topics with a replication factor of 2 so failover can be tested
-# more easily.
-default.replication.factor=2
-
-# Turn on log.retention.bytes to avoid filling up the VM's disk
-log.retention.bytes=268435456
-log.segment.bytes=268435456
-
-# Enable topic deletion and disable auto-creation
-delete.topic.enable=true
-auto.create.topics.enable=false
-
-# Lower the zookeeper timeouts so we don't have to wait forever for a node
-# to die when we use toxiproxy to kill its zookeeper connection
-zookeeper.session.timeout.ms=3000
-zookeeper.connection.timeout.ms=3000
-
-# Disable broker ID length constraint
-reserved.broker.max.id=10000
-
-# Permit follower fetching (KIP-392)
-replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
-
-###############################################################################
-EOF
-
-    # zookeeper configuration
-    cp ${REPOSITORY_ROOT}/vagrant/zookeeper.properties ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/
-    sed -i s/KAFKAID/${KAFKA_PORT}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties
-    sed -i s/ZK_PORT/${ZK_PORT_REAL}/g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties
-
-    ZK_DATADIR="${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}"
-    mkdir -p ${ZK_DATADIR}
-    sed -i s#ZK_DATADIR#${ZK_DATADIR}#g ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_PORT}/config/zookeeper.properties
-
-    echo $i > ${KAFKA_INSTALL_ROOT}/zookeeper-${ZK_PORT}/myid
-done

+ 0 - 9
vagrant/kafka.conf

@@ -1,9 +0,0 @@
-start on started zookeeper-ZK_PORT
-stop on stopping zookeeper-ZK_PORT
-
-# Use a script instead of exec (using env stanza leaks KAFKA_HEAP_OPTS from zookeeper)
-script
-  sleep 2
-  export KAFKA_HEAP_OPTS="-Xmx320m"
-  exec /opt/kafka-KAFKAID/bin/kafka-server-start.sh /opt/kafka-KAFKAID/config/server.properties
-end script

+ 0 - 17
vagrant/provision.sh

@@ -1,17 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-apt-get update
-yes | apt-get install default-jre
-
-export KAFKA_INSTALL_ROOT=/opt
-export KAFKA_HOSTNAME=192.168.100.67
-export KAFKA_VERSION=1.0.2
-export KAFKA_SCALA_VERSION=2.11
-export REPOSITORY_ROOT=/vagrant
-
-sh /vagrant/vagrant/install_cluster.sh
-sh /vagrant/vagrant/setup_services.sh
-sh /vagrant/vagrant/create_topics.sh
-sh /vagrant/vagrant/run_java_producer.sh

+ 0 - 6
vagrant/run_java_producer.sh

@@ -1,6 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-wget https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
-java -jar simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -b ${KAFKA_HOSTNAME}:9092 -c 4

+ 0 - 22
vagrant/run_toxiproxy.sh

@@ -1,22 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-${KAFKA_INSTALL_ROOT}/toxiproxy -port 8474 -host 0.0.0.0 &
-PID=$!
-
-while ! nc -q 1 localhost 8474 </dev/null; do echo "Waiting"; sleep 1; done
-
-wget -O/dev/null -S --post-data='{"name":"zk1", "upstream":"localhost:21801", "listen":"0.0.0.0:2181"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"zk2", "upstream":"localhost:21802", "listen":"0.0.0.0:2182"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"zk3", "upstream":"localhost:21803", "listen":"0.0.0.0:2183"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"zk4", "upstream":"localhost:21804", "listen":"0.0.0.0:2184"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"zk5", "upstream":"localhost:21805", "listen":"0.0.0.0:2185"}' localhost:8474/proxies
-
-wget -O/dev/null -S --post-data='{"name":"kafka1", "upstream":"localhost:29091", "listen":"0.0.0.0:9091"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"kafka2", "upstream":"localhost:29092", "listen":"0.0.0.0:9092"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"kafka3", "upstream":"localhost:29093", "listen":"0.0.0.0:9093"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"kafka4", "upstream":"localhost:29094", "listen":"0.0.0.0:9094"}' localhost:8474/proxies
-wget -O/dev/null -S --post-data='{"name":"kafka5", "upstream":"localhost:29095", "listen":"0.0.0.0:9095"}' localhost:8474/proxies
-
-wait $PID

+ 0 - 29
vagrant/setup_services.sh

@@ -1,29 +0,0 @@
-#!/bin/sh
-
-set -ex
-
-stop toxiproxy || true
-cp ${REPOSITORY_ROOT}/vagrant/toxiproxy.conf /etc/init/toxiproxy.conf
-cp ${REPOSITORY_ROOT}/vagrant/run_toxiproxy.sh ${KAFKA_INSTALL_ROOT}/
-start toxiproxy
-
-for i in 1 2 3 4 5; do
-    ZK_PORT=`expr $i + 2180`
-    KAFKA_PORT=`expr $i + 9090`
-
-    stop zookeeper-${ZK_PORT} || true
-
-    # set up zk service
-    cp ${REPOSITORY_ROOT}/vagrant/zookeeper.conf /etc/init/zookeeper-${ZK_PORT}.conf
-    sed -i s/KAFKAID/${KAFKA_PORT}/g /etc/init/zookeeper-${ZK_PORT}.conf
-
-    # set up kafka service
-    cp ${REPOSITORY_ROOT}/vagrant/kafka.conf /etc/init/kafka-${KAFKA_PORT}.conf
-    sed -i s/KAFKAID/${KAFKA_PORT}/g /etc/init/kafka-${KAFKA_PORT}.conf
-    sed -i s/ZK_PORT/${ZK_PORT}/g /etc/init/kafka-${KAFKA_PORT}.conf
-
-    start zookeeper-${ZK_PORT}
-done
-
-# Wait for the last kafka node to finish booting
-while ! nc -q 1 localhost 29095 </dev/null; do echo "Waiting"; sleep 1; done

+ 0 - 6
vagrant/toxiproxy.conf

@@ -1,6 +0,0 @@
-start on started networking
-stop on shutdown
-
-env KAFKA_INSTALL_ROOT=/opt
-
-exec /opt/run_toxiproxy.sh

+ 0 - 7
vagrant/zookeeper.conf

@@ -1,7 +0,0 @@
-start on started toxiproxy
-stop on stopping toxiproxy
-
-script
-  export KAFKA_HEAP_OPTS="-Xmx192m"
-  exec /opt/kafka-KAFKAID/bin/zookeeper-server-start.sh /opt/kafka-KAFKAID/config/zookeeper.properties
-end script

+ 0 - 36
vagrant/zookeeper.properties

@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# the directory where the snapshot is stored.
-dataDir=ZK_DATADIR
-# the port at which the clients will connect
-clientPort=ZK_PORT
-# disable the per-ip limit on the number of connections since this is a non-production config
-maxClientCnxns=0
-
-# The number of milliseconds of each tick
-tickTime=2000
-
-# The number of ticks that the initial synchronization phase can take
-initLimit=10
-
-# The number of ticks that can pass between
-# sending a request and getting an acknowledgement
-syncLimit=5
-
-server.1=localhost:2281:2381
-server.2=localhost:2282:2382
-server.3=localhost:2283:2383
-server.4=localhost:2284:2384
-server.5=localhost:2285:2385