Browse Source

Unblock consumer when receiving invalid FetchResponse

Adam Dratwinski 6 years ago
parent
commit
d3f8616135
2 changed files with 52 additions and 1 deletions
  1. 1 1
      consumer.go
  2. 51 0
      consumer_test.go

+ 1 - 1
consumer.go

@@ -528,7 +528,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 		child.offset = offset + 1
 	}
 	if len(messages) == 0 {
-		return nil, ErrIncompleteResponse
+		child.offset += 1
 	}
 	return messages, nil
 }

+ 51 - 0
consumer_test.go

@@ -448,6 +448,57 @@ func TestConsumerExtraOffsets(t *testing.T) {
 	}
 }
 
+// In some situations broker may return a block containing only
+// messages older then requested, even though there would be
+// more messages if higher offset was requested.
+func TestConsumerReceivingFetchResponseWithTooOldRecords(t *testing.T) {
+	// Given
+	fetchResponse1 := &FetchResponse{Version: 4}
+	fetchResponse1.AddRecord("my_topic", 0, nil, testMsg, 1)
+
+	fetchResponse2 := &FetchResponse{Version: 4}
+	fetchResponse2.AddRecord("my_topic", 0, nil, testMsg, 1000000)
+
+	cfg := NewConfig()
+	cfg.Consumer.Return.Errors = true
+	cfg.Version = V1_1_0_0
+
+	broker0 := NewMockBroker(t, 0)
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetVersion(1).
+			SetOffset("my_topic", 0, OffsetNewest, 1234).
+			SetOffset("my_topic", 0, OffsetOldest, 0),
+		"FetchRequest": NewMockSequence(fetchResponse1, fetchResponse2),
+	})
+
+	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// When
+	consumer, err := master.ConsumePartition("my_topic", 0, 2)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	select {
+	case msg := <-consumer.Messages():
+		assertMessageOffset(t, msg, 1000000)
+	case err := <-consumer.Errors():
+		t.Fatal(err)
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
 func TestConsumeMessageWithNewerFetchAPIVersion(t *testing.T) {
 	// Given
 	fetchResponse1 := &FetchResponse{Version: 4}