Browse Source

Support LogAppend timestamps

Sergey Zimin 7 years ago
parent
commit
f0e5fa7ff7
3 changed files with 16 additions and 2 deletions
  1. 10 2
      consumer.go
  2. 4 0
      message.go
  3. 2 0
      record_batch.go

+ 10 - 2
consumer.go

@@ -487,9 +487,13 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 	for _, msgBlock := range msgSet.Messages {
 	for _, msgBlock := range msgSet.Messages {
 		for _, msg := range msgBlock.Messages() {
 		for _, msg := range msgBlock.Messages() {
 			offset := msg.Offset
 			offset := msg.Offset
+			timestamp := msg.Msg.Timestamp
 			if msg.Msg.Version >= 1 {
 			if msg.Msg.Version >= 1 {
 				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
 				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
 				offset += baseOffset
 				offset += baseOffset
+				if msg.Msg.LogAppendTime {
+					timestamp = msgBlock.Msg.Timestamp
+				}
 			}
 			}
 			if offset < child.offset {
 			if offset < child.offset {
 				continue
 				continue
@@ -500,7 +504,7 @@ func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMe
 				Key:            msg.Msg.Key,
 				Key:            msg.Msg.Key,
 				Value:          msg.Msg.Value,
 				Value:          msg.Msg.Value,
 				Offset:         offset,
 				Offset:         offset,
-				Timestamp:      msg.Msg.Timestamp,
+				Timestamp:      timestamp,
 				BlockTimestamp: msgBlock.Msg.Timestamp,
 				BlockTimestamp: msgBlock.Msg.Timestamp,
 			})
 			})
 			child.offset = offset + 1
 			child.offset = offset + 1
@@ -519,13 +523,17 @@ func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMes
 		if offset < child.offset {
 		if offset < child.offset {
 			continue
 			continue
 		}
 		}
+		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
+		if batch.LogAppendTime {
+			timestamp = batch.MaxTimestamp
+		}
 		messages = append(messages, &ConsumerMessage{
 		messages = append(messages, &ConsumerMessage{
 			Topic:     child.topic,
 			Topic:     child.topic,
 			Partition: child.partition,
 			Partition: child.partition,
 			Key:       rec.Key,
 			Key:       rec.Key,
 			Value:     rec.Value,
 			Value:     rec.Value,
 			Offset:    offset,
 			Offset:    offset,
-			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
+			Timestamp: timestamp,
 			Headers:   rec.Headers,
 			Headers:   rec.Headers,
 		})
 		})
 		child.offset = offset + 1
 		child.offset = offset + 1

+ 4 - 0
message.go

@@ -19,6 +19,8 @@ const (
 	CompressionZSTD   CompressionCodec = 4
 	CompressionZSTD   CompressionCodec = 4
 )
 )
 
 
+const timestampTypeMask = 0x08
+
 func (cc CompressionCodec) String() string {
 func (cc CompressionCodec) String() string {
 	return []string{
 	return []string{
 		"none",
 		"none",
@@ -36,6 +38,7 @@ const CompressionLevelDefault = -1000
 type Message struct {
 type Message struct {
 	Codec            CompressionCodec // codec used to compress the message contents
 	Codec            CompressionCodec // codec used to compress the message contents
 	CompressionLevel int              // compression level
 	CompressionLevel int              // compression level
+	LogAppendTime    bool             // the used timestamp is LogAppendTime
 	Key              []byte           // the message key, may be nil
 	Key              []byte           // the message key, may be nil
 	Value            []byte           // the message contents
 	Value            []byte           // the message contents
 	Set              *MessageSet      // the message set a message might wrap
 	Set              *MessageSet      // the message set a message might wrap
@@ -108,6 +111,7 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 		return err
 		return err
 	}
 	}
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
+	m.LogAppendTime = attribute&timestampTypeMask == timestampTypeMask
 
 
 	if m.Version == 1 {
 	if m.Version == 1 {
 		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
 		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {

+ 2 - 0
record_batch.go

@@ -36,6 +36,7 @@ type RecordBatch struct {
 	Codec                 CompressionCodec
 	Codec                 CompressionCodec
 	CompressionLevel      int
 	CompressionLevel      int
 	Control               bool
 	Control               bool
+	LogAppendTime         bool
 	LastOffsetDelta       int32
 	LastOffsetDelta       int32
 	FirstTimestamp        time.Time
 	FirstTimestamp        time.Time
 	MaxTimestamp          time.Time
 	MaxTimestamp          time.Time
@@ -120,6 +121,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	}
 	}
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Control = attributes&controlMask == controlMask
 	b.Control = attributes&controlMask == controlMask
+	b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
 
 
 	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
 	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
 		return err
 		return err