|
@@ -59,7 +59,7 @@ func TestConsumerOffsetManual(t *testing.T) {
|
|
|
leader.Close()
|
|
leader.Close()
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func TestConsumerLatestOffset(t *testing.T) {
|
|
|
|
|
|
|
+func TestConsumerOffsetNewest(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 2)
|
|
leader := newMockBroker(t, 2)
|
|
|
|
|
|
|
@@ -69,15 +69,17 @@ func TestConsumerLatestOffset(t *testing.T) {
|
|
|
seedBroker.Returns(metadataResponse)
|
|
seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
|
|
offsetResponseNewest := new(OffsetResponse)
|
|
offsetResponseNewest := new(OffsetResponse)
|
|
|
- offsetResponseNewest.AddTopicPartition("my_topic", 0, 0x010102)
|
|
|
|
|
|
|
+ offsetResponseNewest.AddTopicPartition("my_topic", 0, 10)
|
|
|
leader.Returns(offsetResponseNewest)
|
|
leader.Returns(offsetResponseNewest)
|
|
|
|
|
|
|
|
offsetResponseOldest := new(OffsetResponse)
|
|
offsetResponseOldest := new(OffsetResponse)
|
|
|
- offsetResponseOldest.AddTopicPartition("my_topic", 0, 0x010101)
|
|
|
|
|
|
|
+ offsetResponseOldest.AddTopicPartition("my_topic", 0, 7)
|
|
|
leader.Returns(offsetResponseOldest)
|
|
leader.Returns(offsetResponseOldest)
|
|
|
|
|
|
|
|
fetchResponse := new(FetchResponse)
|
|
fetchResponse := new(FetchResponse)
|
|
|
- fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 0x010101)
|
|
|
|
|
|
|
+ fetchResponse.AddMessage("my_topic", 0, nil, ByteEncoder([]byte{0x00, 0x0E}), 10)
|
|
|
|
|
+ block := fetchResponse.GetBlock("my_topic", 0)
|
|
|
|
|
+ block.HighWaterMarkOffset = 14
|
|
|
leader.Returns(fetchResponse)
|
|
leader.Returns(fetchResponse)
|
|
|
|
|
|
|
|
master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
|
|
master, err := NewConsumer([]string{seedBroker.Addr()}, nil)
|
|
@@ -91,12 +93,24 @@ func TestConsumerLatestOffset(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ msg := <-consumer.Messages()
|
|
|
|
|
+
|
|
|
|
|
+ // we deliver one message, so it should be one higher than we return in the OffsetResponse
|
|
|
|
|
+ if msg.Offset != 10 {
|
|
|
|
|
+ t.Error("Latest message offset not fetched correctly:", msg.Offset)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 {
|
|
|
|
|
+ t.Errorf("Expected high water mark offset 14, found %d", hwmo)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
leader.Close()
|
|
leader.Close()
|
|
|
safeClose(t, consumer)
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, master)
|
|
safeClose(t, master)
|
|
|
|
|
|
|
|
- // we deliver one message, so it should be one higher than we return in the OffsetResponse
|
|
|
|
|
- if consumer.(*partitionConsumer).offset != 0x010102 {
|
|
|
|
|
|
|
+ // We deliver one message, so it should be one higher than we return in the OffsetResponse.
|
|
|
|
|
+ // This way it is set correctly for the next FetchRequest.
|
|
|
|
|
+ if consumer.(*partitionConsumer).offset != 11 {
|
|
|
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
|
|
t.Error("Latest offset not fetched correctly:", consumer.(*partitionConsumer).offset)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|