Переглянути джерело

Add docker-composed based functional test harness

* Functional tests are skipped except when the `functional` build tag
  is passed to the go test command (i.e. go test -tags=functional)
* If TOXIPROXY_ADDR is not set when the functional tests are being run,
  it will use docker-compose to automatically spin up a
  zookeeper/kafka/toxiproxy environment suitab le for running the tests.
  This requies a working Docker and for the docker-compose command line
  tool to be installed.
* If TOXIPROXY_ADDR and KAFKA_VERSION are set, then the tests will not
  spin up any docker infrastructure and will instead rely on whatever
  kafka broker is behind the specified toxiproxy.
KJ Tsanaktsidis 4 роки тому
батько
коміт
f33a0afab2

+ 3 - 0
.gitignore

@@ -25,3 +25,6 @@ _testmain.go
 
 coverage.txt
 profile.out
+
+simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
+

+ 134 - 0
docker-compose.yml

@@ -0,0 +1,134 @@
+version: '3.7'
+services:
+  zookeeper-1:
+    image: 'confluentinc/cp-zookeeper:5.5.0'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '1'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  zookeeper-2:
+    image: 'confluentinc/cp-zookeeper:5.5.0'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '2'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  zookeeper-3:
+    image: 'confluentinc/cp-zookeeper:5.5.0'
+    restart: always
+    environment:
+      ZOOKEEPER_SERVER_ID: '3'
+      ZOOKEEPER_SERVERS: 'zookeeper-1:2888:3888;zookeeper-2:2888:3888;zookeeper-3:2888:3888'
+      ZOOKEEPER_CLIENT_PORT: '2181'
+      ZOOKEEPER_PEER_PORT: '2888'
+      ZOOKEEPER_LEADER_PORT: '3888'
+      ZOOKEEPER_INIT_LIMIT: '10'
+      ZOOKEEPER_SYNC_LIMIT: '5'
+      ZOOKEEPER_MAX_CLIENT_CONNS: '0'
+  kafka-1:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29091'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-1:9091,LISTENER_LOCAL://localhost:29091'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '1'
+      KAFKA_BROKER_RACK: '1'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-2:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29092'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-2:9091,LISTENER_LOCAL://localhost:29092'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '2'
+      KAFKA_BROKER_RACK: '2'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-3:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29093'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-3:9091,LISTENER_LOCAL://localhost:29093'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '3'
+      KAFKA_BROKER_RACK: '3'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-4:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29094'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-4:9091,LISTENER_LOCAL://localhost:29094'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '4'
+      KAFKA_BROKER_RACK: '4'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  kafka-5:
+    image: 'confluentinc/cp-kafka:5.5.0'
+    restart: always
+    environment:
+      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181'
+      KAFKA_LISTENERS: 'LISTENER_INTERNAL://:9091,LISTENER_LOCAL://:29095'
+      KAFKA_ADVERTISED_LISTENERS: 'LISTENER_INTERNAL://kafka-5:9091,LISTENER_LOCAL://localhost:29095'
+      KAFKA_INTER_BROKER_LISTENER_NAME: 'LISTENER_INTERNAL'
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: 'LISTENER_INTERNAL:PLAINTEXT,LISTENER_LOCAL:PLAINTEXT'
+      KAFKA_DEFAULT_REPLICATION_FACTOR: '2'
+      KAFKA_BROKER_ID: '5'
+      KAFKA_BROKER_RACK: '5'
+      KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: '3000'
+      KAFKA_ZOOKEEPER_CONNECTION_TIMEOUT_MS: '3000'
+      KAFKA_REPLICA_SELECTOR_CLASS: 'org.apache.kafka.common.replica.RackAwareReplicaSelector'
+      KAFKA_DELETE_TOPIC_ENABLE: 'true'
+      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
+  toxiproxy:
+    image: 'shopify/toxiproxy:2.1.4'
+    ports:
+      # The tests themselves actually start the proies on these ports
+      - '29091:29091'
+      - '29092:29092'
+      - '29093:29093'
+      - '29094:29094'
+      - '29095:29095'
+      # This is the toxiproxy API port
+      - '8474:8474'

+ 6 - 4
functional_client_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -10,13 +12,13 @@ func TestFuncConnectionFailure(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	Proxies["kafka1"].Enabled = false
+	FunctionalTestEnv.Proxies["kafka1"].Enabled = false
 	SaveProxy(t, "kafka1")
 
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 
-	_, err := NewClient([]string{kafkaBrokers[0]}, config)
+	_, err := NewClient([]string{FunctionalTestEnv.KafkaBrokerAddrs[0]}, config)
 	if err != ErrOutOfBrokers {
 		t.Fatal("Expected returned error to be ErrOutOfBrokers, but was: ", err)
 	}
@@ -29,7 +31,7 @@ func TestFuncClientMetadata(t *testing.T) {
 	config := NewConfig()
 	config.Metadata.Retry.Max = 1
 	config.Metadata.Retry.Backoff = 10 * time.Millisecond
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +72,7 @@ func TestFuncClientCoordinator(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 3 - 3
functional_consumer_group_test.go

@@ -1,4 +1,4 @@
-// +build go1.9
+//+build functional
 
 package sarama
 
@@ -153,7 +153,7 @@ func testFuncConsumerGroupID(t *testing.T) string {
 }
 
 func testFuncConsumerGroupFuzzySeed(topic string) error {
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		return err
 	}
@@ -245,7 +245,7 @@ func runTestFuncConsumerGroupMember(t *testing.T, groupID, clientID string, maxM
 	config.Consumer.Offsets.Initial = OffsetOldest
 	config.Consumer.Group.Rebalance.Timeout = 10 * time.Second
 
-	group, err := NewConsumerGroup(kafkaBrokers, groupID, config)
+	group, err := NewConsumerGroup(FunctionalTestEnv.KafkaBrokerAddrs, groupID, config)
 	if err != nil {
 		t.Fatal(err)
 		return nil

+ 8 - 6
functional_consumer_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -16,7 +18,7 @@ func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	consumer, err := NewConsumer(kafkaBrokers, nil)
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -36,7 +38,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	p, err := NewSyncProducer(kafkaBrokers, nil)
+	p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -47,7 +49,7 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	c, err := NewConsumer(kafkaBrokers, nil)
+	c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -143,7 +145,7 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
 	config.Consumer.IsolationLevel = ReadCommitted
 	config.Version = V0_11_0_0
 
-	consumer, err := NewConsumer(kafkaBrokers, config)
+	consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -205,7 +207,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 				prodCfg.Net.MaxOpenRequests = 1
 			}
 
-			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
+			p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
 			if err != nil {
 				t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
 				continue
@@ -251,7 +253,7 @@ consumerVersionLoop:
 		// message.
 		consCfg := NewConfig()
 		consCfg.Version = consVer
-		c, err := NewConsumer(kafkaBrokers, consCfg)
+		c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
 		if err != nil {
 			t.Fatal(err)
 		}

+ 3 - 1
functional_offset_manager_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -9,7 +11,7 @@ func TestFuncOffsetManager(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	client, err := NewClient(kafkaBrokers, nil)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 10 - 8
functional_producer_test.go

@@ -1,3 +1,5 @@
+//+build functional
+
 package sarama
 
 import (
@@ -53,7 +55,7 @@ func TestFuncMultiPartitionProduce(t *testing.T) {
 	config.Producer.Flush.Frequency = 50 * time.Millisecond
 	config.Producer.Flush.Messages = 200
 	config.Producer.Return.Successes = true
-	producer, err := NewSyncProducer(kafkaBrokers, config)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -81,7 +83,7 @@ func TestFuncProducingToInvalidTopic(t *testing.T) {
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
-	producer, err := NewSyncProducer(kafkaBrokers, nil)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -113,7 +115,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	config.Net.MaxOpenRequests = 1
 	config.Version = V0_11_0_0
 
-	producer, err := NewSyncProducer(kafkaBrokers, config)
+	producer, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -131,7 +133,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 
 	// break the brokers.
-	for proxyName, proxy := range Proxies {
+	for proxyName, proxy := range FunctionalTestEnv.Proxies {
 		if !strings.Contains(proxyName, "kafka") {
 			continue
 		}
@@ -152,7 +154,7 @@ func TestFuncProducingIdempotentWithBrokerFailure(t *testing.T) {
 	}
 
 	// Now bring the proxy back up
-	for proxyName, proxy := range Proxies {
+	for proxyName, proxy := range FunctionalTestEnv.Proxies {
 		if !strings.Contains(proxyName, "kafka") {
 			continue
 		}
@@ -179,7 +181,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	defer teardownFunctionalTest(t)
 
 	// Configure some latency in order to properly validate the request latency metric
-	for _, proxy := range Proxies {
+	for _, proxy := range FunctionalTestEnv.Proxies {
 		if _, err := proxy.AddToxic("", "latency", "", 1, toxiproxy.Attributes{"latency": 10}); err != nil {
 			t.Fatal("Unable to configure latency toxicity", err)
 		}
@@ -188,7 +190,7 @@ func testProducingMessages(t *testing.T, config *Config) {
 	config.Producer.Return.Successes = true
 	config.Consumer.Return.Errors = true
 
-	client, err := NewClient(kafkaBrokers, config)
+	client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -380,7 +382,7 @@ func benchmarkProducer(b *testing.B, conf *Config, topic string, value Encoder)
 		}()
 	}
 
-	producer, err := NewAsyncProducer(kafkaBrokers, conf)
+	producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, conf)
 	if err != nil {
 		b.Fatal(err)
 	}

+ 337 - 49
functional_test.go

@@ -1,79 +1,378 @@
+//+build functional
+
 package sarama
 
 import (
+	"context"
+	"fmt"
+	toxiproxy "github.com/Shopify/toxiproxy/client"
+	"io"
 	"log"
-	"math/rand"
 	"net"
+	"net/http"
+	"net/url"
 	"os"
+	"os/exec"
+	"path/filepath"
 	"strconv"
 	"strings"
 	"testing"
 	"time"
-
-	toxiproxy "github.com/Shopify/toxiproxy/client"
 )
 
 const (
-	VagrantToxiproxy      = "http://192.168.100.67:8474"
-	VagrantKafkaPeers     = "192.168.100.67:9091,192.168.100.67:9092,192.168.100.67:9093,192.168.100.67:9094,192.168.100.67:9095"
-	VagrantZookeeperPeers = "192.168.100.67:2181,192.168.100.67:2182,192.168.100.67:2183,192.168.100.67:2184,192.168.100.67:2185"
+	uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
 )
 
 var (
-	kafkaAvailable, kafkaRequired bool
-	kafkaBrokers                  []string
+	testTopicDetails = map[string]*TopicDetail{
+		"test.1": {
+			NumPartitions: 1,
+			ReplicationFactor: 3,
+		},
+		"test.4": {
+			NumPartitions: 4,
+			ReplicationFactor: 3,
+		},
+		"test.64": {
+			NumPartitions: 64,
+			ReplicationFactor: 3,
+		},
+		"uncommitted-topic-test-4": {
+			NumPartitions: 1,
+			ReplicationFactor: 3,
+		},
+	}
 
-	proxyClient *toxiproxy.Client
-	Proxies     map[string]*toxiproxy.Proxy
+	FunctionalTestEnv *testEnvironment
 )
 
-func init() {
+func TestMain(m *testing.M) {
+	// Functional tests for Sarama
+	//
+	// You can either set TOXIPROXY_ADDR, which points at a toxiproxy address
+	// already set up with 21801-21805 bound to zookeeper and 29091-29095
+	// bound to kafka. Alternatively, if TOXIPROXY_ADDR is not set, we'll try
+	// and use Docker to bring up a 5-node zookeeper cluster & 5-node kafka
+	// cluster, with toxiproxy configured as above.
+	//
+	// In either case, the following topics will be deleted (if they exist) and
+	// then created/pre-seeded with data for the functional test run:
+	//     * uncomitted-topic-test-4
+	//     * test.1
+	//     * test.4
+	//     * test.64
+	os.Exit(testMain(m))
+}
+
+func testMain(m *testing.M) int {
+	ctx := context.Background()
+	var env testEnvironment
+
 	if os.Getenv("DEBUG") == "true" {
 		Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
 	}
 
-	seed := time.Now().UTC().UnixNano()
-	if tmp := os.Getenv("TEST_SEED"); tmp != "" {
-		seed, _ = strconv.ParseInt(tmp, 0, 64)
+	usingExisting, err := existingEnvironment(ctx, &env)
+	if err != nil {
+		panic(err)
+	}
+	if !usingExisting {
+		err := prepareDockerTestEnvironment(ctx, &env)
+		if err != nil {
+			tearDownDockerTestEnvironment(ctx, &env)
+			panic(err)
+		}
+		defer tearDownDockerTestEnvironment(ctx, &env)
 	}
-	Logger.Println("Using random seed:", seed)
-	rand.Seed(seed)
+	if err := prepareTestTopics(ctx, &env); err != nil {
+		panic(err)
+	}
+	FunctionalTestEnv = &env
+	return m.Run()
+}
+
+type testEnvironment struct {
+	ToxiproxyClient  *toxiproxy.Client
+	Proxies          map[string]*toxiproxy.Proxy
+	KafkaBrokerAddrs []string
+	KafkaVersion     string
+}
+
+func prepareDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
+	Logger.Println("bringing up docker-based test environment")
 
-	proxyAddr := os.Getenv("TOXIPROXY_ADDR")
-	if proxyAddr == "" {
-		proxyAddr = VagrantToxiproxy
+	// Always (try to) tear down first.
+	if err := tearDownDockerTestEnvironment(ctx, env); err != nil {
+		return fmt.Errorf("failed to tear down existing env: %w", err)
 	}
-	proxyClient = toxiproxy.NewClient(proxyAddr)
 
-	kafkaPeers := os.Getenv("KAFKA_PEERS")
-	if kafkaPeers == "" {
-		kafkaPeers = VagrantKafkaPeers
+	c := exec.Command("docker-compose", "up", "-d")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	err := c.Run()
+	if err != nil {
+		return fmt.Errorf("failed to run docker-compose to start test enviroment: %w", err)
 	}
-	kafkaBrokers = strings.Split(kafkaPeers, ",")
 
-	if c, err := net.DialTimeout("tcp", kafkaBrokers[0], 5*time.Second); err == nil {
-		if err = c.Close(); err == nil {
-			kafkaAvailable = true
+	// Set up toxiproxy Proxies
+	env.ToxiproxyClient = toxiproxy.NewClient("localhost:8474")
+	env.Proxies = map[string]*toxiproxy.Proxy{}
+	for i := 1; i <= 5; i++ {
+		proxyName := fmt.Sprintf("kafka%d", i)
+		proxy, err := env.ToxiproxyClient.CreateProxy(
+			proxyName,
+			fmt.Sprintf("0.0.0.0:%d", 29090 + i),
+			fmt.Sprintf("kafka-%d:%d", i, 29090 + i),
+		)
+		if err != nil {
+			return fmt.Errorf("failed to create toxiproxy: %w", err)
 		}
+		env.Proxies[proxyName] = proxy
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("127.0.0.1:%d", 29090 + i))
 	}
 
-	kafkaRequired = os.Getenv("CI") != ""
+	// the mapping of confluent platform docker image vesions -> kafka versions can be
+	// found here: https://docs.confluent.io/current/installation/versions-interoperability.html
+	// We have cp-5.5.0 in the docker-compose file, so that's kafka 2.5.0.
+	env.KafkaVersion = "2.5.0"
+
+	// Wait for the kafka broker to come up
+	allBrokersUp := false
+	for i := 0; i < 45 && !allBrokersUp; i++ {
+		Logger.Println("waiting for kafka brokers to come up")
+		time.Sleep(1 * time.Second)
+		config := NewConfig()
+		config.Version, err = ParseKafkaVersion(env.KafkaVersion)
+		if err != nil {
+			return err
+		}
+		config.Net.DialTimeout = 1 * time.Second
+		config.Net.ReadTimeout = 1 * time.Second
+		config.Net.WriteTimeout = 1 * time.Second
+		config.ClientID = "sarama-tests"
+		brokersOk := make([]bool, len(env.KafkaBrokerAddrs))
+	retryLoop:
+		for j, addr := range env.KafkaBrokerAddrs {
+			client, err := NewClient([]string{addr},config)
+			if err != nil {
+				continue
+			}
+			err = client.RefreshMetadata()
+			if err != nil {
+				continue
+			}
+			brokers := client.Brokers()
+			if len(brokers) < 5 {
+				continue
+			}
+			for _, broker := range brokers {
+				err := broker.Open(client.Config())
+				if err != nil {
+					continue retryLoop
+				}
+				connected, err := broker.Connected()
+				if err != nil || !connected {
+					continue retryLoop
+				}
+			}
+			brokersOk[j] = true
+		}
+		allBrokersUp = true
+		for _, u := range brokersOk {
+			allBrokersUp = allBrokersUp && u
+		}
+	}
+	if !allBrokersUp {
+		return fmt.Errorf("timed out waiting for broker to come up")
+	}
+
+	return nil
 }
 
-func checkKafkaAvailability(t testing.TB) {
-	if !kafkaAvailable {
-		if kafkaRequired {
-			t.Fatalf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
-		} else {
-			t.Skipf("Kafka broker is not available on %s. Set KAFKA_PEERS to connect to Kafka on a different location.", kafkaBrokers[0])
+func existingEnvironment(ctx context.Context, env *testEnvironment) (bool, error) {
+	toxiproxyAddr, ok := os.LookupEnv("TOXIPROXY_ADDR")
+	if !ok {
+		return false, nil
+	}
+	toxiproxyURL, err := url.Parse(toxiproxyAddr)
+	if err != nil {
+		return false, fmt.Errorf("$TOXIPROXY_ADDR not parseable as url")
+	}
+	toxiproxyHost := toxiproxyURL.Hostname()
+
+	env.ToxiproxyClient = toxiproxy.NewClient(toxiproxyAddr)
+	for i := 1; i <= 5; i++ {
+		proxyName := fmt.Sprintf("kafka%d", i)
+		proxy, err := env.ToxiproxyClient.Proxy(proxyName)
+		if err != nil {
+			return false, fmt.Errorf("no proxy kafka%d on toxiproxy: %w", i, err)
+		}
+		env.Proxies[proxyName] = proxy
+		// get the host:port from the proxy & toxiproxy addr, so we can do "$toxiproxy_addr:$proxy_port"
+		_, proxyPort, err := net.SplitHostPort(proxy.Listen)
+		if err != nil {
+			return false, fmt.Errorf("proxy.Listen not a host:port combo: %w", err)
+		}
+		env.KafkaBrokerAddrs = append(env.KafkaBrokerAddrs, fmt.Sprintf("%s:%s", toxiproxyHost, proxyPort))
+	}
+
+	env.KafkaVersion, ok = os.LookupEnv("KAFKA_VERSION")
+	if !ok {
+		return false, fmt.Errorf("KAFKA_VERSION needs to be provided with TOXIPROXY_ADDR")
+	}
+	return true, nil
+}
+
+func tearDownDockerTestEnvironment(ctx context.Context, env *testEnvironment) error {
+	c := exec.Command("docker-compose", "down", "--volumes")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	downErr := c.Run()
+
+	c = exec.Command("docker-compose", "rm", "-v", "--force", "--stop")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	rmErr := c.Run()
+	if downErr != nil {
+		return fmt.Errorf("failed to run docker-compose to stop test enviroment: %w", downErr)
+	}
+	if rmErr != nil {
+		return fmt.Errorf("failed to run docker-compose to rm test enviroment: %w", rmErr)
+	}
+	return nil
+}
+
+func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
+	Logger.Println("creating test topics")
+	var testTopicNames []string
+	for topic, _ := range testTopicDetails {
+		testTopicNames = append(testTopicNames, topic)
+	}
+
+	Logger.Println("Creating topics")
+	config := NewConfig()
+	config.Metadata.Retry.Max = 5
+	config.Metadata.Retry.Backoff = 10 * time.Second
+	config.ClientID = "sarama-tests"
+	var err error
+	config.Version, err = ParseKafkaVersion(env.KafkaVersion)
+	if err != nil {
+		return fmt.Errorf("failed to parse kafka version %s: %w", env.KafkaVersion, err)
+	}
+
+	client, err := NewClient(env.KafkaBrokerAddrs, config)
+	if err != nil {
+		return fmt.Errorf("failed to connect to kafka: %w", err)
+	}
+	defer client.Close()
+
+	controller, err := client.Controller()
+	if err != nil {
+		return fmt.Errorf("failed to connect to kafka controller: %w", err)
+	}
+	defer controller.Close()
+
+	// Start by deleting the test topics (if they already exist)
+	deleteRes, err := controller.DeleteTopics(&DeleteTopicsRequest{
+		Topics: testTopicNames,
+		Timeout: 30 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete test topics: %w", err)
+	}
+	for topic, topicErr := range deleteRes.TopicErrorCodes {
+		if !isTopicNotExistsErrorOrOk(topicErr) {
+			return fmt.Errorf("failed to delete topic %s: %w", topic, topicErr)
+		}
+	}
+
+	// wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
+	// synchronously
+	var topicsOk bool
+	for i := 0; i < 20 && !topicsOk; i++ {
+		time.Sleep(1 * time.Second)
+		md, err := controller.GetMetadata(&MetadataRequest{
+			Topics: testTopicNames,
+		})
+		if err != nil {
+			return fmt.Errorf("failed to get metadata for test topics: %w", err)
+		}
+
+		topicsOk = true
+		for _, topicsMd := range md.Topics {
+			if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
+				topicsOk = false
+			}
 		}
 	}
+	if !topicsOk {
+		return fmt.Errorf("timed out waiting for test topics to be gone")
+	}
+
+	// now create the topics empty
+	createRes, err := controller.CreateTopics(&CreateTopicsRequest{
+		TopicDetails: testTopicDetails,
+		Timeout: 30 * time.Second,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to create test topics: %w", err)
+	}
+	for topic, topicErr := range createRes.TopicErrors {
+		if !isTopicExistsErrorOrOk(topicErr.Err) {
+			return fmt.Errorf("failed to create test topic %s: %w", topic, topicErr)
+		}
+	}
+
+	// This is kind of gross, but we don't actually have support for doing transactional publishing
+	// with sarama, so we need to use a java-based tool to publish uncomitted messages to
+	// the uncommitted-topic-test-4 topic
+	jarName := filepath.Base(uncomittedMsgJar)
+	if _, err := os.Stat(jarName); err != nil {
+		Logger.Printf("Downloading %s\n", uncomittedMsgJar)
+		req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
+		if err != nil {
+			return fmt.Errorf("failed creating requst for uncomitted msg jar: %w", err)
+		}
+		res, err := http.DefaultClient.Do(req)
+		if err != nil {
+			return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
+		}
+		defer res.Body.Close()
+		jarFile, err := os.OpenFile(jarName, os.O_WRONLY | os.O_TRUNC | os.O_CREATE, 0644)
+		if err != nil {
+			return fmt.Errorf("failed opening the uncomitted msg jar: %w", err)
+		}
+		defer jarFile.Close()
+
+		_, err = io.Copy(jarFile, res.Body)
+		if err != nil {
+			return fmt.Errorf("failed writing the uncomitted msg jar: %w", err)
+		}
+	}
+
+	c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
+	c.Stdout = os.Stdout
+	c.Stderr = os.Stderr
+	err = c.Run()
+	if err != nil {
+		return fmt.Errorf("failed running uncomitted msg jar: %w", err)
+	}
+	return nil
+}
+
+func isTopicNotExistsErrorOrOk(err KError) bool {
+	return err == ErrUnknownTopicOrPartition || err == ErrInvalidTopic || err == ErrNoError
+}
+
+func isTopicExistsErrorOrOk(err KError) bool {
+	return err == ErrTopicAlreadyExists || err == ErrNoError
 }
 
 func checkKafkaVersion(t testing.TB, requiredVersion string) {
-	kafkaVersion := os.Getenv("KAFKA_VERSION")
+	kafkaVersion := FunctionalTestEnv.KafkaVersion
 	if kafkaVersion == "" {
-		t.Logf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
+		t.Skipf("No KAFKA_VERSION set. This test requires Kafka version %s or higher. Continuing...", requiredVersion)
 	} else {
 		available := parseKafkaVersion(kafkaVersion)
 		required := parseKafkaVersion(requiredVersion)
@@ -84,30 +383,19 @@ func checkKafkaVersion(t testing.TB, requiredVersion string) {
 }
 
 func resetProxies(t testing.TB) {
-	if err := proxyClient.ResetState(); err != nil {
+	if err := FunctionalTestEnv.ToxiproxyClient.ResetState(); err != nil {
 		t.Error(err)
 	}
-	Proxies = nil
-}
-
-func fetchProxies(t testing.TB) {
-	var err error
-	Proxies, err = proxyClient.Proxies()
-	if err != nil {
-		t.Fatal(err)
-	}
 }
 
 func SaveProxy(t *testing.T, px string) {
-	if err := Proxies[px].Save(); err != nil {
+	if err := FunctionalTestEnv.Proxies[px].Save(); err != nil {
 		t.Fatal(err)
 	}
 }
 
 func setupFunctionalTest(t testing.TB) {
-	checkKafkaAvailability(t)
 	resetProxies(t)
-	fetchProxies(t)
 }
 
 func teardownFunctionalTest(t testing.TB) {