Browse Source

Merge pull request #895 from dvsekhvalnov/master

#885: added BlockTimestamp to ConsumerMessage
Evan Huus 8 years ago
parent
commit
f7e3024a61
1 changed files with 13 additions and 11 deletions
  1. 13 11
      consumer.go

+ 13 - 11
consumer.go

@@ -10,11 +10,12 @@ import (
 
 
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
 // ConsumerMessage encapsulates a Kafka message returned by the consumer.
 type ConsumerMessage struct {
 type ConsumerMessage struct {
-	Key, Value []byte
-	Topic      string
-	Partition  int32
-	Offset     int64
-	Timestamp  time.Time // only set if kafka is version 0.10+
+	Key, Value     []byte
+	Topic          string
+	Partition      int32
+	Offset         int64
+	Timestamp      time.Time // only set if kafka is version 0.10+, inner message timestamp
+	BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
 }
 }
 
 
 // ConsumerError is what is provided to the user when an error occurs.
 // ConsumerError is what is provided to the user when an error occurs.
@@ -520,12 +521,13 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 
 			if offset >= child.offset {
 			if offset >= child.offset {
 				messages = append(messages, &ConsumerMessage{
 				messages = append(messages, &ConsumerMessage{
-					Topic:     child.topic,
-					Partition: child.partition,
-					Key:       msg.Msg.Key,
-					Value:     msg.Msg.Value,
-					Offset:    offset,
-					Timestamp: msg.Msg.Timestamp,
+					Topic:          child.topic,
+					Partition:      child.partition,
+					Key:            msg.Msg.Key,
+					Value:          msg.Msg.Value,
+					Offset:         offset,
+					Timestamp:      msg.Msg.Timestamp,
+					BlockTimestamp: msgBlock.Msg.Timestamp,
 				})
 				})
 				child.offset = offset + 1
 				child.offset = offset + 1
 			} else {
 			} else {