浏览代码

Produce records with consistent timestamps (#1455)

It is possible for the same record to have a different timestamp
depending on where it appears in a produceSet, as the test case in
this commit illustrates.

The problem is that the produceSet's FirstTimestamp and the
record's TimestampDelta are each calculated with nanosecond
precision, and then truncated to millisecond precision during
encoding. This leads to accumulated rounding error when the
original timestamp is later reconstructed.

Instead, truncate all timestamps to millisecond precision before
calculating the FirstTimestamp and TimestampDelta, so that if the
same record is produced multiple times, it will always have the
same timestamp.
Steven McDonald 6 年之前
父节点
当前提交
d514254537
共有 2 个文件被更改,包括 61 次插入和 3 次删除
  1. 2 1
      produce_set.go
  2. 59 2
      produce_set_test.go

+ 2 - 1
produce_set.go

@@ -44,9 +44,10 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	}
 	}
 
 
 	timestamp := msg.Timestamp
 	timestamp := msg.Timestamp
-	if msg.Timestamp.IsZero() {
+	if timestamp.IsZero() {
 		timestamp = time.Now()
 		timestamp = time.Now()
 	}
 	}
+	timestamp = timestamp.Truncate(time.Millisecond)
 
 
 	partitions := ps.msgs[msg.Topic]
 	partitions := ps.msgs[msg.Topic]
 	if partitions == nil {
 	if partitions == nil {

+ 59 - 2
produce_set_test.go

@@ -255,7 +255,7 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 	}
 	}
 
 
 	batch := req.records["t1"][0].RecordBatch
 	batch := req.records["t1"][0].RecordBatch
-	if batch.FirstTimestamp != now {
+	if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 	}
 	}
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
@@ -334,7 +334,7 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
 	}
 	}
 
 
 	batch := req.records["t1"][0].RecordBatch
 	batch := req.records["t1"][0].RecordBatch
-	if batch.FirstTimestamp != now {
+	if batch.FirstTimestamp != now.Truncate(time.Millisecond) {
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
 	}
 	}
 	if batch.ProducerID != pID {
 	if batch.ProducerID != pID {
@@ -368,3 +368,60 @@ func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
 		}
 		}
 	}
 	}
 }
 }
+
+// TestProduceSetConsistentTimestamps checks that the same message (msg3) is
+// given the same millisecond timestamp regardless of which message precedes it
+// in the produce set. The timestamp is reconstructed from the batch's
+// FirstTimestamp plus the record's TimestampDelta, both reduced to
+// milliseconds, and compared across two independently built requests.
+func TestProduceSetConsistentTimestamps(t *testing.T) {
+	// Two produce sets sharing the same parent, and therefore the same config.
+	parent, ps1 := makeProduceSet()
+	ps2 := newProduceSet(parent)
+	parent.conf.Producer.RequiredAcks = WaitForAll
+	parent.conf.Producer.Timeout = 10 * time.Second
+	parent.conf.Version = V0_11_0_0
+
+	// msg1 sits exactly on a millisecond boundary (…400.500 s).
+	msg1 := &ProducerMessage{
+		Topic:          "t1",
+		Partition:      0,
+		Key:            StringEncoder(TestMessage),
+		Value:          StringEncoder(TestMessage),
+		Timestamp:      time.Unix(1555718400, 500000000),
+		sequenceNumber: 123,
+	}
+	// msg2 is 0.9 ms later than msg1: same millisecond, different sub-millisecond part.
+	msg2 := &ProducerMessage{
+		Topic:          "t1",
+		Partition:      0,
+		Key:            StringEncoder(TestMessage),
+		Value:          StringEncoder(TestMessage),
+		Timestamp:      time.Unix(1555718400, 500900000),
+		sequenceNumber: 123,
+	}
+	// msg3 is the message under test; it is added second to both produce sets.
+	msg3 := &ProducerMessage{
+		Topic:          "t1",
+		Partition:      0,
+		Key:            StringEncoder(TestMessage),
+		Value:          StringEncoder(TestMessage),
+		Timestamp:      time.Unix(1555718400, 600000000),
+		sequenceNumber: 123,
+	}
+
+	// First set: msg1 followed by msg3.
+	safeAddMessage(t, ps1, msg1)
+	safeAddMessage(t, ps1, msg3)
+	req1 := ps1.buildRequest()
+	if req1.Version != 3 {
+		t.Error("Wrong request version")
+	}
+	batch1 := req1.records["t1"][0].RecordBatch
+	// Reduce FirstTimestamp to whole milliseconds since the Unix epoch.
+	ft1 := batch1.FirstTimestamp.Unix()*1000 + int64(batch1.FirstTimestamp.Nanosecond()/1000000)
+	// Records[1] is expected to be msg3 (the second message added); add its delta in ms.
+	time1 := ft1 + int64(batch1.Records[1].TimestampDelta/time.Millisecond)
+
+	// Second set: msg2 followed by msg3, reconstructed the same way.
+	safeAddMessage(t, ps2, msg2)
+	safeAddMessage(t, ps2, msg3)
+	req2 := ps2.buildRequest()
+	if req2.Version != 3 {
+		t.Error("Wrong request version")
+	}
+	batch2 := req2.records["t1"][0].RecordBatch
+	ft2 := batch2.FirstTimestamp.Unix()*1000 + int64(batch2.FirstTimestamp.Nanosecond()/1000000)
+	time2 := ft2 + int64(batch2.Records[1].TimestampDelta/time.Millisecond)
+
+	// msg3 must reconstruct to the same millisecond value in both requests.
+	if time1 != time2 {
+		t.Errorf("Message timestamps do not match: %v, %v", time1, time2)
+	}
+}