Jelajahi Sumber

Check for insufficient data when we try to get the records.

In some corner cases (like v2 records produced on the topic but the
sarama consumer uses v0 Fetch API with a limit less than one record size)
we might receive a truncated record batch. We want to be able to make
progress by increasing the fetch limit, so don't return an error, but
rater mark the batch as partial trailing record.
Vlad Hanciuta 8 tahun lalu
induk
melakukan
eee4c711a2
1 mengubah file dengan 5 tambahan dan 0 penghapusan
  1. 5 0
      record_batch.go

+ 5 - 0
record_batch.go

@@ -161,6 +161,11 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	bufSize := int(batchLen) - recordBatchOverhead
 	recBuffer, err := pd.getRawBytes(bufSize)
 	if err != nil {
+		if err == ErrInsufficientData {
+			b.PartialTrailingRecord = true
+			b.Records = nil
+			return nil
+		}
 		return err
 	}