Browse Source

Correctly encode RecordBatch.LastOffsetDelta

Existing code always encodes zero. I'm not sure what the implications are,
since consumers are working just fine, but `kafka.tools.DumpLogSegments`
reports invalid data where all record batches have the same one record size.
Ivan Babrou 8 years ago
parent
commit
939043e02e
3 changed files with 19 additions and 18 deletions
  1. 6 5
      produce_request_test.go
  2. 1 1
      record_batch.go
  3. 12 12
      record_test.go

+ 6 - 5
produce_request_test.go

@@ -48,9 +48,9 @@ var (
 		0x00, 0x00, 0x00, 0x46,
 		0x00, 0x00, 0x00, 0x00,
 		0x02,
-		0x54, 0x79, 0x61, 0xFD,
+		0xCA, 0x33, 0xBC, 0x05,
 		0x00, 0x00,
-		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
@@ -83,9 +83,10 @@ func TestProduceRequest(t *testing.T) {
 
 	request.Version = 3
 	batch := &RecordBatch{
-		Version:        2,
-		FirstTimestamp: time.Unix(1479847795, 0),
-		MaxTimestamp:   time.Unix(0, 0),
+		LastOffsetDelta: 1,
+		Version:         2,
+		FirstTimestamp:  time.Unix(1479847795, 0),
+		MaxTimestamp:    time.Unix(0, 0),
 		Records: []*Record{{
 			TimestampDelta: 5 * time.Millisecond,
 			Key:            []byte{0x01, 0x02, 0x03, 0x04},

+ 1 - 1
record_batch.go

@@ -64,7 +64,7 @@ func (b *RecordBatch) encode(pe packetEncoder) error {
 	pe.putInt8(b.Version)
 	pe.push(newCRC32Field(crcCastagnoli))
 	pe.putInt16(b.computeAttributes())
-	pe.putInt32(b.LastOffsetDelta)
+	pe.putInt32(int32(len(b.Records)))
 
 	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
 		return err

+ 12 - 12
record_test.go

@@ -87,10 +87,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 70, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                // Version
-			84, 121, 97, 253, // CRC
+			2,               // Version
+			202, 51, 188, 5, // CRC
 			0, 0, // Attributes
-			0, 0, 0, 0, // Last Offset Delta
+			0, 0, 0, 1, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
@@ -134,10 +134,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 94, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                  // Version
-			159, 236, 182, 189, // CRC
+			2,                 // Version
+			151, 214, 216, 81, // CRC
 			0, 1, // Attributes
-			0, 0, 0, 0, // Last Offset Delta
+			0, 0, 0, 1, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
@@ -187,10 +187,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 72, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,              // Version
-			21, 0, 159, 97, // CRC
+			2,                 // Version
+			160, 117, 65, 149, // CRC
 			0, 2, // Attributes
-			0, 0, 0, 0, // Last Offset Delta
+			0, 0, 0, 1, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
@@ -222,10 +222,10 @@ var recordBatchTestCases = []struct {
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
 			0, 0, 0, 89, // Length
 			0, 0, 0, 0, // Partition Leader Epoch
-			2,                 // Version
-			169, 74, 119, 197, // CRC
+			2,                // Version
+			223, 53, 65, 233, // CRC
 			0, 3, // Attributes
-			0, 0, 0, 0, // Last Offset Delta
+			0, 0, 0, 1, // Last Offset Delta
 			0, 0, 1, 88, 141, 205, 89, 56, // First Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
 			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID