
Added support for Idempotent Producer

Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Co-authored-by: Mickael Maison <mickael.maison@gmail.com>
Mickael Maison committed 6 years ago
commit 8e2b04b363
9 changed files with 444 additions and 14 deletions
  1. async_producer.go (+61 -4)
  2. async_producer_test.go (+180 -0)
  3. client.go (+23 -0)
  4. config.go (+18 -0)
  5. config_test.go (+27 -0)
  6. functional_consumer_test.go (+20 -3)
  7. produce_response.go (+7 -1)
  8. produce_set.go (+16 -5)
  9. produce_set_test.go (+92 -1)
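
For context, this is roughly how a client enables the new mode once this commit is in (a minimal sketch, not part of the diff; the broker address is a placeholder and the settings mirror the new Validate() rules in config.go below):

	config := sarama.NewConfig()
	config.Version = sarama.V0_11_0_0 // idempotence needs at least the 0.11 protocol
	config.Producer.Idempotent = true // the flag added by this commit
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Retry.Max = 5     // must be >= 1 so failed batches can be resent
	config.Net.MaxOpenRequests = 1    // must be <= 5; 1 keeps strict ordering

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer producer.Close()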

+ 61 - 4
async_producer.go

@@ -47,6 +47,50 @@ type AsyncProducer interface {
 	Errors() <-chan *ProducerError
 }
 
+// transactionManager keeps the state necessary to ensure idempotent production
+type transactionManager struct {
+	producerID      int64
+	producerEpoch   int16
+	sequenceNumbers map[string]int32
+	mutex           sync.Mutex
+}
+
+const (
+	noProducerID    = -1
+	noProducerEpoch = -1
+)
+
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
+	key := fmt.Sprintf("%s-%d", topic, partition)
+	t.mutex.Lock()
+	defer t.mutex.Unlock()
+	sequence := t.sequenceNumbers[key]
+	t.sequenceNumbers[key] = sequence + 1
+	return sequence
+}
+
+func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
+	txnmgr := &transactionManager{
+		producerID:    noProducerID,
+		producerEpoch: noProducerEpoch,
+	}
+
+	if conf.Producer.Idempotent {
+		initProducerIDResponse, err := client.InitProducerID()
+		if err != nil {
+			return nil, err
+		}
+		txnmgr.producerID = initProducerIDResponse.ProducerID
+		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
+		txnmgr.sequenceNumbers = make(map[string]int32)
+		txnmgr.mutex = sync.Mutex{}
+
+		Logger.Printf("Obtained a ProducerId: %d epoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
+	}
+
+	return txnmgr, nil
+}
+
 type asyncProducer struct {
 	client    Client
 	conf      *Config
@@ -59,6 +103,8 @@ type asyncProducer struct {
 	brokers    map[*Broker]chan<- *ProducerMessage
 	brokerRefs map[chan<- *ProducerMessage]int
 	brokerLock sync.Mutex
+
+	txnmgr *transactionManager
 }
 
 // NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
@@ -84,6 +130,11 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 		return nil, ErrClosedClient
 	}
 
+	txnmgr, err := newTransactionManager(client.Config(), client)
+	if err != nil {
+		return nil, err
+	}
+
 	p := &asyncProducer{
 		client:     client,
 		conf:       client.Config(),
@@ -93,6 +144,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 		retries:    make(chan *ProducerMessage),
 		brokers:    make(map[*Broker]chan<- *ProducerMessage),
 		brokerRefs: make(map[chan<- *ProducerMessage]int),
+		txnmgr:     txnmgr,
 	}
 
 	// launch our singleton dispatchers
@@ -145,9 +197,10 @@ type ProducerMessage struct {
 	// least version 0.10.0.
 	Timestamp time.Time
 
-	retries     int
-	flags       flagSet
-	expectation chan *ProducerError
+	retries        int
+	flags          flagSet
+	expectation    chan *ProducerError
+	sequenceNumber int32
 }
 
 const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -328,6 +381,10 @@ func (tp *topicProducer) dispatch() {
 				continue
 			}
 		}
+		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
+			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
+			//Logger.Printf("Message %s for TP %s-%d got sequence number: %d\n", msg.Value, msg.Topic, msg.Partition, msg.sequenceNumber)
+		}
 
 		handler := tp.handlers[msg.Partition]
 		if handler == nil {
@@ -752,7 +809,7 @@ func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceRespo
 			bp.parent.returnErrors(msgs, ErrIncompleteResponse)
 			return
 		}
 		switch block.Err {
 		// Success
 		case ErrNoError:
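
A note on the sequence numbering above: the transactionManager hands out a monotonically increasing sequence per topic-partition, and the msg.retries == 0 guard in dispatch() means a message is numbered exactly once, so retried messages keep their original numbers and the broker can deduplicate them. A standalone sketch of that behavior (hypothetical values, not part of the diff):

	txnmgr := &transactionManager{sequenceNumbers: make(map[string]int32)}
	txnmgr.getAndIncrementSequenceNumber("my_topic", 0) // returns 0
	txnmgr.getAndIncrementSequenceNumber("my_topic", 0) // returns 1
	txnmgr.getAndIncrementSequenceNumber("my_topic", 1) // returns 0: each partition counts independently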

+ 180 - 0
async_producer_test.go

@@ -2,9 +2,11 @@ package sarama
 
 import (
 	"errors"
+	"fmt"
 	"log"
 	"os"
 	"os/signal"
+	"strconv"
 	"sync"
 	"testing"
 	"time"
@@ -753,6 +755,184 @@ func TestAsyncProducerNoReturns(t *testing.T) {
 	leader.Close()
 }
 
+func TestAsyncProducerIdempotent(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+
+	clusterID := "cid"
+	metadataResponse := &MetadataResponse{
+		Version:        3,
+		ThrottleTimeMs: 0,
+		ClusterID:      &clusterID,
+		ControllerID:   1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	initProducerID := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+	broker.Returns(initProducerID)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 4
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Idempotent = true
+	config.Version = V0_11_0_0
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	prodSuccess := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	broker.Returns(prodSuccess)
+	expectResults(t, producer, 10, 0)
+
+	broker.Close()
+	closeProducer(t, producer)
+}
+
+func TestAsyncProducerIdempotentRetry(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+
+	clusterID := "cid"
+	metadataResponse := &MetadataResponse{
+		Version:        3,
+		ThrottleTimeMs: 0,
+		ClusterID:      &clusterID,
+		ControllerID:   1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	initProducerID := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+	broker.Returns(initProducerID)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 10
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 4
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Retry.Backoff = 0
+	config.Producer.Idempotent = true
+	config.Version = V0_11_0_0
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 10; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage)}
+	}
+
+	prodNotLeader := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
+	broker.Returns(prodNotLeader)
+
+	broker.Returns(metadataResponse)
+
+	prodSuccess := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	broker.Returns(prodSuccess)
+	expectResults(t, producer, 10, 0)
+
+	broker.Close()
+	closeProducer(t, producer)
+}
+
+func TestAsyncProducerIdempotentRetryBatch(t *testing.T) {
+	broker := NewMockBroker(t, 1)
+
+	clusterID := "cid"
+	metadataResponse := &MetadataResponse{
+		Version:        3,
+		ThrottleTimeMs: 0,
+		ClusterID:      &clusterID,
+		ControllerID:   1,
+	}
+	metadataResponse.AddBroker(broker.Addr(), broker.BrokerID())
+	metadataResponse.AddTopicPartition("my_topic", 0, broker.BrokerID(), nil, nil, ErrNoError)
+	broker.Returns(metadataResponse)
+
+	initProducerID := &InitProducerIDResponse{
+		ThrottleTime:  0,
+		ProducerID:    1000,
+		ProducerEpoch: 1,
+	}
+	broker.Returns(initProducerID)
+
+	config := NewConfig()
+	config.Producer.Flush.Messages = 3
+	config.Producer.Return.Successes = true
+	config.Producer.Retry.Max = 4
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Retry.Backoff = 100 * time.Millisecond
+	config.Producer.Idempotent = true
+	config.Version = V0_11_0_0
+	producer, err := NewAsyncProducer([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	for i := 0; i < 3; i++ {
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder(TestMessage + strconv.Itoa(i))}
+	}
+
+	// fail the first batch once so it has to be retried with its original
+	// sequence numbers while more messages keep arriving in the background
+	prodNotLeader := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodNotLeader.AddTopicPartition("my_topic", 0, ErrNotEnoughReplicas)
+	broker.Returns(prodNotLeader)
+
+	go func() {
+		for i := 0; i < 6; i++ {
+			producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: nil, Value: StringEncoder("goroutine" + strconv.Itoa(i))}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}()
+
+	// the retry triggers a metadata refresh, then the retried batch and the
+	// two batches from the goroutine above should all succeed
+	broker.Returns(metadataResponse)
+
+	prodSuccess := &ProduceResponse{
+		Version:      3,
+		ThrottleTime: 0,
+	}
+	prodSuccess.AddTopicPartition("my_topic", 0, ErrNoError)
+	broker.Returns(prodSuccess)
+	broker.Returns(prodSuccess)
+	broker.Returns(prodSuccess)
+	expectResults(t, producer, 9, 0)
+
+	broker.Close()
+	closeProducer(t, producer)
+}
+
 // This example shows how to use the producer while simultaneously
 // reading the Errors channel to know about any failures.
 func ExampleAsyncProducer_select() {

+ 23 - 0
client.go

@@ -67,6 +67,9 @@ type Client interface {
 	// in local cache. This function only works on Kafka 0.8.2 and higher.
 	RefreshCoordinator(consumerGroup string) error
 
+	// InitProducerID retrieves information required for Idempotent Producer
+	InitProducerID() (*InitProducerIDResponse, error)
+
 	// Close shuts down all broker connections managed by this client. It is required
 	// to call this function before a client object passes out of scope, as it will
 	// otherwise leak memory. You must close any Producers or Consumers using a client
@@ -183,6 +186,26 @@ func (client *client) Brokers() []*Broker {
 	return brokers
 }
 
+func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
+	var err error
+	for broker := client.any(); broker != nil; broker = client.any() {
+		req := &InitProducerIDRequest{}
+
+		var response *InitProducerIDResponse
+		// assign with = rather than := so the outer err survives the loop and
+		// the final return reports why the last broker failed
+		response, err = broker.InitProducerID(req)
+		if err == nil {
+			return response, nil
+		}
+
+		// this broker failed us: close it, drop it from the client and try the next one
+		Logger.Printf("Client got error from broker %d when issuing InitProducerID: %v\n", broker.ID(), err)
+		_ = broker.Close()
+		client.deregisterBroker(broker)
+	}
+	return nil, err
+}
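
Because InitProducerID is now on the Client interface, it can also be exercised directly; a minimal sketch (assumes a config with Version >= V0_11_0_0; the broker address is a placeholder):

	client, err := sarama.NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		panic(err)
	}
	defer client.Close()

	resp, err := client.InitProducerID()
	if err != nil {
		panic(err)
	}
	sarama.Logger.Printf("producer id: %d, epoch: %d\n", resp.ProducerID, resp.ProducerEpoch)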
+
 func (client *client) Close() error {
 	if client.Closed() {
 		// Chances are this is being called from a defer() and the error will go unobserved

+ 18 - 0
config.go

@@ -124,6 +124,9 @@ type Config struct {
 		// (defaults to hashing the message key). Similar to the `partitioner.class`
 		// setting for the JVM producer.
 		Partitioner PartitionerConstructor
+		// If enabled, the producer will ensure that exactly one copy of each message is
+		// written. It requires Version >= V0_11_0_0, Producer.RequiredAcks = WaitForAll,
+		// Producer.Retry.Max >= 1 and Net.MaxOpenRequests <= 5 (enforced by Validate).
+		Idempotent bool
 
 		// Return specifies what channels will be populated. If they are set to true,
 		// you must read from the respective channels to prevent deadlock. If,
@@ -511,6 +514,21 @@ func (c *Config) Validate() error {
 		}
 	}
 
+	if c.Producer.Idempotent {
+		if !c.Version.IsAtLeast(V0_11_0_0) {
+			return ConfigurationError("Idempotent producer requires Version >= V0_11_0_0")
+		}
+		if c.Producer.Retry.Max == 0 {
+			return ConfigurationError("Idempotent producer requires Producer.Retry.Max >= 1")
+		}
+		if c.Producer.RequiredAcks != WaitForAll {
+			return ConfigurationError("Idempotent producer requires Producer.RequiredAcks to be WaitForAll")
+		}
+		if c.Net.MaxOpenRequests > 5 {
+			return ConfigurationError("Idempotent producer requires Net.MaxOpenRequests <= 5")
+		}
+	}
+
 	// validate the Consumer values
 	switch {
 	case c.Consumer.Fetch.Min <= 0:
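
The new checks fail fast at configuration time; for example (a sketch, not part of the diff):

	cfg := sarama.NewConfig()
	cfg.Producer.Idempotent = true // cfg.Version still defaults to a pre-0.11 protocol
	err := cfg.Validate()
	// err: "Idempotent producer requires Version >= V0_11_0_0"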

+ 27 - 0
config_test.go

@@ -207,6 +207,33 @@ func TestProducerConfigValidates(t *testing.T) {
 				cfg.Producer.Retry.Backoff = -1
 			},
 			"Producer.Retry.Backoff must be >= 0"},
+		{"Idempotent Version",
+			func(cfg *Config) {
+				cfg.Producer.Idempotent = true
+				cfg.Version = V0_10_0_0
+			},
+			"Idempotent producer requires Version >= V0_11_0_0"},
+		{"Idempotent with Producer.Retry.Max",
+			func(cfg *Config) {
+				cfg.Version = V0_11_0_0
+				cfg.Producer.Idempotent = true
+				cfg.Producer.Retry.Max = 0
+			},
+			"Idempotent producer requires Producer.Retry.Max >= 1"},
+		{"Idempotent with Producer.RequiredAcks",
+			func(cfg *Config) {
+				cfg.Version = V0_11_0_0
+				cfg.Producer.Idempotent = true
+			},
+			"Idempotent producer requires Producer.RequiredAcks to be WaitForAll"},
+		{"Idempotent with Net.MaxOpenRequests",
+			func(cfg *Config) {
+				cfg.Version = V0_11_0_0
+				cfg.Producer.Idempotent = true
+				cfg.Producer.RequiredAcks = WaitForAll
+				cfg.Net.MaxOpenRequests = 6
+			},
+			"Idempotent producer requires Net.MaxOpenRequests <= 5"},
 	}
 
 	for i, test := range tests {

+ 20 - 3
functional_consumer_test.go

@@ -81,7 +81,7 @@ func TestVersionMatrix(t *testing.T) {
 	// protocol versions and compressions, except for LZ4.
 	testVersions := versionRange(V0_8_2_0)
 	allCodecsButLZ4 := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy}
-	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100)
+	producedMessages := produceMsgs(t, testVersions, allCodecsButLZ4, 17, 100, false)
 
 	// When/Then
 	consumeMsgs(t, testVersions, producedMessages)
@@ -98,7 +98,20 @@ func TestVersionMatrixLZ4(t *testing.T) {
 	// and all possible compressions.
 	testVersions := versionRange(V0_10_0_0)
 	allCodecs := []CompressionCodec{CompressionNone, CompressionGZIP, CompressionSnappy, CompressionLZ4}
-	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100)
+	producedMessages := produceMsgs(t, testVersions, allCodecs, 17, 100, false)
+
+	// When/Then
+	consumeMsgs(t, testVersions, producedMessages)
+}
+
+func TestVersionMatrixIdempotent(t *testing.T) {
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	// Produce lots of messages with all possible combinations of supported
+	// protocol versions, starting with v0.11 (the first version to support idempotence).
+	testVersions := versionRange(V0_11_0_0)
+	producedMessages := produceMsgs(t, testVersions, []CompressionCodec{CompressionNone}, 17, 100, true)
 
 	// When/Then
 	consumeMsgs(t, testVersions, producedMessages)
@@ -133,7 +146,7 @@ func versionRange(lower KafkaVersion) []KafkaVersion {
 	return versions
 }
 
-func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int) []*ProducerMessage {
+func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
 	var wg sync.WaitGroup
 	var producedMessagesMu sync.Mutex
 	var producedMessages []*ProducerMessage
@@ -145,6 +158,10 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 			prodCfg.Producer.Return.Errors = true
 			prodCfg.Producer.Flush.MaxMessages = flush
 			prodCfg.Producer.Compression = codec
+			prodCfg.Producer.Idempotent = idempotent
+			if idempotent {
+				prodCfg.Producer.RequiredAcks = WaitForAll
+			}
 
 			p, err := NewSyncProducer(kafkaBrokers, prodCfg)
 			if err != nil {

+ 7 - 1
produce_response.go

@@ -179,5 +179,11 @@ func (r *ProduceResponse) AddTopicPartition(topic string, partition int32, err K
 		byTopic = make(map[int32]*ProduceResponseBlock)
 		r.Blocks[topic] = byTopic
 	}
-	byTopic[partition] = &ProduceResponseBlock{Err: err}
+	block := &ProduceResponseBlock{
+		Err: err,
+	}
+	if r.Version >= 2 {
+		block.Timestamp = time.Now()
+	}
+	byTopic[partition] = block
 }

+ 16 - 5
produce_set.go

@@ -2,6 +2,8 @@ package sarama
 
 import (
 	"encoding/binary"
+	"errors"
+	"fmt"
 	"time"
 )
 
@@ -58,12 +60,17 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 	set := partitions[msg.Partition]
 	if set == nil {
 		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+			fmt.Printf("Creating a new batch for partition %s-%d with base sequence %d \n", msg.Topic, msg.Partition, msg.sequenceNumber)
 			batch := &RecordBatch{
 				FirstTimestamp:   timestamp,
 				Version:          2,
-				ProducerID:       -1, /* No producer id */
 				Codec:            ps.parent.conf.Producer.Compression,
 				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
+				ProducerID:       ps.parent.txnmgr.producerID,
+				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
+			}
+			if ps.parent.conf.Producer.Idempotent {
+				batch.FirstSequence = msg.sequenceNumber
 			}
 			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
 			size = recordBatchOverhead
@@ -72,9 +79,13 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
 		}
 		partitions[msg.Partition] = set
 	}
 	set.msgs = append(set.msgs, msg)
+
 	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
+		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
+			return errors.New("assertion failed: message out of sequence added to a batch")
+		}
 		// We are being conservative here to avoid having to prep encode the record
 		size += maximumRecordOverhead
 		rec := &Record{
@@ -120,8 +131,8 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 		req.Version = 3
 	}
 
-	for topic, partitionSet := range ps.msgs {
-		for partition, set := range partitionSet {
+	for topic, partitionSets := range ps.msgs {
+		for partition, set := range partitionSets {
 			if req.Version >= 3 {
 				// If the API version we're hitting is 3 or greater, we need to calculate
 				// offsets for each record in the batch relative to FirstOffset.
@@ -137,7 +148,7 @@ func (ps *produceSet) buildRequest() *ProduceRequest {
 						record.OffsetDelta = int64(i)
 					}
 				}
 				req.AddBatch(topic, partition, rb)
 				continue
 			}
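
Why the batch stamping above matters: the broker deduplicates per partition on the producer ID, epoch and sequence numbers, so a retried batch must be resent with its original numbering. A sketch of the fields this diff now fills in (hypothetical values):

	batch := &RecordBatch{
		Version:       2,
		ProducerID:    1000, // from the InitProducerIDResponse
		ProducerEpoch: 1,    // ditto
		FirstSequence: 42,   // sequence number of the first message in the batch
	}
	// records appended afterwards must carry sequences 42, 43, 44, ...
	// which is what the out-of-sequence assertion in add() enforces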

+ 92 - 1
produce_set_test.go

@@ -7,8 +7,11 @@ import (
 )
 
 func makeProduceSet() (*asyncProducer, *produceSet) {
+	conf := NewConfig()
+	txnmgr, _ := newTransactionManager(conf, nil)
 	parent := &asyncProducer{
-		conf: NewConfig(),
+		conf:   conf,
+		txnmgr: txnmgr,
 	}
 	return parent, newProduceSet(parent)
 }
@@ -253,3 +256,91 @@ func TestProduceSetV3RequestBuilding(t *testing.T) {
 		}
 	}
 }
+
+func TestProduceSetIdempotentRequestBuilding(t *testing.T) {
+	const pID = 1000
+	const pEpoch = 1234
+
+	config := NewConfig()
+	config.Producer.RequiredAcks = WaitForAll
+	config.Producer.Idempotent = true
+	config.Version = V0_11_0_0
+
+	parent := &asyncProducer{
+		conf: config,
+		txnmgr: &transactionManager{
+			producerID:    pID,
+			producerEpoch: pEpoch,
+		},
+	}
+	ps := newProduceSet(parent)
+
+	now := time.Now()
+	msg := &ProducerMessage{
+		Topic:     "t1",
+		Partition: 0,
+		Key:       StringEncoder(TestMessage),
+		Value:     StringEncoder(TestMessage),
+		Headers: []RecordHeader{
+			RecordHeader{
+				Key:   []byte("header-1"),
+				Value: []byte("value-1"),
+			},
+			RecordHeader{
+				Key:   []byte("header-2"),
+				Value: []byte("value-2"),
+			},
+			RecordHeader{
+				Key:   []byte("header-3"),
+				Value: []byte("value-3"),
+			},
+		},
+		Timestamp:      now,
+		sequenceNumber: 123,
+	}
+	for i := 0; i < 10; i++ {
+		safeAddMessage(t, ps, msg)
+		msg.Timestamp = msg.Timestamp.Add(time.Second)
+	}
+
+	req := ps.buildRequest()
+
+	if req.Version != 3 {
+		t.Error("Wrong request version")
+	}
+
+	batch := req.records["t1"][0].RecordBatch
+	if batch.FirstTimestamp != now {
+		t.Errorf("Wrong first timestamp: %v", batch.FirstTimestamp)
+	}
+	if batch.ProducerID != pID {
+		t.Errorf("Wrong producerID: %v", batch.ProducerID)
+	}
+	if batch.ProducerEpoch != pEpoch {
+		t.Errorf("Wrong producerEpoch: %v", batch.ProducerEpoch)
+	}
+	if batch.FirstSequence != 123 {
+		t.Errorf("Wrong first sequence: %v", batch.FirstSequence)
+	}
+	for i := 0; i < 10; i++ {
+		rec := batch.Records[i]
+		if rec.TimestampDelta != time.Duration(i)*time.Second {
+			t.Errorf("Wrong timestamp delta: %v", rec.TimestampDelta)
+		}
+
+		if rec.OffsetDelta != int64(i) {
+			t.Errorf("Wrong relative inner offset, expected %d, got %d", i, rec.OffsetDelta)
+		}
+
+		for j, h := range batch.Records[i].Headers {
+			exp := fmt.Sprintf("header-%d", j+1)
+			if string(h.Key) != exp {
+				t.Errorf("Wrong header key, expected %v, got %v", exp, h.Key)
+			}
+			exp = fmt.Sprintf("value-%d", j+1)
+			if string(h.Value) != exp {
+				t.Errorf("Wrong header value, expected %v, got %v", exp, h.Value)
+			}
+		}
+	}
+}