Browse Source

Consumer: handle compressed relative offsets

New message format does something weird with these. See
https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets

Fixes #720. Supercedes #721. Thanks to @dynamix for the first draft of the fix.
Evan Huus 9 years ago
parent
commit
16da292b17
1 changed files with 9 additions and 4 deletions
  1. 9 4
      consumer.go

+ 9 - 4
consumer.go

@@ -488,21 +488,26 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 	for _, msgBlock := range block.MsgSet.Messages {
 
 		for _, msg := range msgBlock.Messages() {
-			if prelude && msg.Offset < child.offset {
+			offset := msg.Offset
+			if msg.Msg.Version >= 1 {
+				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
+				offset += baseOffset
+			}
+			if prelude && offset < child.offset {
 				continue
 			}
 			prelude = false
 
-			if msg.Offset >= child.offset {
+			if offset >= child.offset {
 				messages = append(messages, &ConsumerMessage{
 					Topic:     child.topic,
 					Partition: child.partition,
 					Key:       msg.Msg.Key,
 					Value:     msg.Msg.Value,
-					Offset:    msg.Offset,
+					Offset:    offset,
 					Timestamp: msg.Msg.Timestamp,
 				})
-				child.offset = msg.Offset + 1
+				child.offset = offset + 1
 			} else {
 				incomplete = true
 			}