浏览代码

Merge pull request #1041 from pkedy/fix_last_offset_delta

LastOffsetDelta calculation fix
Evan Huus 7 年之前
父节点
当前提交
ff5412241f
共有 3 个文件被更改,包括 39 次插入31 次删除
  1. 7 3
      produce_set.go
  2. 1 1
      record_batch.go
  3. 31 27
      record_test.go

+ 7 - 3
produce_set.go

@@ -122,11 +122,15 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 	for topic, partitionSet := range ps.msgs {
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {
 		for partition, set := range partitionSet {
 			if req.Version >= 3 {
 			if req.Version >= 3 {
-				for i, record := range set.recordsToSend.recordBatch.Records {
-					record.OffsetDelta = int64(i)
+				rb := set.recordsToSend.recordBatch
+				if len(rb.Records) > 0 {
+					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
+					for i, record := range rb.Records {
+						record.OffsetDelta = int64(i)
+					}
 				}
 				}
 
 
-				req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
+				req.AddBatch(topic, partition, rb)
 				continue
 				continue
 			}
 			}
 			if ps.parent.conf.Producer.Compression == CompressionNone {
 			if ps.parent.conf.Producer.Compression == CompressionNone {

+ 1 - 1
record_batch.go

@@ -64,7 +64,7 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
 	pe.putInt8(b.Version)
 	pe.putInt8(b.Version)
 	pe.push(newCRC32Field(crcCastagnoli))
 	pe.push(newCRC32Field(crcCastagnoli))
 	pe.putInt16(b.computeAttributes())
 	pe.putInt16(b.computeAttributes())
-	pe.putInt32(int32(len(b.Records)))
+	pe.putInt32(b.LastOffsetDelta)
 
 
 	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
 	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
 		return err
 		return err

+ 31 - 27
record_test.go

@@ -69,9 +69,10 @@ var recordBatchTestCases = []struct {
 	{
 	{
 		name: "uncompressed record",
 		name: "uncompressed record",
 		batch: RecordBatch{
 		batch: RecordBatch{
-			Version:        2,
-			FirstTimestamp: time.Unix(1479847795, 0),
-			MaxTimestamp:   time.Unix(0, 0),
+			Version:         2,
+			FirstTimestamp:  time.Unix(1479847795, 0),
+			MaxTimestamp:    time.Unix(0, 0),
+			LastOffsetDelta: 0,
 			Records: []*Record{{
 			Records: []*Record{{
 				TimestampDelta: 5 * time.Millisecond,
 				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Key:            []byte{1, 2, 3, 4},
@@ -87,10 +88,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 70, // Length
 			0, 0, 0, 70, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,               // Version
-			202, 51, 188, 5, // CRC
+			2,                // Version
+			84, 121, 97, 253, // CRC
 			0, 0, // Attributes
 			0, 0, // Attributes
-			0, 0, 0, 1, // Last Offset Delta
+			0, 0, 0, 0, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
@@ -115,10 +116,11 @@ var recordBatchTestCases = []struct {
 	{
 	{
 		name: "gzipped record",
 		name: "gzipped record",
 		batch: RecordBatch{
 		batch: RecordBatch{
-			Version:        2,
-			Codec:          CompressionGZIP,
-			FirstTimestamp: time.Unix(1479847795, 0),
-			MaxTimestamp:   time.Unix(0, 0),
+			Version:         2,
+			Codec:           CompressionGZIP,
+			FirstTimestamp:  time.Unix(1479847795, 0),
+			MaxTimestamp:    time.Unix(0, 0),
+			LastOffsetDelta: 0,
 			Records: []*Record{{
 			Records: []*Record{{
 				TimestampDelta: 5 * time.Millisecond,
 				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Key:            []byte{1, 2, 3, 4},
@@ -134,10 +136,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 94, // Length
 			0, 0, 0, 94, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                 // Version
-			151, 214, 216, 81, // CRC
+			2,                  // Version
+			159, 236, 182, 189, // CRC
 			0, 1, // Attributes
 			0, 1, // Attributes
-			0, 0, 0, 1, // Last Offset Delta
+			0, 0, 0, 0, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
@@ -168,10 +170,11 @@ var recordBatchTestCases = []struct {
 	{
 	{
 		name: "snappy compressed record",
 		name: "snappy compressed record",
 		batch: RecordBatch{
 		batch: RecordBatch{
-			Version:        2,
-			Codec:          CompressionSnappy,
-			FirstTimestamp: time.Unix(1479847795, 0),
-			MaxTimestamp:   time.Unix(0, 0),
+			Version:         2,
+			Codec:           CompressionSnappy,
+			FirstTimestamp:  time.Unix(1479847795, 0),
+			MaxTimestamp:    time.Unix(0, 0),
+			LastOffsetDelta: 0,
 			Records: []*Record{{
 			Records: []*Record{{
 				TimestampDelta: 5 * time.Millisecond,
 				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Key:            []byte{1, 2, 3, 4},
@@ -187,10 +190,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 72, // Length
 			0, 0, 0, 72, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                 // Version
-			160, 117, 65, 149, // CRC
+			2,              // Version
+			21, 0, 159, 97, // CRC
 			0, 2, // Attributes
 			0, 2, // Attributes
-			0, 0, 0, 1, // Last Offset Delta
+			0, 0, 0, 0, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
@@ -203,10 +206,11 @@ var recordBatchTestCases = []struct {
 	{
 	{
 		name: "lz4 compressed record",
 		name: "lz4 compressed record",
 		batch: RecordBatch{
 		batch: RecordBatch{
-			Version:        2,
-			Codec:          CompressionLZ4,
-			FirstTimestamp: time.Unix(1479847795, 0),
-			MaxTimestamp:   time.Unix(0, 0),
+			Version:         2,
+			Codec:           CompressionLZ4,
+			FirstTimestamp:  time.Unix(1479847795, 0),
+			MaxTimestamp:    time.Unix(0, 0),
+			LastOffsetDelta: 0,
 			Records: []*Record{{
 			Records: []*Record{{
 				TimestampDelta: 5 * time.Millisecond,
 				TimestampDelta: 5 * time.Millisecond,
 				Key:            []byte{1, 2, 3, 4},
 				Key:            []byte{1, 2, 3, 4},
@@ -222,10 +226,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 89, // Length
 			0, 0, 0, 89, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                // Version
-			223, 53, 65, 233, // CRC
+			2,                 // Version
+			169, 74, 119, 197, // CRC
 			0, 3, // Attributes
 			0, 3, // Attributes
-			0, 0, 0, 1, // Last Offset Delta
+			0, 0, 0, 0, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID