|
|
@@ -488,21 +488,26 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
|
|
|
for _, msgBlock := range block.MsgSet.Messages {
|
|
|
|
|
|
for _, msg := range msgBlock.Messages() {
|
|
|
- if prelude && msg.Offset < child.offset {
|
|
|
+ offset := msg.Offset
|
|
|
+ if msg.Msg.Version >= 1 {
|
|
|
+ baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
|
|
|
+ offset += baseOffset
|
|
|
+ }
|
|
|
+ if prelude && offset < child.offset {
|
|
|
continue
|
|
|
}
|
|
|
prelude = false
|
|
|
|
|
|
- if msg.Offset >= child.offset {
|
|
|
+ if offset >= child.offset {
|
|
|
messages = append(messages, &ConsumerMessage{
|
|
|
Topic: child.topic,
|
|
|
Partition: child.partition,
|
|
|
Key: msg.Msg.Key,
|
|
|
Value: msg.Msg.Value,
|
|
|
- Offset: msg.Offset,
|
|
|
+ Offset: offset,
|
|
|
Timestamp: msg.Msg.Timestamp,
|
|
|
})
|
|
|
- child.offset = msg.Offset + 1
|
|
|
+ child.offset = offset + 1
|
|
|
} else {
|
|
|
incomplete = true
|
|
|
}
|