Browse Source

Merge pull request #293 from Shopify/cleanup-tests

Cleanup tests
Evan Huus 10 years ago
parent
commit
d2bdfc2d01
3 changed files with 319 additions and 308 deletions
  1. 52 49
      client_test.go
  2. 105 96
      consumer_test.go
  3. 162 163
      producer_test.go

+ 52 - 49
client_test.go

@@ -20,40 +20,38 @@ func TestDefaultClientConfigValidates(t *testing.T) {
 }
 
 func TestSimpleClient(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
 
-	mb := NewMockBroker(t, 1)
+	seedBroker.Returns(new(MetadataResponse))
 
-	mb.Returns(new(MetadataResponse))
-
-	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
-	defer mb.Close()
+
+	seedBroker.Close()
+	safeClose(t, client)
 }
 
 func TestCachedPartitions(t *testing.T) {
-	mb1 := NewMockBroker(t, 1)
-	mb5 := NewMockBroker(t, 5)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 5)
+
 	replicas := []int32{3, 1, 5}
 	isr := []int32{5, 1}
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
-	mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
-	mb1.Returns(mdr)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
+	seedBroker.Returns(metadataResponse)
 
 	config := NewClientConfig()
 	config.MetadataRetries = 0
-	client, err := NewClient("client_id", []string{mb1.Addr()}, config)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
-	defer mb1.Close()
-	defer mb5.Close()
 
 	// Verify they aren't cached the same
 	allP := client.cachedPartitionsResults["my_topic"][allPartitions]
@@ -69,48 +67,49 @@ func TestCachedPartitions(t *testing.T) {
 	if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
 		t.Fatal("Not using the cache!")
 	}
+
+	leader.Close()
+	seedBroker.Close()
+	safeClose(t, client)
 }
 
 func TestClientSeedBrokers(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	discoveredBroker := NewMockBroker(t, 2)
 
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
+	seedBroker.Returns(metadataResponse)
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mb1.Returns(mdr)
-
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
-	defer mb1.Close()
-	defer mb2.Close()
+
+	discoveredBroker.Close()
+	seedBroker.Close()
+	safeClose(t, client)
 }
 
 func TestClientMetadata(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 5)
 
-	mb1 := NewMockBroker(t, 1)
-	mb5 := NewMockBroker(t, 5)
 	replicas := []int32{3, 1, 5}
 	isr := []int32{5, 1}
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
-	mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
-	mb1.Returns(mdr)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
+	seedBroker.Returns(metadataResponse)
 
 	config := NewClientConfig()
 	config.MetadataRetries = 0
-	client, err := NewClient("client_id", []string{mb1.Addr()}, config)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer safeClose(t, client)
-	defer mb1.Close()
-	defer mb5.Close()
 
 	topics, err := client.Topics()
 	if err != nil {
@@ -159,21 +158,25 @@ func TestClientMetadata(t *testing.T) {
 	} else if isr[1] != 5 {
 		t.Error("Incorrect (or unsorted) isr")
 	}
+
+	leader.Close()
+	seedBroker.Close()
+	safeClose(t, client)
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
-	mb1 := NewMockBroker(t, 1)
-	mb5 := NewMockBroker(t, 5)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 5)
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mb1.Returns(mdr)
+	metadataResponse1 := new(MetadataResponse)
+	metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
+	seedBroker.Returns(metadataResponse1)
 
-	mdr2 := new(MetadataResponse)
-	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil, NoError)
-	mb1.Returns(mdr2)
+	metadataResponse2 := new(MetadataResponse)
+	metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse2)
 
-	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
+	client, err := NewClient("clientID", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -193,7 +196,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	}
 
 	client.disconnectBroker(tst)
-	mb5.Close()
-	mb1.Close()
+	leader.Close()
+	seedBroker.Close()
 	safeClose(t, client)
 }

+ 105 - 96
consumer_test.go

@@ -22,21 +22,21 @@ func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
 }
 
 func TestConsumerOffsetManual(t *testing.T) {
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
-	mb1.Returns(mdr)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
 	for i := 0; i <= 10; i++ {
-		fr := new(FetchResponse)
-		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
-		mb2.Returns(fr)
+		fetchResponse := new(FetchResponse)
+		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
+		leader.Returns(fetchResponse)
 	}
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 
 	if err != nil {
 		t.Fatal(err)
@@ -54,7 +54,7 @@ func TestConsumerOffsetManual(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	mb1.Close()
+	seedBroker.Close()
 
 	for i := 0; i < 10; i++ {
 		event := <-consumer.Events()
@@ -68,31 +68,31 @@ func TestConsumerOffsetManual(t *testing.T) {
 
 	safeClose(t, consumer)
 	safeClose(t, client)
-	mb2.Close()
+	leader.Close()
 }
 
 func TestConsumerLatestOffset(t *testing.T) {
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
-	mb1.Returns(mdr)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	or := new(OffsetResponse)
-	or.AddTopicPartition("my_topic", 0, 0x010101)
-	mb2.Returns(or)
+	offsetResponse := new(OffsetResponse)
+	offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
+	leader.Returns(offsetResponse)
 
-	fr := new(FetchResponse)
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
-	mb2.Returns(fr)
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
+	leader.Returns(fetchResponse)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
-	mb1.Close()
+	seedBroker.Close()
 
 	master, err := NewConsumer(client, nil)
 	if err != nil {
@@ -106,7 +106,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	mb2.Close()
+	leader.Close()
 	safeClose(t, consumer)
 	safeClose(t, client)
 
@@ -120,24 +120,24 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 	// for topics that are compressed and/or compacted (different things!) we have to be
 	// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
 	// possibly starting prior to the actual value we requested
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
-	mb1.Returns(mdr)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	fr := new(FetchResponse)
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
-	mb2.Returns(fr)
+	fetchResponse := new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
+	leader.Returns(fetchResponse)
 
-	fr = new(FetchResponse)
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
-	mb2.Returns(fr)
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
+	leader.Returns(fetchResponse)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -160,27 +160,27 @@ func TestConsumerFunnyOffsets(t *testing.T) {
 		t.Error("Incorrect message offset!")
 	}
 
-	mb2.Close()
-	mb1.Close()
+	leader.Close()
+	seedBroker.Close()
 	safeClose(t, consumer)
 	safeClose(t, client)
 }
 
 func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	// initial setup
-	mb1 := NewMockBroker(t, 1)
-	mb2 := NewMockBroker(t, 2)
-	mb3 := NewMockBroker(t, 3)
+	seedBroker := NewMockBroker(t, 1)
+	leader0 := NewMockBroker(t, 2)
+	leader1 := NewMockBroker(t, 3)
 
-	mdr := new(MetadataResponse)
-	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddBroker(mb3.Addr(), mb3.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
-	mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
-	mb1.Returns(mdr)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
+	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
 	// launch test goroutines
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -194,6 +194,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 	config.OffsetMethod = OffsetMethodManual
 	config.OffsetValue = 0
 
+	// we expect to end up (eventually) consuming exactly ten messages on each partition
 	var wg sync.WaitGroup
 	for i := 0; i < 2; i++ {
 		consumer, err := master.ConsumePartition("my_topic", int32(i), config)
@@ -219,67 +220,75 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
 		}(int32(i), consumer)
 	}
 
-	// generate broker responses
-	fr := new(FetchResponse)
+	// leader0 provides first four messages on partition 0
+	fetchResponse := new(FetchResponse)
 	for i := 0; i < 4; i++ {
-		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
 	}
-	mb2.Returns(fr)
-
-	fr = new(FetchResponse)
-	fr.AddError("my_topic", 0, NotLeaderForPartition)
-	mb2.Returns(fr)
-
-	mdr = new(MetadataResponse)
-	mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
-	mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
-	mb1.Returns(mdr)
+	leader0.Returns(fetchResponse)
+
+	// leader0 says no longer leader of partition 0
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddError("my_topic", 0, NotLeaderForPartition)
+	leader0.Returns(fetchResponse)
+
+	// metadata assigns both partitions to leader1
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
 
-	fr = new(FetchResponse)
+	// leader1 provides five messages on partition 1
+	fetchResponse = new(FetchResponse)
 	for i := 0; i < 5; i++ {
-		fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
+		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
 	}
-	mb3.Returns(fr)
+	leader1.Returns(fetchResponse)
 
-	fr = new(FetchResponse)
+	// leader1 provides three more messages on both partitions
+	fetchResponse = new(FetchResponse)
 	for i := 0; i < 3; i++ {
-		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
-		fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
+		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
+		fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
 	}
-	mb3.Returns(fr)
+	leader1.Returns(fetchResponse)
 
-	fr = new(FetchResponse)
+	// leader1 provides three more messages on partition0, says no longer leader of partition1
+	fetchResponse = new(FetchResponse)
 	for i := 0; i < 3; i++ {
-		fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
+		fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
 	}
-	fr.AddError("my_topic", 1, NotLeaderForPartition)
-	mb3.Returns(fr)
-
-	mdr = new(MetadataResponse)
-	mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
-	mdr.AddTopicPartition("my_topic", 1, 2, nil, nil, NoError)
-	mb1.Returns(mdr)
+	fetchResponse.AddError("my_topic", 1, NotLeaderForPartition)
+	leader1.Returns(fetchResponse)
+
+	// metadata assigns 0 to leader1 and 1 to leader0
+	metadataResponse = new(MetadataResponse)
+	metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 	time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
 
-	fr = new(FetchResponse)
-	fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
-	fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
-	mb2.Returns(fr)
+	// leader0 provides two messages on partition 1
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
+	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
+	leader0.Returns(fetchResponse)
 
-	// cleanup
-	fr = new(FetchResponse)
-	fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
-	mb2.Returns(fr)
+	// leader0 provides last message  on partition 1
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
+	leader0.Returns(fetchResponse)
 
-	fr = new(FetchResponse)
-	fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
-	mb3.Returns(fr)
+	// leader1 provides last message  on partition 0
+	fetchResponse = new(FetchResponse)
+	fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
+	leader1.Returns(fetchResponse)
 
 	wg.Wait()
-	mb3.Close()
-	mb2.Close()
-	mb1.Close()
+	leader1.Close()
+	leader0.Close()
+	seedBroker.Close()
 	safeClose(t, client)
 }
 

+ 162 - 163
producer_test.go

@@ -16,21 +16,21 @@ func TestDefaultProducerConfigValidates(t *testing.T) {
 }
 
 func TestSimpleProducer(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NoError)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
 	for i := 0; i < 10; i++ {
-		broker2.Returns(response2)
+		leader.Returns(prodSuccess)
 	}
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -49,24 +49,24 @@ func TestSimpleProducer(t *testing.T) {
 
 	safeClose(t, producer)
 	safeClose(t, client)
-	broker2.Close()
-	broker1.Close()
+	leader.Close()
+	seedBroker.Close()
 }
 
 func TestConcurrentSimpleProducer(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NoError)
-	broker2.Returns(response2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader.Returns(prodSuccess)
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -94,24 +94,24 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 
 	safeClose(t, producer)
 	safeClose(t, client)
-	broker2.Close()
-	broker1.Close()
+	leader.Close()
+	seedBroker.Close()
 }
 
 func TestProducer(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NoError)
-	broker2.Returns(response2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader.Returns(prodSuccess)
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -146,26 +146,26 @@ func TestProducer(t *testing.T) {
 
 	safeClose(t, producer)
 	safeClose(t, client)
-	broker2.Close()
-	broker1.Close()
+	leader.Close()
+	seedBroker.Close()
 }
 
 func TestProducerMultipleFlushes(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NoError)
-	broker2.Returns(response2)
-	broker2.Returns(response2)
-	broker2.Returns(response2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader.Returns(prodSuccess)
+	leader.Returns(prodSuccess)
+	leader.Returns(prodSuccess)
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -199,31 +199,31 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	safeClose(t, producer)
 	safeClose(t, client)
-	broker2.Close()
-	broker1.Close()
+	leader.Close()
+	seedBroker.Close()
 }
 
 func TestProducerMultipleBrokers(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
-	broker3 := NewMockBroker(t, 3)
-
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
-
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NoError)
-	broker2.Returns(response2)
-
-	response3 := new(ProduceResponse)
-	response3.AddTopicPartition("my_topic", 1, NoError)
-	broker3.Returns(response3)
-
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	seedBroker := NewMockBroker(t, 1)
+	leader0 := NewMockBroker(t, 2)
+	leader1 := NewMockBroker(t, 3)
+
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
+	metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
+	metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
+
+	prodResponse0 := new(ProduceResponse)
+	prodResponse0.AddTopicPartition("my_topic", 0, NoError)
+	leader0.Returns(prodResponse0)
+
+	prodResponse1 := new(ProduceResponse)
+	prodResponse1.AddTopicPartition("my_topic", 1, NoError)
+	leader1.Returns(prodResponse1)
+
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -256,22 +256,22 @@ func TestProducerMultipleBrokers(t *testing.T) {
 
 	safeClose(t, producer)
 	safeClose(t, client)
-	broker3.Close()
-	broker2.Close()
-	broker1.Close()
+	leader1.Close()
+	leader0.Close()
+	seedBroker.Close()
 }
 
 func TestProducerFailureRetry(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
-	broker3 := NewMockBroker(t, 3)
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -284,23 +284,23 @@ func TestProducerFailureRetry(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	broker1.Close()
+	seedBroker.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
-	broker2.Returns(response2)
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	leader1.Returns(prodNotLeader)
 
-	response3 := new(MetadataResponse)
-	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
-	broker2.Returns(response3)
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	leader1.Returns(metadataLeader2)
 
-	response4 := new(ProduceResponse)
-	response4.AddTopicPartition("my_topic", 0, NoError)
-	broker3.Returns(response4)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
 		case msg := <-producer.Errors():
@@ -314,12 +314,12 @@ func TestProducerFailureRetry(t *testing.T) {
 			}
 		}
 	}
-	broker2.Close()
+	leader1.Close()
 
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	broker3.Returns(response4)
+	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
 		case msg := <-producer.Errors():
@@ -334,23 +334,22 @@ func TestProducerFailureRetry(t *testing.T) {
 		}
 	}
 
-	broker3.Close()
+	leader2.Close()
 	safeClose(t, producer)
 	safeClose(t, client)
 }
 
 func TestProducerBrokerBounce(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
+	seedBroker := NewMockBroker(t, 1)
+	leader := NewMockBroker(t, 2)
+	leaderAddr := leader.Addr()
 
-	addr2 := broker2.Addr()
+	metadataResponse := new(MetadataResponse)
+	metadataResponse.AddBroker(leaderAddr, leader.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataResponse)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(addr2, broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
-
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -367,13 +366,13 @@ func TestProducerBrokerBounce(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	broker2.Close()                          // producer should get EOF
-	broker2 = NewMockBrokerAddr(t, 2, addr2) // start it up again right away for giggles
-	broker1.Returns(response1)               // tell it to go to broker 2 again
+	leader.Close()                               // producer should get EOF
+	leader = NewMockBrokerAddr(t, 2, leaderAddr) // start it up again right away for giggles
+	seedBroker.Returns(metadataResponse)         // tell it to go to broker 2 again
 
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NoError)
-	broker2.Returns(response2)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
 		case msg := <-producer.Errors():
@@ -387,24 +386,24 @@ func TestProducerBrokerBounce(t *testing.T) {
 			}
 		}
 	}
-	broker1.Close()
-	broker2.Close()
+	seedBroker.Close()
+	leader.Close()
 
 	safeClose(t, producer)
 	safeClose(t, client)
 }
 
 func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
-	broker3 := NewMockBroker(t, 3)
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -422,19 +421,19 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	broker2.Close()            // producer should get EOF
-	broker1.Returns(response1) // tell it to go to broker2 again even though it's still down
-	broker1.Returns(response1) // tell it to go to broker2 again even though it's still down
+	leader1.Close()                     // producer should get EOF
+	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
+	seedBroker.Returns(metadataLeader1) // tell it to go to leader1 again even though it's still down
 
-	// ok fine, tell it to go to broker3 finally
-	response2 := new(MetadataResponse)
-	response2.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response2.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response2)
+	// ok fine, tell it to go to leader2 finally
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataLeader2)
 
-	response3 := new(ProduceResponse)
-	response3.AddTopicPartition("my_topic", 0, NoError)
-	broker3.Returns(response3)
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
 		case msg := <-producer.Errors():
@@ -448,24 +447,24 @@ func TestProducerBrokerBounceWithStaleMetadata(t *testing.T) {
 			}
 		}
 	}
-	broker1.Close()
-	broker3.Close()
+	seedBroker.Close()
+	leader2.Close()
 
 	safeClose(t, producer)
 	safeClose(t, client)
 }
 
 func TestProducerMultipleRetries(t *testing.T) {
-	broker1 := NewMockBroker(t, 1)
-	broker2 := NewMockBroker(t, 2)
-	broker3 := NewMockBroker(t, 3)
+	seedBroker := NewMockBroker(t, 1)
+	leader1 := NewMockBroker(t, 2)
+	leader2 := NewMockBroker(t, 3)
 
-	response1 := new(MetadataResponse)
-	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response1)
+	metadataLeader1 := new(MetadataResponse)
+	metadataLeader1.AddBroker(leader1.Addr(), leader1.BrokerID())
+	metadataLeader1.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataLeader1)
 
-	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
+	client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -483,24 +482,24 @@ func TestProducerMultipleRetries(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	response2 := new(ProduceResponse)
-	response2.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
-	broker2.Returns(response2)
-
-	response3 := new(MetadataResponse)
-	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
-	broker1.Returns(response3)
-	broker3.Returns(response2)
-	broker1.Returns(response1)
-	broker2.Returns(response2)
-	broker1.Returns(response1)
-	broker2.Returns(response2)
-	broker1.Returns(response3)
-
-	response4 := new(ProduceResponse)
-	response4.AddTopicPartition("my_topic", 0, NoError)
-	broker3.Returns(response4)
+	prodNotLeader := new(ProduceResponse)
+	prodNotLeader.AddTopicPartition("my_topic", 0, NotLeaderForPartition)
+	leader1.Returns(prodNotLeader)
+
+	metadataLeader2 := new(MetadataResponse)
+	metadataLeader2.AddBroker(leader2.Addr(), leader2.BrokerID())
+	metadataLeader2.AddTopicPartition("my_topic", 0, leader2.BrokerID(), nil, nil, NoError)
+	seedBroker.Returns(metadataLeader2)
+	leader2.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader1)
+	leader1.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader1)
+	leader1.Returns(prodNotLeader)
+	seedBroker.Returns(metadataLeader2)
+
+	prodSuccess := new(ProduceResponse)
+	prodSuccess.AddTopicPartition("my_topic", 0, NoError)
+	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
 		case msg := <-producer.Errors():
@@ -518,7 +517,7 @@ func TestProducerMultipleRetries(t *testing.T) {
 	for i := 0; i < 10; i++ {
 		producer.Input() <- &MessageToSend{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
 	}
-	broker3.Returns(response4)
+	leader2.Returns(prodSuccess)
 	for i := 0; i < 10; i++ {
 		select {
 		case msg := <-producer.Errors():
@@ -533,9 +532,9 @@ func TestProducerMultipleRetries(t *testing.T) {
 		}
 	}
 
-	broker1.Close()
-	broker2.Close()
-	broker3.Close()
+	seedBroker.Close()
+	leader1.Close()
+	leader2.Close()
 	safeClose(t, producer)
 	safeClose(t, client)
 }