It's only needed for transactions (which we don't support yet) and the logic was wrong anyway. Fixes slow consuming for certain Kafka 0.11 configurations.
@@ -546,11 +546,6 @@ func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*Cons
} else {
incomplete = true
}
-
- if child.offset > block.LastStableOffset {
- // We reached the end of closed transactions
- break
- }
if incomplete || len(messages) == 0 {