|
|
@@ -22,21 +22,21 @@ func TestDefaultPartitionConsumerConfigValidates(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestConsumerOffsetManual(t *testing.T) {
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
|
|
|
- mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
for i := 0; i <= 10; i++ {
|
|
|
- fr := new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
|
|
|
- mb2.Returns(fr)
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+1234))
|
|
|
+ leader.Returns(fetchResponse)
|
|
|
}
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
@@ -54,7 +54,7 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- mb1.Close()
|
|
|
+ seedBroker.Close()
|
|
|
|
|
|
for i := 0; i < 10; i++ {
|
|
|
event := <-consumer.Events()
|
|
|
@@ -68,31 +68,31 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, client)
|
|
|
- mb2.Close()
|
|
|
+ leader.Close()
|
|
|
}
|
|
|
|
|
|
func TestConsumerLatestOffset(t *testing.T) {
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
|
|
|
- mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- or := new(OffsetResponse)
|
|
|
- or.AddTopicPartition("my_topic", 0, 0x010101)
|
|
|
- mb2.Returns(or)
|
|
|
+ offsetResponse := new(OffsetResponse)
|
|
|
+ offsetResponse.AddTopicPartition("my_topic", 0, 0x010101)
|
|
|
+ leader.Returns(offsetResponse)
|
|
|
|
|
|
- fr := new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
|
|
|
- mb2.Returns(fr)
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
|
|
|
+ leader.Returns(fetchResponse)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- mb1.Close()
|
|
|
+ seedBroker.Close()
|
|
|
|
|
|
master, err := NewConsumer(client, nil)
|
|
|
if err != nil {
|
|
|
@@ -106,7 +106,7 @@ func TestConsumerLatestOffset(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
- mb2.Close()
|
|
|
+ leader.Close()
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, client)
|
|
|
|
|
|
@@ -120,24 +120,24 @@ func TestConsumerFunnyOffsets(t *testing.T) {
|
|
|
// for topics that are compressed and/or compacted (different things!) we have to be
|
|
|
// able to handle receiving offsets that are non-sequential (though still strictly increasing) and
|
|
|
// possibly starting prior to the actual value we requested
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb2 := NewMockBroker(t, 2)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 2)
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
|
|
|
- mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- fr := new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
|
|
|
- mb2.Returns(fr)
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(1))
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(3))
|
|
|
+ leader.Returns(fetchResponse)
|
|
|
|
|
|
- fr = new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
|
|
|
- mb2.Returns(fr)
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(5))
|
|
|
+ leader.Returns(fetchResponse)
|
|
|
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
@@ -160,27 +160,27 @@ func TestConsumerFunnyOffsets(t *testing.T) {
|
|
|
t.Error("Incorrect message offset!")
|
|
|
}
|
|
|
|
|
|
- mb2.Close()
|
|
|
- mb1.Close()
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
// initial setup
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb2 := NewMockBroker(t, 2)
|
|
|
- mb3 := NewMockBroker(t, 3)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader0 := NewMockBroker(t, 2)
|
|
|
+ leader1 := NewMockBroker(t, 3)
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
|
|
|
- mdr.AddBroker(mb3.Addr(), mb3.BrokerID())
|
|
|
- mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
|
|
|
- mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader0.Addr(), leader0.BrokerID())
|
|
|
+ metadataResponse.AddBroker(leader1.Addr(), leader1.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader0.BrokerID(), nil, nil, NoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
// launch test goroutines
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
@@ -194,6 +194,7 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
config.OffsetMethod = OffsetMethodManual
|
|
|
config.OffsetValue = 0
|
|
|
|
|
|
+ // we expect to end up (eventually) consuming exactly ten messages on each partition
|
|
|
var wg sync.WaitGroup
|
|
|
for i := 0; i < 2; i++ {
|
|
|
consumer, err := master.ConsumePartition("my_topic", int32(i), config)
|
|
|
@@ -219,67 +220,75 @@ func TestConsumerRebalancingMultiplePartitions(t *testing.T) {
|
|
|
}(int32(i), consumer)
|
|
|
}
|
|
|
|
|
|
- // generate broker responses
|
|
|
- fr := new(FetchResponse)
|
|
|
+ // leader0 provides first four messages on partition 0
|
|
|
+ fetchResponse := new(FetchResponse)
|
|
|
for i := 0; i < 4; i++ {
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
|
|
|
}
|
|
|
- mb2.Returns(fr)
|
|
|
-
|
|
|
- fr = new(FetchResponse)
|
|
|
- fr.AddError("my_topic", 0, NotLeaderForPartition)
|
|
|
- mb2.Returns(fr)
|
|
|
-
|
|
|
- mdr = new(MetadataResponse)
|
|
|
- mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
|
|
|
- mdr.AddTopicPartition("my_topic", 1, 3, nil, nil, NoError)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ leader0.Returns(fetchResponse)
|
|
|
+
|
|
|
+ // leader0 says no longer leader of partition 0
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
+ fetchResponse.AddError("my_topic", 0, NotLeaderForPartition)
|
|
|
+ leader0.Returns(fetchResponse)
|
|
|
+
|
|
|
+ // metadata assigns both partitions to leader1
|
|
|
+ metadataResponse = new(MetadataResponse)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
|
|
|
|
|
|
- fr = new(FetchResponse)
|
|
|
+ // leader1 provides five messages on partition 1
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
for i := 0; i < 5; i++ {
|
|
|
- fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
|
|
|
+ fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i))
|
|
|
}
|
|
|
- mb3.Returns(fr)
|
|
|
+ leader1.Returns(fetchResponse)
|
|
|
|
|
|
- fr = new(FetchResponse)
|
|
|
+ // leader1 provides three more messages on both partitions
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
for i := 0; i < 3; i++ {
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
|
|
|
- fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+4))
|
|
|
+ fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+5))
|
|
|
}
|
|
|
- mb3.Returns(fr)
|
|
|
+ leader1.Returns(fetchResponse)
|
|
|
|
|
|
- fr = new(FetchResponse)
|
|
|
+ // leader1 provides three more messages on partition0, says no longer leader of partition1
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
for i := 0; i < 3; i++ {
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(i+7))
|
|
|
}
|
|
|
- fr.AddError("my_topic", 1, NotLeaderForPartition)
|
|
|
- mb3.Returns(fr)
|
|
|
-
|
|
|
- mdr = new(MetadataResponse)
|
|
|
- mdr.AddTopicPartition("my_topic", 0, 3, nil, nil, NoError)
|
|
|
- mdr.AddTopicPartition("my_topic", 1, 2, nil, nil, NoError)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ fetchResponse.AddError("my_topic", 1, NotLeaderForPartition)
|
|
|
+ leader1.Returns(fetchResponse)
|
|
|
+
|
|
|
+ // metadata assigns 0 to leader1 and 1 to leader0
|
|
|
+ metadataResponse = new(MetadataResponse)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader1.BrokerID(), nil, nil, NoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader0.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
time.Sleep(5 * time.Millisecond) // dumbest way to force a particular response ordering
|
|
|
|
|
|
- fr = new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
|
|
|
- fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
|
|
|
- mb2.Returns(fr)
|
|
|
+ // leader0 provides two messages on partition 1
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(8))
|
|
|
+ fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(9))
|
|
|
+ leader0.Returns(fetchResponse)
|
|
|
|
|
|
- // cleanup
|
|
|
- fr = new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
|
|
|
- mb2.Returns(fr)
|
|
|
+ // leader0 provides last message on partition 1
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 1, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
|
|
|
+ leader0.Returns(fetchResponse)
|
|
|
|
|
|
- fr = new(FetchResponse)
|
|
|
- fr.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
|
|
|
- mb3.Returns(fr)
|
|
|
+ // leader1 provides last message on partition 0
|
|
|
+ fetchResponse = new(FetchResponse)
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), int64(10))
|
|
|
+ leader1.Returns(fetchResponse)
|
|
|
|
|
|
wg.Wait()
|
|
|
- mb3.Close()
|
|
|
- mb2.Close()
|
|
|
- mb1.Close()
|
|
|
+ leader1.Close()
|
|
|
+ leader0.Close()
|
|
|
+ seedBroker.Close()
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|