|
|
@@ -47,6 +47,7 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
+ safeClose(t, master)
|
|
|
leader.Close()
|
|
|
}
|
|
|
|
|
|
@@ -80,6 +81,7 @@ func TestConsumerLatestOffset(t *testing.T) {
|
|
|
|
|
|
leader.Close()
|
|
|
safeClose(t, consumer)
|
|
|
+ safeClose(t, master)
|
|
|
|
|
|
// we deliver one message, so it should be one higher than we return in the OffsetResponse
|
|
|
if consumer.(*partitionConsumer).offset != 0x010102 {
|
|
|
@@ -126,6 +128,7 @@ func TestConsumerFunnyOffsets(t *testing.T) {
|
|
|
leader.Close()
|
|
|
seedBroker.Close()
|
|
|
safeClose(t, consumer)
|
|
|
+ safeClose(t, master)
|
|
|
}
|
|
|
|
|
|
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
@@ -248,6 +251,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
leader1.Close()
|
|
|
leader0.Close()
|
|
|
seedBroker.Close()
|
|
|
+ safeClose(t, master)
|
|
|
}
|
|
|
|
|
|
func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
@@ -288,6 +292,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
|
|
|
safeClose(t, c1)
|
|
|
safeClose(t, c0)
|
|
|
+ safeClose(t, master)
|
|
|
leader.Close()
|
|
|
seedBroker.Close()
|
|
|
}
|