
Merge pull request #976 from wladh/producer

Add producer support for Kafka 0.11 records
Evan Huus 8 years ago
parent commit b305748200
15 changed files with 324 additions and 80 deletions
  1. async_producer.go (+20 -3)
  2. mockresponses.go (+1 -1)
  3. packet_decoder.go (+1 -0)
  4. packet_encoder.go (+1 -0)
  5. prep_encoder.go (+8 -0)
  6. produce_request.go (+100 -52)
  7. produce_request_test.go (+56 -0)
  8. produce_response.go (+2 -0)
  9. produce_set.go (+68 -20)
  10. produce_set_test.go (+39 -2)
  11. real_decoder.go (+9 -0)
  12. real_encoder.go (+8 -0)
  13. record.go (+6 -2)
  14. record_batch.go (+1 -0)
  15. record_test.go (+4 -0)

+ 20 - 3
async_producer.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"encoding/binary"
 	"fmt"
 	"sync"
 	"time"
@@ -119,6 +120,10 @@ type ProducerMessage struct {
 	// StringEncoder and ByteEncoder.
 	Value Encoder
 
+	// The headers are key-value pairs that are transparently passed
+	// by Kafka between producers and consumers.
+	Headers []RecordHeader
+
 	// This field is used to hold arbitrary data you wish to include so it
 	// will be available when receiving on the Successes and Errors channels.
 	// Sarama completely ignores this field and is only to be used for
@@ -146,8 +151,16 @@ type ProducerMessage struct {
 
 const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
 
-func (m *ProducerMessage) byteSize() int {
-	size := producerMessageOverhead
+func (m *ProducerMessage) byteSize(version int) int {
+	var size int
+	if version >= 2 {
+		size = maximumRecordOverhead
+		for _, h := range m.Headers {
+			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
+		}
+	} else {
+		size = producerMessageOverhead
+	}
 	if m.Key != nil {
 		size += m.Key.Length()
 	}
@@ -254,7 +267,11 @@ func (p *asyncProducer) dispatcher() {
 			p.inFlight.Add(1)
 		}
 
-		if msg.byteSize() > p.conf.Producer.MaxMessageBytes {
+		version := 1
+		if p.conf.Version.IsAtLeast(V0_11_0_0) {
+			version = 2
+		}
+		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
 			p.returnError(msg, ErrMessageSizeTooLarge)
 			continue
 		}
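
With these changes in place, a producer configured for Kafka 0.11 can attach record headers to outgoing messages. A minimal sketch of what that looks like from the caller's side (broker address, topic name, and header contents are made up for illustration):

	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0 // needed so messages go out as v2 record batches

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "my-topic",
		Value: sarama.StringEncoder("hello"),
		Headers: []sarama.RecordHeader{
			{Key: []byte("trace-id"), Value: []byte("abc123")},
		},
	}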

+ 1 - 1
mockresponses.go

@@ -408,7 +408,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
 func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*ProduceRequest)
 	res := &ProduceResponse{}
-	for topic, partitions := range req.msgSets {
+	for topic, partitions := range req.records {
 		for partition := range partitions {
 			res.AddTopicPartition(topic, partition, mr.getError(topic, partition))
 		}

+ 1 - 0
packet_decoder.go

@@ -17,6 +17,7 @@ type packetDecoder interface {
 	getVarintBytes() ([]byte, error)
 	getRawBytes(length int) ([]byte, error)
 	getString() (string, error)
+	getNullableString() (*string, error)
 	getInt32Array() ([]int32, error)
 	getInt64Array() ([]int64, error)
 	getStringArray() ([]string, error)

+ 1 - 0
packet_encoder.go

@@ -19,6 +19,7 @@ type packetEncoder interface {
 	putVarintBytes(in []byte) error
 	putRawBytes(in []byte) error
 	putString(in string) error
+	putNullableString(in *string) error
 	putStringArray(in []string) error
 	putInt32Array(in []int32) error
 	putInt64Array(in []int64) error

+ 8 - 0
prep_encoder.go

@@ -71,6 +71,14 @@ func (pe *prepEncoder) putRawBytes(in []byte) error {
 	return nil
 }
 
+func (pe *prepEncoder) putNullableString(in *string) error {
+	if in == nil {
+		pe.length += 2
+		return nil
+	}
+	return pe.putString(*in)
+}
+
 func (pe *prepEncoder) putString(in string) error {
 	pe.length += 2
 	if len(in) > math.MaxInt16 {

+ 100 - 52
produce_request.go

@@ -21,19 +21,56 @@ const (
 )
 
 type ProduceRequest struct {
-	RequiredAcks RequiredAcks
-	Timeout      int32
-	Version      int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10
-	msgSets      map[string]map[int32]*MessageSet
+	TransactionalID *string
+	RequiredAcks    RequiredAcks
+	Timeout         int32
+	Version         int16 // v1 requires Kafka 0.9, v2 requires Kafka 0.10, v3 requires Kafka 0.11
+	records         map[string]map[int32]Records
+}
+
+func updateMsgSetMetrics(msgSet *MessageSet, compressionRatioMetric metrics.Histogram,
+	topicCompressionRatioMetric metrics.Histogram) int64 {
+	var topicRecordCount int64
+	for _, messageBlock := range msgSet.Messages {
+		// Is this a fake "message" wrapping real messages?
+		if messageBlock.Msg.Set != nil {
+			topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
+		} else {
+			// A single uncompressed message
+			topicRecordCount++
+		}
+		// Better be safe than sorry when computing the compression ratio
+		if messageBlock.Msg.compressedSize != 0 {
+			compressionRatio := float64(len(messageBlock.Msg.Value)) /
+				float64(messageBlock.Msg.compressedSize)
+			// Histograms do not support decimal values, so multiply by 100 for better precision
+			intCompressionRatio := int64(100 * compressionRatio)
+			compressionRatioMetric.Update(intCompressionRatio)
+			topicCompressionRatioMetric.Update(intCompressionRatio)
+		}
+	}
+	return topicRecordCount
+}
+
+func updateBatchMetrics(recordBatch *RecordBatch, compressionRatioMetric metrics.Histogram,
+	topicCompressionRatioMetric metrics.Histogram) int64 {
+	if recordBatch.compressedRecords != nil {
+		compressionRatio := int64(float64(recordBatch.recordsLen) / float64(len(recordBatch.compressedRecords)) * 100)
+		compressionRatioMetric.Update(compressionRatio)
+		topicCompressionRatioMetric.Update(compressionRatio)
+	}
+
+	return int64(len(recordBatch.Records))
 }
 
 func (r *ProduceRequest) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		if err := pe.putNullableString(r.TransactionalID); err != nil {
+			return err
+		}
+	}
 	pe.putInt16(int16(r.RequiredAcks))
 	pe.putInt32(r.Timeout)
-	err := pe.putArrayLength(len(r.msgSets))
-	if err != nil {
-		return err
-	}
 	metricRegistry := pe.metricRegistry()
 	var batchSizeMetric metrics.Histogram
 	var compressionRatioMetric metrics.Histogram
@@ -41,9 +78,14 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 		batchSizeMetric = getOrRegisterHistogram("batch-size", metricRegistry)
 		compressionRatioMetric = getOrRegisterHistogram("compression-ratio", metricRegistry)
 	}
-
 	totalRecordCount := int64(0)
-	for topic, partitions := range r.msgSets {
+
+	err := pe.putArrayLength(len(r.records))
+	if err != nil {
+		return err
+	}
+
+	for topic, partitions := range r.records {
 		err = pe.putString(topic)
 		if err != nil {
 			return err
@@ -57,11 +99,11 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 		if metricRegistry != nil {
 			topicCompressionRatioMetric = getOrRegisterTopicHistogram("compression-ratio", topic, metricRegistry)
 		}
-		for id, msgSet := range partitions {
+		for id, records := range partitions {
 			startOffset := pe.offset()
 			pe.putInt32(id)
 			pe.push(&lengthField{})
-			err = msgSet.encode(pe)
+			err = records.encode(pe)
 			if err != nil {
 				return err
 			}
@@ -70,23 +112,10 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 				return err
 			}
 			if metricRegistry != nil {
-				for _, messageBlock := range msgSet.Messages {
-					// Is this a fake "message" wrapping real messages?
-					if messageBlock.Msg.Set != nil {
-						topicRecordCount += int64(len(messageBlock.Msg.Set.Messages))
-					} else {
-						// A single uncompressed message
-						topicRecordCount++
-					}
-					// Better be safe than sorry when computing the compression ratio
-					if messageBlock.Msg.compressedSize != 0 {
-						compressionRatio := float64(len(messageBlock.Msg.Value)) /
-							float64(messageBlock.Msg.compressedSize)
-						// Histogram do not support decimal values, let's multiple it by 100 for better precision
-						intCompressionRatio := int64(100 * compressionRatio)
-						compressionRatioMetric.Update(intCompressionRatio)
-						topicCompressionRatioMetric.Update(intCompressionRatio)
-					}
+				if r.Version >= 3 {
+					topicRecordCount += updateBatchMetrics(records.recordBatch, compressionRatioMetric, topicCompressionRatioMetric)
+				} else {
+					topicRecordCount += updateMsgSetMetrics(records.msgSet, compressionRatioMetric, topicCompressionRatioMetric)
 				}
 				batchSize := int64(pe.offset() - startOffset)
 				batchSizeMetric.Update(batchSize)
@@ -108,6 +137,15 @@ func (r *ProduceRequest) encode(pe packetEncoder) error {
 }
 
 func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
+	r.Version = version
+
+	if version >= 3 {
+		id, err := pd.getNullableString()
+		if err != nil {
+			return err
+		}
+		r.TransactionalID = id
+	}
 	requiredAcks, err := pd.getInt16()
 	if err != nil {
 		return err
@@ -123,7 +161,8 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
 	if topicCount == 0 {
 		return nil
 	}
-	r.msgSets = make(map[string]map[int32]*MessageSet)
+
+	r.records = make(map[string]map[int32]Records)
 	for i := 0; i < topicCount; i++ {
 		topic, err := pd.getString()
 		if err != nil {
@@ -133,28 +172,34 @@ func (r *ProduceRequest) decode(pd packetDecoder, version int16) error {
 		if err != nil {
 			return err
 		}
-		r.msgSets[topic] = make(map[int32]*MessageSet)
+		r.records[topic] = make(map[int32]Records)
+
 		for j := 0; j < partitionCount; j++ {
 			partition, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
-			messageSetSize, err := pd.getInt32()
+			size, err := pd.getInt32()
 			if err != nil {
 				return err
 			}
-			msgSetDecoder, err := pd.getSubset(int(messageSetSize))
+			recordsDecoder, err := pd.getSubset(int(size))
 			if err != nil {
 				return err
 			}
-			msgSet := &MessageSet{}
-			err = msgSet.decode(msgSetDecoder)
-			if err != nil {
+			var records Records
+			if version >= 3 {
+				records = newDefaultRecords(nil)
+			} else {
+				records = newLegacyRecords(nil)
+			}
+			if err := records.decode(recordsDecoder); err != nil {
 				return err
 			}
-			r.msgSets[topic][partition] = msgSet
+			r.records[topic][partition] = records
 		}
 	}
+
 	return nil
 }
 
@@ -172,38 +217,41 @@ func (r *ProduceRequest) requiredVersion() KafkaVersion {
 		return V0_9_0_0
 	case 2:
 		return V0_10_0_0
+	case 3:
+		return V0_11_0_0
 	default:
 		return minVersion
 	}
 }
 
-func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
-	if r.msgSets == nil {
-		r.msgSets = make(map[string]map[int32]*MessageSet)
+func (r *ProduceRequest) ensureRecords(topic string, partition int32) {
+	if r.records == nil {
+		r.records = make(map[string]map[int32]Records)
 	}
 
-	if r.msgSets[topic] == nil {
-		r.msgSets[topic] = make(map[int32]*MessageSet)
+	if r.records[topic] == nil {
+		r.records[topic] = make(map[int32]Records)
 	}
+}
 
-	set := r.msgSets[topic][partition]
+func (r *ProduceRequest) AddMessage(topic string, partition int32, msg *Message) {
+	r.ensureRecords(topic, partition)
+	set := r.records[topic][partition].msgSet
 
 	if set == nil {
 		set = new(MessageSet)
-		r.msgSets[topic][partition] = set
+		r.records[topic][partition] = newLegacyRecords(set)
 	}
 
 	set.addMessage(msg)
 }
 
 func (r *ProduceRequest) AddSet(topic string, partition int32, set *MessageSet) {
-	if r.msgSets == nil {
-		r.msgSets = make(map[string]map[int32]*MessageSet)
-	}
-
-	if r.msgSets[topic] == nil {
-		r.msgSets[topic] = make(map[int32]*MessageSet)
-	}
+	r.ensureRecords(topic, partition)
+	r.records[topic][partition] = newLegacyRecords(set)
+}
 
-	r.msgSets[topic][partition] = set
+func (r *ProduceRequest) AddBatch(topic string, partition int32, batch *RecordBatch) {
+	r.ensureRecords(topic, partition)
+	r.records[topic][partition] = newDefaultRecords(batch)
 }
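
One note on the metric helpers above: the compression ratio is scaled by 100 and truncated to an integer because the histograms only accept integer samples. With illustrative numbers, 1,000 bytes of uncompressed records that compress down to 400 bytes would be recorded as 250 in both the global and per-topic compression-ratio histograms.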

+ 56 - 0
produce_request_test.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"testing"
+	"time"
 )
 
 var (
@@ -32,6 +33,41 @@ var (
 		0x00,
 		0xFF, 0xFF, 0xFF, 0xFF,
 		0x00, 0x00, 0x00, 0x02, 0x00, 0xEE}
+
+	produceRequestOneRecord = []byte{
+		0xFF, 0xFF, // Transaction ID
+		0x01, 0x23, // Required Acks
+		0x00, 0x00, 0x04, 0x44, // Timeout
+		0x00, 0x00, 0x00, 0x01, // Number of Topics
+		0x00, 0x05, 't', 'o', 'p', 'i', 'c', // Topic
+		0x00, 0x00, 0x00, 0x01, // Number of Partitions
+		0x00, 0x00, 0x00, 0xAD, // Partition
+		0x00, 0x00, 0x00, 0x52, // Records length
+		// recordBatch
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x46,
+		0x00, 0x00, 0x00, 0x00,
+		0x02,
+		0x54, 0x79, 0x61, 0xFD,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x01, 0x58, 0x8D, 0xCD, 0x59, 0x38,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x01,
+		// record
+		0x28,
+		0x00,
+		0x0A,
+		0x00,
+		0x08, 0x01, 0x02, 0x03, 0x04,
+		0x06, 0x05, 0x06, 0x07,
+		0x02,
+		0x06, 0x08, 0x09, 0x0A,
+		0x04, 0x0B, 0x0C,
+	}
 )
 
 func TestProduceRequest(t *testing.T) {
@@ -44,4 +80,24 @@ func TestProduceRequest(t *testing.T) {
 
 	request.AddMessage("topic", 0xAD, &Message{Codec: CompressionNone, Key: nil, Value: []byte{0x00, 0xEE}})
 	testRequest(t, "one message", request, produceRequestOneMessage)
+
+	request.Version = 3
+	batch := &RecordBatch{
+		Version:        2,
+		FirstTimestamp: time.Unix(1479847795, 0),
+		MaxTimestamp:   time.Unix(0, 0),
+		Records: []*Record{{
+			TimestampDelta: 5 * time.Millisecond,
+			Key:            []byte{0x01, 0x02, 0x03, 0x04},
+			Value:          []byte{0x05, 0x06, 0x07},
+			Headers: []*RecordHeader{{
+				Key:   []byte{0x08, 0x09, 0x0A},
+				Value: []byte{0x0B, 0x0C},
+			}},
+		}},
+	}
+	request.AddBatch("topic", 0xAD, batch)
+	packet := testRequestEncode(t, "one record", request, produceRequestOneRecord)
+	batch.Records[0].length.startOffset = 0
+	testRequestDecode(t, "one record", request, packet)
 }

+ 2 - 0
produce_response.go

@@ -149,6 +149,8 @@ func (r *ProduceResponse) requiredVersion() KafkaVersion {
 		return V0_9_0_0
 	case 2:
 		return V0_10_0_0
+	case 3:
+		return V0_11_0_0
 	default:
 		return minVersion
 	}

+ 68 - 20
produce_set.go

@@ -1,11 +1,14 @@
 package sarama
 
-import "time"
+import (
+	"encoding/binary"
+	"time"
+)
 
 type partitionSet struct {
-	msgs        []*ProducerMessage
-	setToSend   *MessageSet
-	bufferBytes int
+	msgs          []*ProducerMessage
+	recordsToSend Records
+	bufferBytes   int
 }
 
 type produceSet struct {
@@ -39,31 +42,64 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		}
 	}
 
+	timestamp := msg.Timestamp
+	if msg.Timestamp.IsZero() {
+		timestamp = time.Now()
+	}
+
 	partitions := ps.msgs[msg.Topic]
 	if partitions == nil {
 		partitions = make(map[int32]*partitionSet)
 		ps.msgs[msg.Topic] = partitions
 	}
 
+	var size int
+
 	set := partitions[msg.Partition]
 	if set == nil {
-		set = &partitionSet{setToSend: new(MessageSet)}
+		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+			batch := &RecordBatch{
+				FirstTimestamp: timestamp,
+				Version:        2,
+				ProducerID:     -1, /* No producer id */
+				Codec:          ps.parent.conf.Producer.Compression,
+			}
+			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
+			size = recordBatchOverhead
+		} else {
+			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
+		}
 		partitions[msg.Partition] = set
 	}
 
 	set.msgs = append(set.msgs, msg)
-	msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
-	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
-		if msg.Timestamp.IsZero() {
-			msgToSend.Timestamp = time.Now()
-		} else {
-			msgToSend.Timestamp = msg.Timestamp
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		// We are being conservative here to avoid having to prep encode the record
+		size += maximumRecordOverhead
+		rec := &Record{
+			Key:            key,
+			Value:          val,
+			TimestampDelta: timestamp.Sub(set.recordsToSend.recordBatch.FirstTimestamp),
+		}
+		size += len(key) + len(val)
+		if len(msg.Headers) > 0 {
+			rec.Headers = make([]*RecordHeader, len(msg.Headers))
+			for i, h := range msg.Headers {
+				rec.Headers[i] = &h
+				size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
+			}
+		}
+		set.recordsToSend.recordBatch.addRecord(rec)
+	} else {
+		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
+		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
+			msgToSend.Timestamp = timestamp
+			msgToSend.Version = 1
 		}
-		msgToSend.Version = 1
+		set.recordsToSend.msgSet.addMessage(msgToSend)
+		size = producerMessageOverhead + len(key) + len(val)
 	}
-	set.setToSend.addMessage(msgToSend)
 
-	size := producerMessageOverhead + len(key) + len(val)
 	set.bufferBytes += size
 	ps.bufferBytes += size
 	ps.bufferCount++
@@ -79,17 +115,24 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 		req.Version = 2
 	}
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		req.Version = 3
+	}
 
 	for topic, partitionSet := range ps.msgs {
 		for partition, set := range partitionSet {
+			if req.Version >= 3 {
+				req.AddBatch(topic, partition, set.recordsToSend.recordBatch)
+				continue
+			}
 			if ps.parent.conf.Producer.Compression == CompressionNone {
-				req.AddSet(topic, partition, set.setToSend)
+				req.AddSet(topic, partition, set.recordsToSend.msgSet)
 			} else {
 				// When compression is enabled, the entire set for each partition is compressed
 				// and sent as the payload of a single fake "message" with the appropriate codec
 				// set and no key. When the server sees a message with a compression codec, it
 				// decompresses the payload and treats the result as its message set.
-				payload, err := encode(set.setToSend, ps.parent.conf.MetricRegistry)
+				payload, err := encode(set.recordsToSend.msgSet, ps.parent.conf.MetricRegistry)
 				if err != nil {
 					Logger.Println(err) // if this happens, it's basically our fault.
 					panic(err)
@@ -98,11 +141,11 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 					Codec: ps.parent.conf.Producer.Compression,
 					Key:   nil,
 					Value: payload,
-					Set:   set.setToSend, // Provide the underlying message set for accurate metrics
+					Set:   set.recordsToSend.msgSet, // Provide the underlying message set for accurate metrics
 				}
 				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
 					compMsg.Version = 1
-					compMsg.Timestamp = set.setToSend.Messages[0].Msg.Timestamp
+					compMsg.Timestamp = set.recordsToSend.msgSet.Messages[0].Msg.Timestamp
 				}
 				req.AddMessage(topic, partition, compMsg)
 			}
@@ -135,14 +178,19 @@ func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMe
 }
 
 func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
+	version := 1
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		version = 2
+	}
+
 	switch {
 	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
-	case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
+	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
 		return true
 	// Would we overflow the size-limit of a compressed message-batch for this partition?
 	case ps.parent.conf.Producer.Compression != CompressionNone &&
 		ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
-		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes:
+		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
 		return true
 	// Would we overflow simply in number of messages?
 	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
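
The size accounting for the new record format is intentionally conservative: rather than prep-encoding every record, it charges the worst-case per-record overhead plus the worst-case varint length for each header field, so wouldOverflow never under-estimates a batch. A standalone sketch of the same estimate (the function name is made up; assumes the encoding/binary and github.com/Shopify/sarama imports):

	// conservativeRecordSize mirrors the v2 branch of the producer's size
	// estimate: fixed worst-case record overhead, the raw key and value bytes,
	// and a worst-case varint length for every header key and value.
	func conservativeRecordSize(key, value []byte, headers []sarama.RecordHeader) int {
		const maxRecordOverhead = 36 // 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
		size := maxRecordOverhead + len(key) + len(value)
		for _, h := range headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
		return size
	}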

+ 39 - 2
produce_set_test.go

@@ -137,7 +137,7 @@ func TestProduceSetRequestBuilding(t *testing.T) {
 		t.Error("Timeout not set properly")
 	}
 
-	if len(req.msgSets) != 2 {
+	if len(req.records) != 2 {
 		t.Error("Wrong number of topics in request")
 	}
 }
@@ -166,7 +166,7 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
 		t.Error("Wrong request version")
 	}
 
-	for _, msgBlock := range req.msgSets["t1"][0].Messages {
+	for _, msgBlock := range req.records["t1"][0].msgSet.Messages {
 		msg := msgBlock.Msg
 		err := msg.decodeSet()
 		if err != nil {
@@ -183,3 +183,40 @@ func TestProduceSetCompressedRequestBuilding(t *testing.T) {
 		}
 	}
 }
+
+func TestProduceSetV3RequestBuilding(t *testing.T) {
+	parent, ps := makeProduceSet()
+	parent.conf.Producer.RequiredAcks = WaitForAll
+	parent.conf.Producer.Timeout = 10 * time.Second
+	parent.conf.Version = V0_11_0_0
+
+	now := time.Now()
+	msg := &ProducerMessage{
+		Topic:     "t1",
+		Partition: 0,
+		Key:       StringEncoder(TestMessage),
+		Value:     StringEncoder(TestMessage),
+		Timestamp: now,
+	}
+	for i := 0; i < 10; i++ {
+		safeAddMessage(t, ps, msg)
+		msg.Timestamp = msg.Timestamp.Add(time.Second)
+	}
+
+	req := ps.buildRequest()
+
+	if req.Version != 3 {
+		t.Error("Wrong request version")
+	}
+
+	batch := req.records["t1"][0].recordBatch
+	if batch.FirstTimestamp != now {
+		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
+	}
+	for i := 0; i < 10; i++ {
+		rec := batch.Records[i]
+		if rec.TimestampDelta != time.Duration(i)*time.Second {
+			t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
+		}
+	}
+}

+ 9 - 0
real_decoder.go

@@ -142,6 +142,15 @@ func (rd *realDecoder) getString() (string, error) {
 	return tmpStr, nil
 }
 
+func (rd *realDecoder) getNullableString() (*string, error) {
+	tmp, err := rd.getInt16()
+	if err != nil || tmp == -1 {
+		return nil, err
+	}
+	str, err := rd.getString()
+	return &str, err
+}
+
 func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)

+ 8 - 0
real_encoder.go

@@ -77,6 +77,14 @@ func (re *realEncoder) putString(in string) error {
 	return nil
 }
 
+func (re *realEncoder) putNullableString(in *string) error {
+	if in == nil {
+		re.putInt16(-1)
+		return nil
+	}
+	return re.putString(*in)
+}
+
 func (re *realEncoder) putStringArray(in []string) error {
 	err := re.putArrayLength(len(in))
 	if err != nil {
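
For reference, the wire rule these nullable-string helpers implement: a nullable string is a big-endian int16 length followed by the string bytes, and nil is written as length -1 with no payload. A tiny self-contained sketch of the encode side (the function name is made up for illustration):

	package main

	import (
		"encoding/binary"
		"fmt"
	)

	// encodeNullableString mimics putNullableString: nil becomes int16(-1),
	// otherwise a big-endian int16 length prefix followed by the string bytes.
	func encodeNullableString(s *string) []byte {
		if s == nil {
			return []byte{0xFF, 0xFF} // int16(-1), i.e. a null string
		}
		buf := make([]byte, 2+len(*s))
		binary.BigEndian.PutUint16(buf, uint16(len(*s)))
		copy(buf[2:], *s)
		return buf
	}

	func main() {
		txn := "txn-1"
		fmt.Printf("% x\n", encodeNullableString(nil))  // ff ff
		fmt.Printf("% x\n", encodeNullableString(&txn)) // 00 05 74 78 6e 2d 31
	}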

+ 6 - 2
record.go

@@ -1,9 +1,13 @@
 package sarama
 
-import "time"
+import (
+	"encoding/binary"
+	"time"
+)
 
 const (
-	controlMask = 0x20
+	controlMask           = 0x20
+	maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
 )
 
 type RecordHeader struct {
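
For context on the new constant: binary.MaxVarintLen32 is 5 and binary.MaxVarintLen64 is 10, so maximumRecordOverhead evaluates to 5*5 + 10 + 1 = 36 bytes. Reading it against the v2 record layout (this mapping is an interpretation, not stated in the diff): five varint32-sized fields (record length, offset delta, key length, value length, header count), one varint64 field (timestamp delta), and one attributes byte.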

+ 1 - 0
record_batch.go

@@ -191,6 +191,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 		return PacketDecodingError{fmt.Sprintf("invalid compression specified (%d)", b.Codec)}
 	}
 
+	b.recordsLen = len(recBuffer)
 	err = decode(recBuffer, recordsArray(b.Records))
 	if err == ErrInsufficientData {
 		b.PartialTrailingRecord = true

+ 4 - 0
record_test.go

@@ -81,6 +81,7 @@ var recordBatchTestCases = []struct {
 					Value: []byte{11, 12},
 				}},
 			}},
+			recordsLen: 21,
 		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
@@ -127,6 +128,7 @@ var recordBatchTestCases = []struct {
 					Value: []byte{11, 12},
 				}},
 			}},
+			recordsLen: 21,
 		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
@@ -179,6 +181,7 @@ var recordBatchTestCases = []struct {
 					Value: []byte{11, 12},
 				}},
 			}},
+			recordsLen: 21,
 		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset
@@ -213,6 +216,7 @@ var recordBatchTestCases = []struct {
 					Value: []byte{11, 12},
 				}},
 			}},
+			recordsLen: 21,
 		},
 		encoded: []byte{
 			0, 0, 0, 0, 0, 0, 0, 0, // First Offset