|
@@ -26,7 +26,7 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
|
|
|
leader.Returns(offsetResponseOldest)
|
|
|
|
|
|
- for i := 0; i <= 10; i++ {
|
|
|
+ for i := 0; i < 10; i++ {
|
|
|
fetchResponse := new(FetchResponse)
|
|
|
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
|
|
|
leader.Returns(fetchResponse)
|
|
@@ -326,27 +326,16 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
|
|
|
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
|
|
|
leader0.Returns(fetchResponse)
|
|
|
+ time.Sleep(50 * time.Millisecond)
|
|
|
|
|
|
-
|
|
|
- fetchResponse = new(FetchResponse)
|
|
|
- fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
|
|
|
- leader0.Returns(fetchResponse)
|
|
|
-
|
|
|
-
|
|
|
- fetchResponse = new(FetchResponse)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
|
|
|
- leader1.Returns(fetchResponse)
|
|
|
-
|
|
|
- wg.Wait()
|
|
|
leader1.Close()
|
|
|
leader0.Close()
|
|
|
+ wg.Wait()
|
|
|
seedBroker.Close()
|
|
|
safeClose(t, master)
|
|
|
}
|
|
|
|
|
|
func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
- t.Skip("Enable once bug #325 is fixed.")
|
|
|
-
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
|
|
@@ -379,6 +368,7 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
fetchResponse := new(FetchResponse)
|
|
|
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
|
|
|
leader.Returns(fetchResponse)
|
|
|
+ time.Sleep(50 * time.Millisecond)
|
|
|
|
|
|
offsetResponseNewest1 := new(OffsetResponse)
|
|
|
offsetResponseNewest1.AddTopicPartition("my_topic", 1, 1234)
|
|
@@ -392,7 +382,9 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+ <-c0.Messages()
|
|
|
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
|
|
|
fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
|
|
|
leader.Returns(fetchResponse)
|
|
|
|