|
|
@@ -350,16 +350,16 @@ func TestConsumerInterleavedClose(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
- t.Skip("This is not yet working due to concurrency on the mock broker")
|
|
|
-
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
|
leaderAddr := leader.Addr()
|
|
|
+ tmp := newMockBroker(t, 3)
|
|
|
|
|
|
metadataResponse := new(MetadataResponse)
|
|
|
metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddBroker(tmp.Addr(), tmp.BrokerID())
|
|
|
metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
- metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, tmp.BrokerID(), nil, nil, ErrNoError)
|
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
config := NewConfig()
|
|
|
@@ -371,17 +371,44 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
+ offsetResponseNewest := new(OffsetResponse)
|
|
|
+ offsetResponseNewest.AddTopicPartition("my_topic", 0, 1234)
|
|
|
+ leader.Returns(offsetResponseNewest)
|
|
|
+
|
|
|
+ offsetResponseOldest := new(OffsetResponse)
|
|
|
+ offsetResponseOldest.AddTopicPartition("my_topic", 0, 0)
|
|
|
+ leader.Returns(offsetResponseOldest)
|
|
|
+
|
|
|
c0, err := master.ConsumePartition("my_topic", 0, 0)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
+ offsetResponseNewest = new(OffsetResponse)
|
|
|
+ offsetResponseNewest.AddTopicPartition("my_topic", 1, 1234)
|
|
|
+ tmp.Returns(offsetResponseNewest)
|
|
|
+
|
|
|
+ offsetResponseOldest = new(OffsetResponse)
|
|
|
+ offsetResponseOldest.AddTopicPartition("my_topic", 1, 0)
|
|
|
+ tmp.Returns(offsetResponseOldest)
|
|
|
+
|
|
|
c1, err := master.ConsumePartition("my_topic", 1, 0)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
+ //redirect partition 1 back to main leader
|
|
|
fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddError("my_topic", 1, ErrNotLeaderForPartition)
|
|
|
+ tmp.Returns(fetchResponse)
|
|
|
+ metadataResponse = new(MetadataResponse)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), nil, nil, ErrNoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
+ time.Sleep(5 * time.Millisecond)
|
|
|
+
|
|
|
+ // now send one message to each partition to make sure everything is primed
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(0))
|
|
|
fetchResponse.AddError("my_topic", 1, ErrNoError)
|
|
|
leader.Returns(fetchResponse)
|
|
|
@@ -393,6 +420,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
leader.Returns(fetchResponse)
|
|
|
<-c1.Messages()
|
|
|
|
|
|
+ // bounce the broker
|
|
|
leader.Close()
|
|
|
leader = newMockBrokerAddr(t, 2, leaderAddr)
|
|
|
|
|
|
@@ -419,6 +447,8 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
// send it back to the same broker
|
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
+ time.Sleep(5 * time.Millisecond)
|
|
|
+
|
|
|
select {
|
|
|
case <-c0.Messages():
|
|
|
case <-c1.Messages():
|
|
|
@@ -438,6 +468,7 @@ func TestConsumerBounceWithReferenceOpen(t *testing.T) {
|
|
|
}()
|
|
|
wg.Wait()
|
|
|
safeClose(t, master)
|
|
|
+ tmp.Close()
|
|
|
}
|
|
|
|
|
|
func TestConsumerOffsetOutOfRange(t *testing.T) {
|