Browse Source

Set Record.OffsetDelta to avoid broker recompression

This mimicks #1002, but for v3 requests. On Kafka 1.0.0
this prevents recompress on broker to add missing offsets.
Ivan Babrou 7 years ago
parent
commit
6899ef50de
2 changed files with 9 additions and 1 deletions
  1. 4 0
      produce_set.go
  2. 5 1
      produce_set_test.go

+ 4 - 0
produce_set.go

@@ -122,6 +122,10 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {
 			if req.Version >= 3 {
+				for i, record := range set.recordsToSend.recordBatch.Records {
+					record.OffsetDelta = int64(i)
+				}
+
 				req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
 				continue
 			}

+ 5 - 1
produce_set_test.go

@@ -179,7 +179,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
 				t.Error("Wrong compressed message version")
 			}
 			if compMsgBlock.Offset != int64(i) {
-				t.Error("Wrong relative inner offset")
+				t.Errorf("Wrong relative inner offset, expected %d, got %d", i, compMsgBlock.Offset)
 			}
 		}
 		if msg.Version != 1 {
@@ -237,6 +237,10 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 			t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
 		}
 
+		if rec.OffsetDelta != int64(i) {
+			t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
+		}
+
 		for j, h := range batch.Records[i].Headers {
 			exp := fmt.Sprintf("header-%d", j+1)
 			if string(h.Key) != exp {