Browse Source

Merge pull request #1227 from gburanov/fix_consuming_compressed_topic

Fix consuming compacted topic
Vlad Gorodetsky 6 years ago
parent
commit
a3e71cf226
1 changed files with 2 additions and 2 deletions
  1. 2 2
      consumer.go

+ 2 - 2
consumer.go

@@ -511,7 +511,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 		}
 	}
 	if len(messages) == 0 {
-		return nil, ErrIncompleteResponse
+		child.offset++
 	}
 	return messages, nil
 }
@@ -539,7 +539,7 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 		child.offset = offset + 1
 	}
 	if len(messages) == 0 {
-		child.offset += 1
+		child.offset++
 	}
 	return messages, nil
 }