
Merge pull request #1307 from FrancoisPoinsot/read-committed

Implement ReadCommitted
Vlad Gorodetsky · commit 8c3fe6939f

+ 1 - 0
.travis.yml

@@ -19,6 +19,7 @@ before_install:
 - vagrant/install_cluster.sh
 - vagrant/boot_cluster.sh
 - vagrant/create_topics.sh
+- vagrant/run_java_producer.sh
 
 install: make install_dependencies
 

+ 12 - 0
config.go

@@ -338,6 +338,11 @@ type Config struct {
 				Max int
 			}
 		}
+
+		// IsolationLevel supports 2 modes:
+		// 	- use `ReadUncommitted` (default) to consume and return all messages in the message channel
+		//	- use `ReadCommitted` to hide messages that are part of an aborted transaction
+		IsolationLevel IsolationLevel
 	}
 
 	// A user-provided string sent with every request to the brokers for logging,
@@ -602,6 +607,13 @@ func (c *Config) Validate() error {
 		return ConfigurationError("Consumer.Offsets.Initial must be OffsetOldest or OffsetNewest")
 	case c.Consumer.Offsets.Retry.Max < 0:
 		return ConfigurationError("Consumer.Offsets.Retry.Max must be >= 0")
+	case c.Consumer.IsolationLevel != ReadUncommitted && c.Consumer.IsolationLevel != ReadCommitted:
+		return ConfigurationError("Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted")
+	}
+
+	// ReadCommitted is carried by FetchRequest v4, which requires Kafka >= 0.11
+	if c.Consumer.IsolationLevel == ReadCommitted && !c.Version.IsAtLeast(V0_11_0_0) {
+		return ConfigurationError("ReadCommitted requires Version >= V0_11_0_0")
 	}
 
 	// validate the Consumer Group values

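Taken together, the new field and the two validation rules make the feature opt-in. A minimal setup sketch (the broker address and error handling are placeholders, not part of this PR):

```go
package main

import "github.com/Shopify/sarama"

func main() {
	cfg := sarama.NewConfig()
	// ReadCommitted is only valid from protocol version 0.11 onwards,
	// per the Validate() rule added above.
	cfg.Version = sarama.V0_11_0_0
	cfg.Consumer.IsolationLevel = sarama.ReadCommitted

	consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, cfg)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}
```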
+ 30 - 0
config_test.go

@@ -282,6 +282,36 @@ func TestProducerConfigValidates(t *testing.T) {
 		}
 	}
 }
+func TestConsumerConfigValidates(t *testing.T) {
+	tests := []struct {
+		name string
+		cfg  func(*Config)
+		err  string
+	}{
+		{"ReadCommitted Version",
+			func(cfg *Config) {
+				cfg.Version = V0_10_0_0
+				cfg.Consumer.IsolationLevel = ReadCommitted
+			},
+			"ReadCommitted requires Version >= V0_11_0_0",
+		},
+		{"Incorrect isolation level",
+			func(cfg *Config) {
+				cfg.Version = V0_11_0_0
+				cfg.Consumer.IsolationLevel = IsolationLevel(42)
+			},
+			"Consumer.IsolationLevel must be ReadUncommitted or ReadCommitted",
+		},
+	}
+
+	for i, test := range tests {
+		c := NewConfig()
+		test.cfg(c)
+		if err := c.Validate(); err == nil || string(err.(ConfigurationError)) != test.err {
+			t.Errorf("[%d]:[%s] Expected %s, Got %v\n", i, test.name, test.err, err)
+		}
+	}
+}
 
 func TestLZ4ConfigValidation(t *testing.T) {
 	config := NewConfig()

+ 50 - 4
consumer.go

@@ -47,7 +47,6 @@ func (ce ConsumerErrors) Error() string {
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope.
 type Consumer interface {
-
 	// Topics returns the set of available topics as retrieved from the cluster
 	// metadata. This method is the same as Client.Topics(), and is provided for
 	// convenience.
@@ -260,7 +259,6 @@ func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
 // Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
 // also drain the Messages channel, harvest all errors & return them once cleanup has completed.
 type PartitionConsumer interface {
-
 	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
 	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
 	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
@@ -606,6 +604,12 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 	child.fetchSize = child.conf.Consumer.Fetch.Default
 	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
 
+	// abortedProducerIDs contains producer IDs whose messages should be ignored as uncommitted
+	// - a producer ID is added when the partitionConsumer reaches the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
+	// - a producer ID is removed when the partitionConsumer sees the matching abort control record, meaning the aborted transaction for this producer is over
+	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
+	abortedTransactions := block.getAbortedTransactions()
+
 	messages := []*ConsumerMessage{}
 	for _, records := range block.RecordsSet {
 		switch records.recordsType {
@@ -617,14 +621,56 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu
 
 			messages = append(messages, messageSetMessages...)
 		case defaultRecords:
+			// Consume remaining abortedTransactions up to the last offset of the current batch
+			for _, txn := range abortedTransactions {
+				if txn.FirstOffset > records.RecordBatch.LastOffset() {
+					break
+				}
+				abortedProducerIDs[txn.ProducerID] = struct{}{}
+				// Pop this transaction so it is never added again
+				abortedTransactions = abortedTransactions[1:]
+			}
+
 			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
 			if err != nil {
 				return nil, err
 			}
-			if control, err := records.isControl(); err != nil || control {
+
+			// Parse the batch and advance the offset, but do not expose messages that are:
+			// - control records
+			// - part of an aborted transaction when set to `ReadCommitted`
+
+			// control record
+			isControl, err := records.isControl()
+			if err != nil {
+				// It is unclear why the pre-existing code continued on error here.
+				// The safe bet is to keep ignoring control batches on error under ReadUncommitted,
+				// and to surface the error under ReadCommitted, where correctness depends on them.
+				if child.conf.Consumer.IsolationLevel == ReadCommitted {
+					return nil, err
+				}
+				continue
+			}
+			if isControl {
+				controlRecord, err := records.getControlRecord()
+				if err != nil {
+					return nil, err
+				}
+
+				if controlRecord.Type == ControlRecordAbort {
+					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
+				}
 				continue
 			}
 
+			// filter aborted transactions
+			if child.conf.Consumer.IsolationLevel == ReadCommitted {
+				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
+				if records.RecordBatch.IsTransactional && isAborted {
+					continue
+				}
+			}
+
 			messages = append(messages, recordBatchMessages...)
 		default:
 			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
@@ -826,7 +872,7 @@ func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
 	}
 	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
 		request.Version = 4
-		request.Isolation = ReadUncommitted // We don't support yet transactions.
+		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
 	}
 
 	for child := range bc.subscriptions {

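The filtering above interleaves two cursors: it walks the sorted aborted-transaction list alongside the record batches, tracking in-flight aborted producers in a set. Distilled out of parseResponse into standalone form (a sketch with simplified stand-in types and a hypothetical `filterCommitted` helper, not the library code; it assumes batches arrive in offset order and the aborted list is sorted by first offset, as `getAbortedTransactions` guarantees below):

```go
package example

// Simplified stand-ins for AbortedTransaction and RecordBatch.
type txn struct {
	producerID  int64
	firstOffset int64
}

type batch struct {
	producerID    int64
	lastOffset    int64
	transactional bool
	control       bool
	abort         bool // control batch carrying an abort marker
}

// filterCommitted mirrors the ReadCommitted path of parseResponse.
func filterCommitted(batches []batch, aborted []txn) []batch {
	abortedProducers := make(map[int64]struct{}, len(aborted))
	var out []batch
	for _, b := range batches {
		// Mark every aborted transaction that begins at or before
		// this batch's last offset.
		for len(aborted) > 0 && aborted[0].firstOffset <= b.lastOffset {
			abortedProducers[aborted[0].producerID] = struct{}{}
			aborted = aborted[1:]
		}
		if b.control {
			if b.abort {
				// The abort marker closes this producer's transaction.
				delete(abortedProducers, b.producerID)
			}
			continue // control batches are never exposed to the application
		}
		if _, isAborted := abortedProducers[b.producerID]; b.transactional && isAborted {
			continue // hide messages from aborted transactions
		}
		out = append(out, b)
	}
	return out
}
```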
+ 63 - 0
consumer_test.go

@@ -1128,6 +1128,69 @@ func TestConsumerTimestamps(t *testing.T) {
 	}
 }
 
+// When set to ReadCommitted, no uncommitted message should be available in the Messages channel
+func TestExcludeUncommitted(t *testing.T) {
+	// Given
+	broker0 := NewMockBroker(t, 0)
+
+	fetchResponse := &FetchResponse{
+		Version: 4,
+		Blocks: map[string]map[int32]*FetchResponseBlock{"my_topic": {0: {
+			AbortedTransactions: []*AbortedTransaction{{ProducerID: 7, FirstOffset: 1235}},
+		}}},
+	}
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1234, 7, true)   // committed msg
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1235, 7, true)   // uncommitted msg
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1236, 7, true)   // uncommitted msg
+	fetchResponse.AddControlRecord("my_topic", 0, 1237, 7, ControlRecordAbort) // abort control record
+	fetchResponse.AddRecordBatch("my_topic", 0, nil, testMsg, 1238, 7, true)   // committed msg
+
+	broker0.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetBroker(broker0.Addr(), broker0.BrokerID()).
+			SetLeader("my_topic", 0, broker0.BrokerID()),
+		"OffsetRequest": NewMockOffsetResponse(t).
+			SetVersion(1).
+			SetOffset("my_topic", 0, OffsetOldest, 0).
+			SetOffset("my_topic", 0, OffsetNewest, 1237),
+		"FetchRequest": NewMockWrapper(fetchResponse),
+	})
+
+	cfg := NewConfig()
+	cfg.Consumer.Return.Errors = true
+	cfg.Version = V0_11_0_0
+	cfg.Consumer.IsolationLevel = ReadCommitted
+
+	// When
+	master, err := NewConsumer([]string{broker0.Addr()}, cfg)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	consumer, err := master.ConsumePartition("my_topic", 0, 1234)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// Then: only the 2 committed messages are returned
+	select {
+	case message := <-consumer.Messages():
+		assertMessageOffset(t, message, int64(1234))
+	case err := <-consumer.Errors():
+		t.Error(err)
+	}
+	select {
+	case message := <-consumer.Messages():
+		assertMessageOffset(t, message, int64(1238))
+	case err := <-consumer.Errors():
+		t.Error(err)
+	}
+
+	safeClose(t, consumer)
+	safeClose(t, master)
+	broker0.Close()
+}
+
 func assertMessageOffset(t *testing.T, msg *ConsumerMessage, expectedOffset int64) {
 	if msg.Offset != expectedOffset {
 		t.Errorf("Incorrect message offset: expected=%d, actual=%d", expectedOffset, msg.Offset)

+ 70 - 0
control_record.go

@@ -0,0 +1,70 @@
+package sarama
+
+type ControlRecordType int
+
+const (
+	ControlRecordAbort ControlRecordType = iota
+	ControlRecordCommit
+	ControlRecordUnknown
+)
+
+// Control records are returned as records by a FetchRequest.
+// However, unlike "normal" records, they carry no application-level meaning;
+// they only serve the internal logic that supports transactions.
+type ControlRecord struct {
+	Version          int16
+	CoordinatorEpoch int32
+	Type             ControlRecordType
+}
+
+func (cr *ControlRecord) decode(key, value packetDecoder) error {
+	{
+		var err error
+		cr.Version, err = value.getInt16()
+		if err != nil {
+			return err
+		}
+		cr.CoordinatorEpoch, err = value.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+	{
+		var err error
+		// There is a version in the value part AND in the key part, and it is unclear
+		// whether they are supposed to match. Either way, all these versions can only be 0 for now.
+		cr.Version, err = key.getInt16()
+		if err != nil {
+			return err
+		}
+
+		recordType, err := key.getInt16()
+		if err != nil {
+			return err
+		}
+		switch recordType {
+		case 0:
+			cr.Type = ControlRecordAbort
+		case 1:
+			cr.Type = ControlRecordCommit
+		default:
+			// from the Java implementation:
+			// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+			cr.Type = ControlRecordUnknown
+		}
+	}
+	return nil
+}
+
+func (cr *ControlRecord) encode(key, value packetEncoder) {
+	value.putInt16(cr.Version)
+	value.putInt32(cr.CoordinatorEpoch)
+
+	key.putInt16(cr.Version)
+	switch cr.Type {
+	case ControlRecordAbort:
+		key.putInt16(0)
+	case ControlRecordCommit:
+		key.putInt16(1)
+	}
+}

+ 1 - 0
control_record_test.go

@@ -0,0 +1 @@
+package sarama

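The new test file is added empty. A round-trip test along these lines could exercise the encode/decode pair (a sketch, not part of this PR; it uses the package-internal realEncoder/realDecoder types that appear elsewhere in this diff, and the buffer sizes follow the wire layout: key = int16 version + int16 type, value = int16 version + int32 coordinator epoch):

```go
package sarama

import "testing"

func TestControlRecordRoundTrip(t *testing.T) {
	in := ControlRecord{
		Version:          0,
		CoordinatorEpoch: 1,
		Type:             ControlRecordCommit,
	}

	key := &realEncoder{raw: make([]byte, 4)}   // int16 version + int16 type
	value := &realEncoder{raw: make([]byte, 6)} // int16 version + int32 coordinator epoch
	in.encode(key, value)

	out := ControlRecord{}
	if err := out.decode(&realDecoder{raw: key.raw}, &realDecoder{raw: value.raw}); err != nil {
		t.Fatal(err)
	}
	if out.Type != in.Type || out.CoordinatorEpoch != in.CoordinatorEpoch {
		t.Errorf("round-trip mismatch: got %+v, want %+v", out, in)
	}
}
```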
+ 80 - 0
fetch_response.go

@@ -1,6 +1,7 @@
 package sarama
 
 import (
+	"sort"
 	"time"
 )
 
@@ -185,6 +186,17 @@ func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error)
 	return pe.pop()
 }
 
+func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
+	// No documentation guarantees that the `AbortedTransactions` field is ordered,
+	// and the Java implementation uses a PriorityQueue keyed on `FirstOffset`,
+	// so sort it ourselves.
+	at := b.AbortedTransactions
+	sort.Slice(
+		at,
+		func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
+	)
+	return at
+}
+
 type FetchResponse struct {
 	Blocks        map[string]map[int32]*FetchResponseBlock
 	ThrottleTime  time.Duration
@@ -385,6 +397,65 @@ func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, ke
 	batch.addRecord(rec)
 }
 
+// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp,
+// but instead of appending 1 record to an existing batch, it appends a new batch containing 1 record to the fetchResponse.
+// Since transactions are handled at the batch level (the whole batch is either committed or aborted), use this to test transactions.
+func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
+	frb := r.getOrCreateBlock(topic, partition)
+	kb, vb := encodeKV(key, value)
+
+	records := newDefaultRecords(nil)
+	batch := &RecordBatch{
+		Version:         2,
+		LogAppendTime:   r.LogAppendTime,
+		FirstTimestamp:  timestamp,
+		MaxTimestamp:    r.Timestamp,
+		FirstOffset:     offset,
+		LastOffsetDelta: 0,
+		ProducerID:      producerID,
+		IsTransactional: isTransactional,
+	}
+	rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
+	batch.addRecord(rec)
+	records.RecordBatch = batch
+
+	frb.RecordsSet = append(frb.RecordsSet, &records)
+}
+
+func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
+	frb := r.getOrCreateBlock(topic, partition)
+
+	// batch
+	batch := &RecordBatch{
+		Version:         2,
+		LogAppendTime:   r.LogAppendTime,
+		FirstTimestamp:  timestamp,
+		MaxTimestamp:    r.Timestamp,
+		FirstOffset:     offset,
+		LastOffsetDelta: 0,
+		ProducerID:      producerID,
+		IsTransactional: true,
+		Control:         true,
+	}
+
+	// records
+	records := newDefaultRecords(nil)
+	records.RecordBatch = batch
+
+	// record
+	cr := ControlRecord{
+		Version: 0,
+		Type:    recordType,
+	}
+	crKey := &realEncoder{raw: make([]byte, 4)}   // int16 version + int16 type
+	crValue := &realEncoder{raw: make([]byte, 6)} // int16 version + int32 coordinator epoch
+	cr.encode(crKey, crValue)
+	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
+	batch.addRecord(rec)
+
+	frb.RecordsSet = append(frb.RecordsSet, &records)
+}
+
 func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
 	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
 }
@@ -393,6 +464,15 @@ func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Enco
 	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
 }
 
+func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
+	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
+}
+
+func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
+	r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
+}
+
 func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
 	frb := r.getOrCreateBlock(topic, partition)
 	if len(frb.RecordsSet) == 0 {

+ 26 - 0
functional_consumer_test.go

@@ -8,6 +8,8 @@ import (
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/require"
 )
 
 func TestFuncConsumerOffsetOutOfRange(t *testing.T) {
@@ -117,6 +119,30 @@ func TestVersionMatrixIdempotent(t *testing.T) {
 	consumeMsgs(t, testVersions, producedMessages)
 }
 
+func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
+	checkKafkaVersion(t, "0.11.0")
+	setupFunctionalTest(t)
+	defer teardownFunctionalTest(t)
+
+	config := NewConfig()
+	config.Consumer.IsolationLevel = ReadCommitted
+	config.Version = V0_11_0_0
+
+	consumer, err := NewConsumer(kafkaBrokers, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
+	require.NoError(t, err)
+
+	msgChannel := pc.Messages()
+	for i := 1; i <= 6; i++ {
+		msg := <-msgChannel
+		require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
+	}
+}
+
 func prodMsg2Str(prodMsg *ProducerMessage) string {
 	return fmt.Sprintf("{offset: %d, value: %s}", prodMsg.Offset, string(prodMsg.Value.(StringEncoder)))
 }

+ 2 - 0
go.mod

@@ -8,8 +8,10 @@ require (
 	github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21
 	github.com/eapache/queue v1.1.0
 	github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db // indirect
+	github.com/kisielk/errcheck v1.2.0 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
 	github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
+	github.com/stretchr/testify v1.3.0
 	github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
 	github.com/xdg/stringprep v1.0.0 // indirect
 	golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 // indirect

+ 10 - 0
go.sum

@@ -2,6 +2,7 @@ github.com/DataDog/zstd v1.3.5 h1:DtpNbljikUepEPD16hD4LvIcmhnhdLTiW/5pHgbmp14=
 github.com/DataDog/zstd v1.3.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
 github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/eapache/go-resiliency v1.1.0 h1:1NtRmCAqadE2FN4ZcN6g90TP3uk8cg9rn9eNK2197aU=
@@ -12,10 +13,17 @@ github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
 github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db h1:woRePGFeVFfLKN/pOkfl+p/TAqKOfFu+7KPlMVpok/w=
 github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
+github.com/kisielk/errcheck v1.2.0 h1:reN85Pxc5larApoH1keMBiu2GWtPqXQ1nc9gx+jOU+E=
+github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
 github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
@@ -25,3 +33,5 @@ golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACk
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563 h1:NIou6eNFigscvKJmsbyez16S2cIS6idossORlFtSt2E=
+golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

+ 1 - 0
record.go

@@ -6,6 +6,7 @@ import (
 )
 
 const (
+	isTransactionalMask   = 0x10
 	controlMask           = 0x20
 	maximumRecordOverhead = 5*binary.MaxVarintLen32 + binary.MaxVarintLen64 + 1
 )

+ 9 - 0
record_batch.go

@@ -45,11 +45,16 @@ type RecordBatch struct {
 	FirstSequence         int32
 	Records               []*Record
 	PartialTrailingRecord bool
+	IsTransactional       bool
 
 	compressedRecords []byte
 	recordsLen        int // uncompressed records size
 }
 
+// LastOffset returns the absolute offset of the last record in the batch.
+func (b *RecordBatch) LastOffset() int64 {
+	return b.FirstOffset + int64(b.LastOffsetDelta)
+}
+
 func (b *RecordBatch) encode(pe packetEncoder) error {
 	if b.Version != 2 {
 		return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
@@ -122,6 +127,7 @@ func (b *RecordBatch) decode(pd packetDecoder) (err error) {
 	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
 	b.Control = attributes&controlMask == controlMask
 	b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
+	b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
 
 	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
 		return err
@@ -205,6 +211,9 @@ func (b *RecordBatch) computeAttributes() int16 {
 	if b.LogAppendTime {
 		attr |= timestampTypeMask
 	}
+	if b.IsTransactional {
+		attr |= isTransactionalMask
+	}
 	return attr
 }
 

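For reference, the new isTransactionalMask slots into the attributes bitfield of the v2 record-batch format next to the masks that already exist in the package (compressionCodecMask = 0x07 and timestampTypeMask = 0x08 appear in the surrounding context of record_batch.go). A small sketch, mirroring what decode() and computeAttributes() do above:

```go
package example

// describeAttributes decomposes a v2 RecordBatch attributes value:
//   bits 0-2 (0x07) compression codec
//   bit  3   (0x08) timestamp type (LogAppendTime)
//   bit  4   (0x10) transactional
//   bit  5   (0x20) control batch
func describeAttributes(attr int16) (codec int8, logAppendTime, transactional, control bool) {
	codec = int8(attr) & 0x07      // compressionCodecMask
	logAppendTime = attr&0x08 != 0 // timestampTypeMask
	transactional = attr&0x10 != 0 // isTransactionalMask
	control = attr&0x20 != 0       // controlMask
	return
}
```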
+ 15 - 0
records.go

@@ -192,3 +192,18 @@ func magicValue(pd packetDecoder) (int8, error) {
 
 	return dec.getInt8()
 }
+
+func (r *Records) getControlRecord() (ControlRecord, error) {
+	if r.RecordBatch == nil || len(r.RecordBatch.Records) == 0 {
+		return ControlRecord{}, fmt.Errorf("cannot get control record, record batch is empty")
+	}
+
+	firstRecord := r.RecordBatch.Records[0]
+	controlRecord := ControlRecord{}
+	err := controlRecord.decode(&realDecoder{raw: firstRecord.Key}, &realDecoder{raw: firstRecord.Value})
+	if err != nil {
+		return ControlRecord{}, err
+	}
+
+	return controlRecord, nil
+}

+ 2 - 1
records_test.go

@@ -76,7 +76,8 @@ func TestLegacyRecords(t *testing.T) {
 
 func TestDefaultRecords(t *testing.T) {
 	batch := &RecordBatch{
-		Version: 2,
+		IsTransactional: true,
+		Version:         2,
 		Records: []*Record{
 			{
 				Value: []byte{1},

+ 1 - 0
vagrant/create_topics.sh

@@ -6,3 +6,4 @@ cd ${KAFKA_INSTALL_ROOT}/kafka-9092
 bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic test.1 --zookeeper localhost:2181
 bin/kafka-topics.sh --create --partitions 4 --replication-factor 3 --topic test.4 --zookeeper localhost:2181
 bin/kafka-topics.sh --create --partitions 64 --replication-factor 3 --topic test.64  --zookeeper localhost:2181
+bin/kafka-topics.sh --create --partitions 1 --replication-factor 3 --topic uncommitted-topic-test-4  --zookeeper localhost:2181

+ 1 - 1
vagrant/install_cluster.sh

@@ -6,7 +6,7 @@ TOXIPROXY_VERSION=2.1.4
 
 mkdir -p ${KAFKA_INSTALL_ROOT}
 if [ ! -f ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz ]; then
-    wget --quiet https://www-us.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
+    wget --quiet https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_${KAFKA_SCALA_VERSION}-${KAFKA_VERSION}.tgz -O ${KAFKA_INSTALL_ROOT}/kafka-${KAFKA_VERSION}.tgz
 fi
 if [ ! -f ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION} ]; then
     wget --quiet https://github.com/Shopify/toxiproxy/releases/download/v${TOXIPROXY_VERSION}/toxiproxy-server-linux-amd64 -O ${KAFKA_INSTALL_ROOT}/toxiproxy-${TOXIPROXY_VERSION}

+ 1 - 0
vagrant/provision.sh

@@ -14,3 +14,4 @@ export REPOSITORY_ROOT=/vagrant
 sh /vagrant/vagrant/install_cluster.sh
 sh /vagrant/vagrant/setup_services.sh
 sh /vagrant/vagrant/create_topics.sh
+sh /vagrant/vagrant/run_java_producer.sh

+ 6 - 0
vagrant/run_java_producer.sh

@@ -0,0 +1,6 @@
+#!/bin/sh
+
+set -ex
+
+wget https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar
+java -jar simplest-uncommitted-msg-0.1-jar-with-dependencies.jar -b ${KAFKA_HOSTNAME}:9092 -c 4