Browse Source

Make timestamp fields of type time.Time and time.Duration

Vlad Hanciuta 6 năm trước cách đây
mục cha
commit
ed80c61d79
6 tập tin đã thay đổi với 106 bổ sung60 xóa
  1. 1 7
      consumer.go
  2. 3 19
      message.go
  3. 7 3
      record.go
  4. 14 6
      record_batch.go
  5. 41 25
      record_test.go
  6. 40 0
      timestamp.go

+ 1 - 7
consumer.go

@@ -532,12 +532,6 @@ func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*Cons
 		}
 		prelude = false
 
-		millis := batch.FirstTimestamp + rec.TimestampDelta
-		timestamp := time.Time{}
-		if millis >= 0 {
-			timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
-		}
-
 		if offset >= child.offset {
 			messages = append(messages, &ConsumerMessage{
 				Topic:     child.topic,
@@ -545,7 +539,7 @@ func (child *partitionConsumer) parseRecords(block *FetchResponseBlock) ([]*Cons
 				Key:       rec.Key,
 				Value:     rec.Value,
 				Offset:    offset,
-				Timestamp: timestamp,
+				Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
 				Headers:   rec.Headers,
 			})
 			child.offset = offset + 1

+ 3 - 19
message.go

@@ -45,15 +45,9 @@ func (m *Message) encode(pe packetEncoder) error {
 	pe.putInt8(attributes)
 
 	if m.Version >= 1 {
-		timestamp := int64(-1)
-
-		if !m.Timestamp.Before(time.Unix(0, 0)) {
-			timestamp = m.Timestamp.UnixNano() / int64(time.Millisecond)
-		} else if !m.Timestamp.IsZero() {
-			return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", m.Timestamp)}
+		if err := (Timestamp{&m.Timestamp}).encode(pe); err != nil {
+			return err
 		}
-
-		pe.putInt64(timestamp)
 	}
 
 	err := pe.putBytes(m.Key)
@@ -133,19 +127,9 @@ func (m *Message) decode(pd packetDecoder) (err error) {
 	m.Codec = CompressionCodec(attribute & compressionCodecMask)
 
 	if m.Version == 1 {
-		millis, err := pd.getInt64()
-		if err != nil {
+		if err := (Timestamp{&m.Timestamp}).decode(pd); err != nil {
 			return err
 		}
-
-		// negative timestamps are invalid, in these cases we should return
-		// a zero time
-		timestamp := time.Time{}
-		if millis >= 0 {
-			timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
-		}
-
-		m.Timestamp = timestamp
 	}
 
 	m.Key, err = pd.getBytes()

+ 7 - 3
record.go

@@ -1,5 +1,7 @@
 package sarama
 
+import "time"
+
 const (
 	controlMask = 0x20
 )
@@ -29,7 +31,7 @@ func (h *RecordHeader) decode(pd packetDecoder) (err error) {
 
 type Record struct {
 	Attributes     int8
-	TimestampDelta int64
+	TimestampDelta time.Duration
 	OffsetDelta    int64
 	Key            []byte
 	Value          []byte
@@ -41,7 +43,7 @@ type Record struct {
 func (r *Record) encode(pe packetEncoder) error {
 	pe.push(&r.length)
 	pe.putInt8(r.Attributes)
-	pe.putVarint(r.TimestampDelta)
+	pe.putVarint(int64(r.TimestampDelta / time.Millisecond))
 	pe.putVarint(r.OffsetDelta)
 	if err := pe.putVarintBytes(r.Key); err != nil {
 		return err
@@ -69,9 +71,11 @@ func (r *Record) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	if r.TimestampDelta, err = pd.getVarint(); err != nil {
+	timestamp, err := pd.getVarint()
+	if err != nil {
 		return err
 	}
+	r.TimestampDelta = time.Duration(timestamp) * time.Millisecond
 
 	if r.OffsetDelta, err = pd.getVarint(); err != nil {
 		return err

+ 14 - 6
record_batch.go

@@ -5,6 +5,7 @@ import (
 	"compress/gzip"
 	"fmt"
 	"io/ioutil"
+	"time"
 
 	"github.com/eapache/go-xerial-snappy"
 	"github.com/pierrec/lz4"
@@ -41,8 +42,8 @@ type RecordBatch struct {
 	Codec                 CompressionCodec
 	Control               bool
 	LastOffsetDelta       int32
-	FirstTimestamp        int64
-	MaxTimestamp          int64
+	FirstTimestamp        time.Time
+	MaxTimestamp          time.Time
 	ProducerID            int64
 	ProducerEpoch         int16
 	FirstSequence         int32
@@ -64,8 +65,15 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
 	pe.push(newCRC32Field(crcCastagnoli))
 	pe.putInt16(b.computeAttributes())
 	pe.putInt32(b.LastOffsetDelta)
-	pe.putInt64(b.FirstTimestamp)
-	pe.putInt64(b.MaxTimestamp)
+
+	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
+		return err
+	}
+
+	if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
+		return err
+	}
+
 	pe.putInt64(b.ProducerID)
 	pe.putInt16(b.ProducerEpoch)
 	pe.putInt32(b.FirstSequence)
@@ -122,11 +130,11 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	if b.FirstTimestamp, err = pd.getInt64(); err != nil {
+	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
 		return err
 	}
 
-	if b.MaxTimestamp, err = pd.getInt64(); err != nil {
+	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
 		return err
 	}
 

+ 41 - 25
record_test.go

@@ -6,6 +6,7 @@ import (
 	"strconv"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/davecgh/go-spew/spew"
 )
@@ -17,8 +18,13 @@ var recordBatchTestCases = []struct {
 	oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
 }{
 	{
-		name:  "empty record",
-		batch: RecordBatch{Version: 2, Records: []*Record{}},
+		name: "empty record",
+		batch: RecordBatch{
+			Version:        2,
+			FirstTimestamp: time.Unix(0, 0),
+			MaxTimestamp:   time.Unix(0, 0),
+			Records:        []*Record{},
+		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 49, // Length
@@ -36,8 +42,14 @@ var recordBatchTestCases = []struct {
 		},
 	},
 	{
-		name:  "control batch",
-		batch: RecordBatch{Version: 2, Control: true, Records: []*Record{}},
+		name: "control batch",
+		batch: RecordBatch{
+			Version:        2,
+			Control:        true,
+			FirstTimestamp: time.Unix(0, 0),
+			MaxTimestamp:   time.Unix(0, 0),
+			Records:        []*Record{},
+		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 49, // Length
@@ -58,9 +70,10 @@ var recordBatchTestCases = []struct {
 		name: "uncompressed record",
 		batch: RecordBatch{
 			Version:        2,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -74,10 +87,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 70, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			2,                // Version
-			219, 71, 20, 201, // CRC
+			84, 121, 97, 253, // CRC
 			0, 0, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -103,9 +116,10 @@ var recordBatchTestCases = []struct {
 		batch: RecordBatch{
 			Version:        2,
 			Codec:          CompressionGZIP,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -118,11 +132,11 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 94, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                // Version
-			15, 156, 184, 78, // CRC
+			2,                  // Version
+			159, 236, 182, 189, // CRC
 			0, 1, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -136,10 +150,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 94, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			2,               // Version
-			144, 168, 0, 33, // CRC
+			0, 216, 14, 210, // CRC
 			0, 1, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -154,9 +168,10 @@ var recordBatchTestCases = []struct {
 		batch: RecordBatch{
 			Version:        2,
 			Codec:          CompressionSnappy,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -169,11 +184,11 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 72, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,               // Version
-			95, 173, 35, 17, // CRC
+			2,              // Version
+			21, 0, 159, 97, // CRC
 			0, 2, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch
@@ -187,9 +202,10 @@ var recordBatchTestCases = []struct {
 		batch: RecordBatch{
 			Version:        2,
 			Codec:          CompressionLZ4,
-			FirstTimestamp: 10,
+			FirstTimestamp: time.Unix(1479847795, 0),
+			MaxTimestamp:   time.Unix(0, 0),
 			Records: []*Record{{
-				TimestampDelta: 5,
+				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Value:          []byte{5, 6, 7},
 				Headers: []*RecordHeader{{
@@ -202,11 +218,11 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 89, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                // Version
-			129, 238, 43, 82, // CRC
+			2,                 // Version
+			169, 74, 119, 197, // CRC
 			0, 3, // Attributes
 			0, 0, 0, 0, // Last Offset Delta
-			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
+			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, // Producer Epoch

+ 40 - 0
timestamp.go

@@ -0,0 +1,40 @@
+package sarama
+
+import (
+	"fmt"
+	"time"
+)
+
+type Timestamp struct {
+	*time.Time
+}
+
+func (t Timestamp) encode(pe packetEncoder) error {
+	timestamp := int64(-1)
+
+	if !t.Before(time.Unix(0, 0)) {
+		timestamp = t.UnixNano() / int64(time.Millisecond)
+	} else if !t.IsZero() {
+		return PacketEncodingError{fmt.Sprintf("invalid timestamp (%v)", t)}
+	}
+
+	pe.putInt64(timestamp)
+	return nil
+}
+
+func (t Timestamp) decode(pd packetDecoder) error {
+	millis, err := pd.getInt64()
+	if err != nil {
+		return err
+	}
+
+	// negative timestamps are invalid, in these cases we should return
+	// a zero time
+	timestamp := time.Time{}
+	if millis >= 0 {
+		timestamp = time.Unix(millis/1000, (millis%1000)*int64(time.Millisecond))
+	}
+
+	*t.Time = timestamp
+	return nil
+}