|
|
@@ -389,10 +389,12 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 2)
|
|
|
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 3)
|
|
|
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 4)
|
|
|
+ newFetchResponse.SetLastOffsetDelta("my_topic", 0, 4)
|
|
|
newFetchResponse.SetLastStableOffset("my_topic", 0, 4)
|
|
|
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
|
|
|
var offsetResponseVersion int16
|
|
|
cfg := NewConfig()
|
|
|
+ cfg.Consumer.Return.Errors = true
|
|
|
if fetchResponse1.Version >= 4 {
|
|
|
cfg.Version = V0_11_0_0
|
|
|
offsetResponseVersion = 1
|
|
|
@@ -426,8 +428,19 @@ func TestConsumerExtraOffsets(t *testing.T) {
|
|
|
|
|
|
// Then: messages with offsets 1 and 2 are not returned even though they
|
|
|
// are present in the response.
|
|
|
- assertMessageOffset(t, <-consumer.Messages(), 3)
|
|
|
- assertMessageOffset(t, <-consumer.Messages(), 4)
|
|
|
+ select {
|
|
|
+ case msg := <-consumer.Messages():
|
|
|
+ assertMessageOffset(t, msg, 3)
|
|
|
+ case err := <-consumer.Errors():
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ select {
|
|
|
+ case msg := <-consumer.Messages():
|
|
|
+ assertMessageOffset(t, msg, 4)
|
|
|
+ case err := <-consumer.Errors():
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
|
|
|
safeClose(t, consumer)
|
|
|
safeClose(t, master)
|
|
|
@@ -490,6 +503,7 @@ func TestConsumerNonSequentialOffsets(t *testing.T) {
|
|
|
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 5)
|
|
|
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 7)
|
|
|
newFetchResponse.AddRecord("my_topic", 0, nil, testMsg, 11)
|
|
|
+ newFetchResponse.SetLastOffsetDelta("my_topic", 0, 11)
|
|
|
newFetchResponse.SetLastStableOffset("my_topic", 0, 11)
|
|
|
for _, fetchResponse1 := range []*FetchResponse{legacyFetchResponse, newFetchResponse} {
|
|
|
var offsetResponseVersion int16
|