Browse Source

Add support for Kafka 0.11 RecordBatch

Kafka 0.11 introduced RecordBatch as a successor to MessageSet.
Using the new RecordBatch is required for transactions and idempotent
message delivery.
Vlad Hanciuta 8 years ago
parent
commit
7630f80a01
2 changed files with 522 additions and 0 deletions
  1. 254 0
      record_batch.go
  2. 268 0
      record_test.go

+ 254 - 0
record_batch.go

@@ -0,0 +1,254 @@
+package sarama
+
+import (
+	"bytes"
+	"compress/gzip"
+	"fmt"
+	"io/ioutil"
+
+	"github.com/eapache/go-xerial-snappy"
+	"github.com/pierrec/lz4"
+)
+
+const recordBatchOverhead = 49
+
// RecordBatch is the Kafka 0.11+ (message format v2) successor to MessageSet.
// It carries a batch header plus zero or more Records, optionally compressed
// as a single payload.
type RecordBatch struct {
	FirstOffset           int64            // offset of the first record in the batch
	PartitionLeaderEpoch  int32            // epoch of the partition leader that wrote the batch
	Version               int8             // magic byte; only version 2 is supported here
	Codec                 CompressionCodec // compression applied to the records payload
	Control               bool             // true for control batches (encoded in the attributes bits)
	LastOffsetDelta       int32            // delta from FirstOffset to the last record's offset
	FirstTimestamp        int64            // base timestamp; records store deltas from this
	MaxTimestamp          int64            // largest timestamp in the batch
	ProducerID            int64            // producer id for idempotent/transactional delivery
	ProducerEpoch         int16            // producer epoch for idempotent/transactional delivery
	FirstSequence         int32            // sequence number of the first record
	Records               []*Record        // decoded records; nil if a partial trailing record was hit
	PartialTrailingRecord bool             // set by decode when the last record was truncated mid-stream

	// compressedRecords caches the compressed payload so repeated encodes
	// don't recompress; recordsLen is the uncompressed serialized size.
	compressedRecords []byte
	recordsLen        int
}
+
+func (b *RecordBatch) encode(pe packetEncoder) error {
+	if b.Version != 2 {
+		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
+	}
+	pe.putInt64(b.FirstOffset)
+	pe.push(&lengthField{})
+	pe.putInt32(b.PartitionLeaderEpoch)
+	pe.putInt8(b.Version)
+	pe.push(newCRC32Field(crcCastagnoli))
+	pe.putInt16(b.computeAttributes())
+	pe.putInt32(b.LastOffsetDelta)
+	pe.putInt64(b.FirstTimestamp)
+	pe.putInt64(b.MaxTimestamp)
+	pe.putInt64(b.ProducerID)
+	pe.putInt16(b.ProducerEpoch)
+	pe.putInt32(b.FirstSequence)
+
+	if err := pe.putArrayLength(len(b.Records)); err != nil {
+		return err
+	}
+
+	if b.compressedRecords != nil {
+		if err := pe.putRawBytes(b.compressedRecords); err != nil {
+			return err
+		}
+		if err := pe.pop(); err != nil {
+			return err
+		}
+		if err := pe.pop(); err != nil {
+			return err
+		}
+		return nil
+	}
+
+	var re packetEncoder
+	var raw []byte
+
+	switch b.Codec {
+	case CompressionNone:
+		re = pe
+	case CompressionGZIP, CompressionLZ4, CompressionSnappy:
+		for _, r := range b.Records {
+			l, err := r.getTotalLength()
+			if err != nil {
+				return err
+			}
+			b.recordsLen += l
+		}
+
+		raw = make([]byte, b.recordsLen)
+		re = &realEncoder{raw: raw}
+	default:
+		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
+	}
+
+	for _, r := range b.Records {
+		if err := r.encode(re); err != nil {
+			return err
+		}
+	}
+
+	switch b.Codec {
+	case CompressionGZIP:
+		var buf bytes.Buffer
+		writer := gzip.NewWriter(&buf)
+		if _, err := writer.Write(raw); err != nil {
+			return err
+		}
+		if err := writer.Close(); err != nil {
+			return err
+		}
+		b.compressedRecords = buf.Bytes()
+	case CompressionSnappy:
+		b.compressedRecords = snappy.Encode(raw)
+	case CompressionLZ4:
+		var buf bytes.Buffer
+		writer := lz4.NewWriter(&buf)
+		if _, err := writer.Write(raw); err != nil {
+			return err
+		}
+		if err := writer.Close(); err != nil {
+			return err
+		}
+		b.compressedRecords = buf.Bytes()
+	}
+	if err := pe.putRawBytes(b.compressedRecords); err != nil {
+		return err
+	}
+
+	if err := pe.pop(); err != nil {
+		return err
+	}
+	if err := pe.pop(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
// decode parses a message-format v2 RecordBatch from pd: the fixed header
// first, then the (possibly compressed) records payload. A truncated trailing
// record is not an error — it sets PartialTrailingRecord and clears Records.
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	// batchLen counts everything after the length field itself; it is used
	// below to size the raw records buffer.
	var batchLen int32
	if batchLen, err = pd.getInt32(); err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	// The CRC-32C covers everything from the attributes onward; verified on pop.
	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	// Attributes pack the compression codec (low bits) and the control flag.
	var attributes int16
	if attributes, err = pd.getInt16(); err != nil {
		return err
	}
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if b.FirstTimestamp, err = pd.getInt64(); err != nil {
		return err
	}

	if b.MaxTimestamp, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	// Everything left in the batch after the fixed header is the records
	// payload; recordBatchOverhead is the header size past the length field.
	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		return err
	}

	// Pop (and verify) the CRC field now that its covered bytes are consumed.
	if err = pd.pop(); err != nil {
		return err
	}

	// Decompress the payload in place according to the codec from attributes.
	switch b.Codec {
	case CompressionNone:
	case CompressionGZIP:
		reader, err := gzip.NewReader(bytes.NewReader(recBuffer))
		if err != nil {
			return err
		}
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	case CompressionSnappy:
		if recBuffer, err = snappy.Decode(recBuffer); err != nil {
			return err
		}
	case CompressionLZ4:
		reader := lz4.NewReader(bytes.NewReader(recBuffer))
		if recBuffer, err = ioutil.ReadAll(reader); err != nil {
			return err
		}
	default:
		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
	}
	recPd := &realDecoder{raw: recBuffer}

	for i := 0; i < numRecs; i++ {
		rec := &Record{}
		if err = rec.decode(recPd); err != nil {
			// A record cut off mid-stream is expected with fetch size limits:
			// flag it and report success rather than an error.
			if err == ErrInsufficientData {
				b.PartialTrailingRecord = true
				b.Records = nil
				return nil
			}
			return err
		}
		b.Records[i] = rec
	}

	return nil
}
+
+func (b *RecordBatch) computeAttributes() int16 {
+	attr := int16(b.Codec) & int16(compressionCodecMask)
+	if b.Control {
+		attr |= controlMask
+	}
+	return attr
+}
+
+func (b *RecordBatch) addRecord(r *Record) {
+	b.Records = append(b.Records, r)
+}

+ 268 - 0
record_test.go

@@ -0,0 +1,268 @@
+package sarama
+
+import (
+	"reflect"
+	"runtime"
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/davecgh/go-spew/spew"
+)
+
// recordBatchTestCases pairs RecordBatch values with their expected v2 wire
// encoding, one case per compression codec plus empty/control batches. The
// byte slices are annotated field-by-field to match the batch header layout.
var recordBatchTestCases = []struct {
	name         string
	batch        RecordBatch
	encoded      []byte
	oldGoEncoded []byte // used in case of gzipped content for go versions prior to 1.8
}{
	{
		name:  "empty record",
		batch: RecordBatch{Version: 2, Records: []*Record{}},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                // Version
			89, 95, 183, 221, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		// Same as the empty batch but with the control bit (0x20) set in the
		// attributes, which also changes the CRC.
		name:  "control batch",
		batch: RecordBatch{Version: 2, Control: true, Records: []*Record{}},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 49, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,               // Version
			81, 46, 67, 217, // CRC
			0, 32, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 0, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 0, // Number of Records
		},
	},
	{
		name: "uncompressed record",
		batch: RecordBatch{
			Version:        2,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*Header{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 70, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                // Version
			219, 71, 20, 201, // CRC
			0, 0, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			// Record body; varint-encoded fields follow (zig-zag, hence the
			// doubled-looking values, e.g. timestamp delta 5 encodes as 10).
			40, // Record Length
			0,  // Attributes
			10, // Timestamp Delta
			0,  // Offset Delta
			8,  // Key Length
			1, 2, 3, 4,
			6, // Value Length
			5, 6, 7,
			2,        // Number of Headers
			6,        // Header Key Length
			8, 9, 10, // Header Key
			4,      // Header Value Length
			11, 12, // Header Value
		},
	},
	{
		name: "gzipped record",
		batch: RecordBatch{
			Version:        2,
			Codec:          CompressionGZIP,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*Header{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                // Version
			15, 156, 184, 78, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
		// Go < 1.8 emitted a different gzip header (OS byte / mtime), so the
		// payload and CRC differ even though the decompressed bytes match.
		oldGoEncoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 94, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,               // Version
			144, 168, 0, 33, // CRC
			0, 1, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			31, 139, 8, 0, 0, 9, 110, 136, 0, 255, 210, 96, 224, 98, 224, 96, 100, 98, 102, 97, 99, 101,
			99, 103, 98, 227, 224, 228, 98, 225, 230, 1, 4, 0, 0, 255, 255, 173, 201, 88, 103, 21, 0, 0, 0,
		},
	},
	{
		name: "snappy compressed record",
		batch: RecordBatch{
			Version:        2,
			Codec:          CompressionSnappy,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*Header{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 72, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,               // Version
			95, 173, 35, 17, // CRC
			0, 2, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			21, 80, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2, 6, 8, 9, 10, 4, 11, 12,
		},
	},
	{
		name: "lz4 compressed record",
		batch: RecordBatch{
			Version:        2,
			Codec:          CompressionLZ4,
			FirstTimestamp: 10,
			Records: []*Record{{
				TimestampDelta: 5,
				Key:            []byte{1, 2, 3, 4},
				Value:          []byte{5, 6, 7},
				Headers: []*Header{{
					Key:   []byte{8, 9, 10},
					Value: []byte{11, 12},
				}},
			}},
		},
		encoded: []byte{
			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
			0, 0, 0, 89, // Length
			0, 0, 0, 0, // Partition Leader Epoch
			2,                // Version
			129, 238, 43, 82, // CRC
			0, 3, // Attributes
			0, 0, 0, 0, // Last Offset Delta
			0, 0, 0, 0, 0, 0, 0, 10, // First Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Max Timestamp
			0, 0, 0, 0, 0, 0, 0, 0, // Producer ID
			0, 0, // Producer Epoch
			0, 0, 0, 0, // First Sequence
			0, 0, 0, 1, // Number of Records
			4, 34, 77, 24, 100, 112, 185, 21, 0, 0, 128, 40, 0, 10, 0, 8, 1, 2, 3, 4, 6, 5, 6, 7, 2,
			6, 8, 9, 10, 4, 11, 12, 0, 0, 0, 0, 12, 59, 239, 146,
		},
	},
}
+
// isOldGo reports whether the running toolchain is older than Go 1.8, where
// the gzip writer produced different header bytes. Unparseable versions are
// logged and treated as modern.
func isOldGo(t *testing.T) bool {
	version := runtime.Version()
	parts := strings.Split(version[2:], ".")
	if len(parts) < 2 {
		t.Logf("Can't parse version: %s", version)
		return false
	}
	major, majorErr := strconv.Atoi(parts[0])
	minor, minorErr := strconv.Atoi(parts[1])
	if majorErr != nil || minorErr != nil {
		t.Logf("Can't parse version: %s", version)
		return false
	}
	return major < 1 || (major == 1 && minor < 8)
}
+
+func TestRecordBatchEncoding(t *testing.T) {
+	for _, tc := range recordBatchTestCases {
+		if tc.oldGoEncoded != nil && isOldGo(t) {
+			testEncodable(t, tc.name, &tc.batch, tc.oldGoEncoded)
+		} else {
+			testEncodable(t, tc.name, &tc.batch, tc.encoded)
+		}
+	}
+}
+
+func TestRecordBatchDecoding(t *testing.T) {
+	for _, tc := range recordBatchTestCases {
+		batch := RecordBatch{}
+		testDecodable(t, tc.name, &batch, tc.encoded)
+		for _, r := range batch.Records {
+			if _, err := r.getTotalLength(); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+		}
+		for _, r := range tc.batch.Records {
+			if _, err := r.getTotalLength(); err != nil {
+				t.Fatalf("Unexpected error: %v", err)
+			}
+		}
+		if !reflect.DeepEqual(batch, tc.batch) {
+			t.Errorf(spew.Sprintf("invalid decode of %s\ngot %+v\nwanted %+v", tc.name, batch, tc.batch))
+		}
+	}
+}