Jelajahi Sumber

add transaction-related request/responses (22, 24, 25, 26, 28)

Robin 6 tahun lalu
induk
melakukan
d81319704e

+ 52 - 0
add_offsets_to_txn_request.go

@@ -0,0 +1,52 @@
+package sarama
+
+type AddOffsetsToTxnRequest struct {
+	TransactionalID string
+	ProducerID      int64
+	ProducerEpoch   int16
+	GroupID         string
+}
+
+func (a *AddOffsetsToTxnRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(a.TransactionalID); err != nil {
+		return err
+	}
+
+	pe.putInt64(a.ProducerID)
+
+	pe.putInt16(a.ProducerEpoch)
+
+	if err := pe.putString(a.GroupID); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (a *AddOffsetsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
+	if a.TransactionalID, err = pd.getString(); err != nil {
+		return err
+	}
+	if a.ProducerID, err = pd.getInt64(); err != nil {
+		return err
+	}
+	if a.ProducerEpoch, err = pd.getInt16(); err != nil {
+		return err
+	}
+	if a.GroupID, err = pd.getString(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (a *AddOffsetsToTxnRequest) key() int16 {
+	return 25
+}
+
+func (a *AddOffsetsToTxnRequest) version() int16 {
+	return 0
+}
+
+func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 23 - 0
add_offsets_to_txn_request_test.go

@@ -0,0 +1,23 @@
+package sarama
+
+import "testing"
+
+var (
+	addOffsetsToTxnRequest = []byte{
+		0, 3, 't', 'x', 'n',
+		0, 0, 0, 0, 0, 0, 31, 64,
+		0, 0,
+		0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
+	}
+)
+
+func TestAddOffsetsToTxnRequest(t *testing.T) {
+	req := &AddOffsetsToTxnRequest{
+		TransactionalID: "txn",
+		ProducerID:      8000,
+		ProducerEpoch:   0,
+		GroupID:         "groupid",
+	}
+
+	testRequest(t, "", req, addOffsetsToTxnRequest)
+}

+ 44 - 0
add_offsets_to_txn_response.go

@@ -0,0 +1,44 @@
+package sarama
+
+import (
+	"time"
+)
+
+type AddOffsetsToTxnResponse struct {
+	ThrottleTime time.Duration
+	Err          KError
+}
+
+func (a *AddOffsetsToTxnResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
+	pe.putInt16(int16(a.Err))
+	return nil
+}
+
+func (a *AddOffsetsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	a.Err = KError(kerr)
+
+	return nil
+}
+
+func (a *AddOffsetsToTxnResponse) key() int16 {
+	return 25
+}
+
+func (a *AddOffsetsToTxnResponse) version() int16 {
+	return 0
+}
+
+func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 22 - 0
add_offsets_to_txn_response_test.go

@@ -0,0 +1,22 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	addOffsetsToTxnResponse = []byte{
+		0, 0, 0, 100,
+		0, 47,
+	}
+)
+
+func TestAddOffsetsToTxnResponse(t *testing.T) {
+	resp := &AddOffsetsToTxnResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		Err:          ErrInvalidProducerEpoch,
+	}
+
+	testResponse(t, "", resp, addOffsetsToTxnResponse)
+}

+ 76 - 0
add_partitions_to_txn_request.go

@@ -0,0 +1,76 @@
+package sarama
+
+type AddPartitionsToTxnRequest struct {
+	TransactionalID string
+	ProducerID      int64
+	ProducerEpoch   int16
+	TopicPartitions map[string][]int32
+}
+
+func (a *AddPartitionsToTxnRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(a.TransactionalID); err != nil {
+		return err
+	}
+	pe.putInt64(a.ProducerID)
+	pe.putInt16(a.ProducerEpoch)
+
+	if err := pe.putArrayLength(len(a.TopicPartitions)); err != nil {
+		return err
+	}
+	for topic, partitions := range a.TopicPartitions {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putInt32Array(partitions); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
+func (a *AddPartitionsToTxnRequest) decode(pd packetDecoder, version int16) (err error) {
+	if a.TransactionalID, err = pd.getString(); err != nil {
+		return err
+	}
+	if a.ProducerID, err = pd.getInt64(); err != nil {
+		return err
+	}
+	if a.ProducerEpoch, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	a.TopicPartitions = make(map[string][]int32)
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		partitions, err := pd.getInt32Array()
+		if err != nil {
+			return err
+		}
+
+		a.TopicPartitions[topic] = partitions
+	}
+
+	return nil
+}
+
+func (a *AddPartitionsToTxnRequest) key() int16 {
+	return 24
+}
+
+func (a *AddPartitionsToTxnRequest) version() int16 {
+	return 0
+}
+
+func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 27 - 0
add_partitions_to_txn_request_test.go

@@ -0,0 +1,27 @@
+package sarama
+
+import "testing"
+
+var (
+	addPartitionsToTxnRequest = []byte{
+		0, 3, 't', 'x', 'n',
+		0, 0, 0, 0, 0, 0, 31, 64, // ProducerID
+		0, 0, 0, 0, // ProducerEpoch
+		0, 1, // 1 topic
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 0, 0, 1, 0, 0, 0, 1,
+	}
+)
+
+func TestAddPartitionsToTxnRequest(t *testing.T) {
+	req := &AddPartitionsToTxnRequest{
+		TransactionalID: "txn",
+		ProducerID:      8000,
+		ProducerEpoch:   0,
+		TopicPartitions: map[string][]int32{
+			"topic": []int32{1},
+		},
+	}
+
+	testRequest(t, "", req, addPartitionsToTxnRequest)
+}

+ 108 - 0
add_partitions_to_txn_response.go

@@ -0,0 +1,108 @@
+package sarama
+
+import (
+	"time"
+)
+
+type AddPartitionsToTxnResponse struct {
+	ThrottleTime time.Duration
+	Errors       map[string][]*PartitionError
+}
+
+func (a *AddPartitionsToTxnResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
+	if err := pe.putArrayLength(len(a.Errors)); err != nil {
+		return err
+	}
+
+	for topic, e := range a.Errors {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putArrayLength(len(e)); err != nil {
+			return err
+		}
+		for _, partitionError := range e {
+			if err := partitionError.encode(pe); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (a *AddPartitionsToTxnResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	a.Errors = make(map[string][]*PartitionError)
+
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		m, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+
+		a.Errors[topic] = make([]*PartitionError, m)
+
+		for j := 0; j < m; j++ {
+			a.Errors[topic][j] = new(PartitionError)
+			if err := a.Errors[topic][j].decode(pd, version); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (a *AddPartitionsToTxnResponse) key() int16 {
+	return 24
+}
+
+func (a *AddPartitionsToTxnResponse) version() int16 {
+	return 0
+}
+
+func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type PartitionError struct {
+	Partition int32
+	Err       KError
+}
+
+func (p *PartitionError) encode(pe packetEncoder) error {
+	pe.putInt32(p.Partition)
+	pe.putInt16(int16(p.Err))
+	return nil
+}
+
+func (p *PartitionError) decode(pd packetDecoder, version int16) (err error) {
+	if p.Partition, err = pd.getInt32(); err != nil {
+		return err
+	}
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	p.Err = KError(kerr)
+
+	return nil
+}

+ 31 - 0
add_partitions_to_txn_response_test.go

@@ -0,0 +1,31 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	addPartitionsToTxnResponse = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 1,
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 0, 0, 1, // 1 partition error
+		0, 0, 0, 2, // partition 2
+		0, 48, // error
+	}
+)
+
+func TestAddPartitionsToTxnResponse(t *testing.T) {
+	resp := &AddPartitionsToTxnResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		Errors: map[string][]*PartitionError{
+			"topic": []*PartitionError{&PartitionError{
+				Err:       ErrInvalidTxnState,
+				Partition: 2,
+			}},
+		},
+	}
+
+	testResponse(t, "", resp, addPartitionsToTxnResponse)
+}

+ 55 - 0
broker.go

@@ -428,6 +428,61 @@ func (b *Broker) DeleteAcls(request *DeleteAclsRequest) (*DeleteAclsResponse, er
 	return response, nil
 }
 
+func (b *Broker) InitProducerID(request *InitProducerIDRequest) (*InitProducerIDResponse, error) {
+	response := new(InitProducerIDResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) AddPartitionsToTxn(request *AddPartitionsToTxnRequest) (*AddPartitionsToTxnResponse, error) {
+	response := new(AddPartitionsToTxnResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) AddOffsetsToTxn(request *AddOffsetsToTxnRequest) (*AddOffsetsToTxnResponse, error) {
+	response := new(AddOffsetsToTxnResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) EndTxn(request *EndTxnRequest) (*EndTxnResponse, error) {
+	response := new(EndTxnResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+func (b *Broker) TxnOffsetCommit(request *TxnOffsetCommitRequest) (*TxnOffsetCommitResponse, error) {
+	response := new(TxnOffsetCommitResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()

+ 50 - 0
end_txn_request.go

@@ -0,0 +1,50 @@
+package sarama
+
+type EndTxnRequest struct {
+	TransactionalID   string
+	ProducerID        int64
+	ProducerEpoch     int16
+	TransactionResult bool
+}
+
+func (a *EndTxnRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(a.TransactionalID); err != nil {
+		return err
+	}
+
+	pe.putInt64(a.ProducerID)
+
+	pe.putInt16(a.ProducerEpoch)
+
+	pe.putBool(a.TransactionResult)
+
+	return nil
+}
+
+func (a *EndTxnRequest) decode(pd packetDecoder, version int16) (err error) {
+	if a.TransactionalID, err = pd.getString(); err != nil {
+		return err
+	}
+	if a.ProducerID, err = pd.getInt64(); err != nil {
+		return err
+	}
+	if a.ProducerEpoch, err = pd.getInt16(); err != nil {
+		return err
+	}
+	if a.TransactionResult, err = pd.getBool(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (a *EndTxnRequest) key() int16 {
+	return 26
+}
+
+func (a *EndTxnRequest) version() int16 {
+	return 0
+}
+
+func (a *EndTxnRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 23 - 0
end_txn_request_test.go

@@ -0,0 +1,23 @@
+package sarama
+
+import "testing"
+
+var (
+	endTxnRequest = []byte{
+		0, 3, 't', 'x', 'n',
+		0, 0, 0, 0, 0, 0, 31, 64,
+		0, 1,
+		1,
+	}
+)
+
+func TestEndTxnRequest(t *testing.T) {
+	req := &EndTxnRequest{
+		TransactionalID:   "txn",
+		ProducerID:        8000,
+		ProducerEpoch:     1,
+		TransactionResult: true,
+	}
+
+	testRequest(t, "", req, endTxnRequest)
+}

+ 44 - 0
end_txn_response.go

@@ -0,0 +1,44 @@
+package sarama
+
+import (
+	"time"
+)
+
+type EndTxnResponse struct {
+	ThrottleTime time.Duration
+	Err          KError
+}
+
+func (e *EndTxnResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(e.ThrottleTime / time.Millisecond))
+	pe.putInt16(int16(e.Err))
+	return nil
+}
+
+func (e *EndTxnResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	e.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	e.Err = KError(kerr)
+
+	return nil
+}
+
+func (e *EndTxnResponse) key() int16 {
+	return 25
+}
+
+func (e *EndTxnResponse) version() int16 {
+	return 0
+}
+
+func (e *EndTxnResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 22 - 0
end_txn_response_test.go

@@ -0,0 +1,22 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	endTxnResponse = []byte{
+		0, 0, 0, 100,
+		0, 49,
+	}
+)
+
+func TestEndTxnResponse(t *testing.T) {
+	resp := &EndTxnResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		Err:          ErrInvalidProducerIDMapping,
+	}
+
+	testResponse(t, "", resp, endTxnResponse)
+}

+ 43 - 0
init_producer_id_request.go

@@ -0,0 +1,43 @@
+package sarama
+
+import "time"
+
+type InitProducerIDRequest struct {
+	TransactionalID    *string
+	TransactionTimeout time.Duration
+}
+
+func (i *InitProducerIDRequest) encode(pe packetEncoder) error {
+	if err := pe.putNullableString(i.TransactionalID); err != nil {
+		return err
+	}
+	pe.putInt32(int32(i.TransactionTimeout / time.Millisecond))
+
+	return nil
+}
+
+func (i *InitProducerIDRequest) decode(pd packetDecoder, version int16) (err error) {
+	if i.TransactionalID, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	timeout, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	i.TransactionTimeout = time.Duration(timeout) * time.Millisecond
+
+	return nil
+}
+
+func (i *InitProducerIDRequest) key() int16 {
+	return 22
+}
+
+func (i *InitProducerIDRequest) version() int16 {
+	return 0
+}
+
+func (i *InitProducerIDRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 31 - 0
init_producer_id_request_test.go

@@ -0,0 +1,31 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	initProducerIDRequestNull = []byte{
+		255, 255,
+		0, 0, 0, 100,
+	}
+
+	initProducerIDRequest = []byte{
+		0, 3, 't', 'x', 'n',
+		0, 0, 0, 100,
+	}
+)
+
+func TestInitProducerIDRequest(t *testing.T) {
+	req := &InitProducerIDRequest{
+		TransactionTimeout: 100 * time.Millisecond,
+	}
+
+	testRequest(t, "null transaction id", req, initProducerIDRequestNull)
+
+	transactionID := "txn"
+	req.TransactionalID = &transactionID
+
+	testRequest(t, "transaction id", req, initProducerIDRequest)
+}

+ 55 - 0
init_producer_id_response.go

@@ -0,0 +1,55 @@
+package sarama
+
+import "time"
+
+type InitProducerIDResponse struct {
+	ThrottleTime  time.Duration
+	Err           KError
+	ProducerID    int64
+	ProducerEpoch int16
+}
+
+func (i *InitProducerIDResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(i.ThrottleTime / time.Millisecond))
+	pe.putInt16(int16(i.Err))
+	pe.putInt64(i.ProducerID)
+	pe.putInt16(i.ProducerEpoch)
+
+	return nil
+}
+
+func (i *InitProducerIDResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	i.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	i.Err = KError(kerr)
+
+	if i.ProducerID, err = pd.getInt64(); err != nil {
+		return err
+	}
+
+	if i.ProducerEpoch, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (i *InitProducerIDResponse) key() int16 {
+	return 22
+}
+
+func (i *InitProducerIDResponse) version() int16 {
+	return 0
+}
+
+func (i *InitProducerIDResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 37 - 0
init_producer_id_response_test.go

@@ -0,0 +1,37 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	initProducerIDResponse = []byte{
+		0, 0, 0, 100,
+		0, 0,
+		0, 0, 0, 0, 0, 0, 31, 64, // producerID = 8000
+		0, 0, // epoch
+	}
+
+	initProducerIDRequestError = []byte{
+		0, 0, 0, 100,
+		0, 51,
+		255, 255, 255, 255, 255, 255, 255, 255,
+		0, 0,
+	}
+)
+
+func TestInitProducerIDResponse(t *testing.T) {
+	resp := &InitProducerIDResponse{
+		ThrottleTime:  100 * time.Millisecond,
+		ProducerID:    8000,
+		ProducerEpoch: 0,
+	}
+
+	testResponse(t, "", resp, initProducerIDResponse)
+
+	resp.Err = ErrConcurrentTransactions
+	resp.ProducerID = -1
+
+	testResponse(t, "with error", resp, initProducerIDRequestError)
+}

+ 10 - 0
request.go

@@ -118,6 +118,16 @@ func allocateBody(key, version int16) protocolBody {
 		return &CreateTopicsRequest{}
 	case 20:
 		return &DeleteTopicsRequest{}
+	case 22:
+		return &InitProducerIDRequest{}
+	case 24:
+		return &AddPartitionsToTxnRequest{}
+	case 25:
+		return &AddOffsetsToTxnRequest{}
+	case 26:
+		return &EndTxnRequest{}
+	case 28:
+		return &TxnOffsetCommitRequest{}
 	case 29:
 		return &DescribeAclsRequest{}
 	case 30:

+ 126 - 0
txn_offset_commit_request.go

@@ -0,0 +1,126 @@
+package sarama
+
+type TxnOffsetCommitRequest struct {
+	TransactionalID string
+	GroupID         string
+	ProducerID      int64
+	ProducerEpoch   int16
+	Topics          map[string][]*PartitionOffsetMetadata
+}
+
+func (t *TxnOffsetCommitRequest) encode(pe packetEncoder) error {
+	if err := pe.putString(t.TransactionalID); err != nil {
+		return err
+	}
+	if err := pe.putString(t.GroupID); err != nil {
+		return err
+	}
+	pe.putInt64(t.ProducerID)
+	pe.putInt16(t.ProducerEpoch)
+
+	if err := pe.putArrayLength(len(t.Topics)); err != nil {
+		return err
+	}
+	for topic, partitions := range t.Topics {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putArrayLength(len(partitions)); err != nil {
+			return err
+		}
+		for _, partition := range partitions {
+			if err := partition.encode(pe); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (t *TxnOffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
+	if t.TransactionalID, err = pd.getString(); err != nil {
+		return err
+	}
+	if t.GroupID, err = pd.getString(); err != nil {
+		return err
+	}
+	if t.ProducerID, err = pd.getInt64(); err != nil {
+		return err
+	}
+	if t.ProducerEpoch, err = pd.getInt16(); err != nil {
+		return err
+	}
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	t.Topics = make(map[string][]*PartitionOffsetMetadata)
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		m, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+
+		t.Topics[topic] = make([]*PartitionOffsetMetadata, m)
+
+		for j := 0; j < m; j++ {
+			partitionOffsetMetadata := new(PartitionOffsetMetadata)
+			if err := partitionOffsetMetadata.decode(pd, version); err != nil {
+				return err
+			}
+			t.Topics[topic][j] = partitionOffsetMetadata
+		}
+	}
+
+	return nil
+}
+
+func (a *TxnOffsetCommitRequest) key() int16 {
+	return 28
+}
+
+func (a *TxnOffsetCommitRequest) version() int16 {
+	return 0
+}
+
+func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}
+
+type PartitionOffsetMetadata struct {
+	Partition int32
+	Offset    int64
+	Metadata  *string
+}
+
+func (p *PartitionOffsetMetadata) encode(pe packetEncoder) error {
+	pe.putInt32(p.Partition)
+	pe.putInt64(p.Offset)
+	if err := pe.putNullableString(p.Metadata); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (p *PartitionOffsetMetadata) decode(pd packetDecoder, version int16) (err error) {
+	if p.Partition, err = pd.getInt32(); err != nil {
+		return err
+	}
+	if p.Offset, err = pd.getInt64(); err != nil {
+		return err
+	}
+	if p.Metadata, err = pd.getNullableString(); err != nil {
+		return err
+	}
+
+	return nil
+}

+ 35 - 0
txn_offset_commit_request_test.go

@@ -0,0 +1,35 @@
+package sarama
+
+import "testing"
+
+var (
+	txnOffsetCommitRequest = []byte{
+		0, 3, 't', 'x', 'n',
+		0, 7, 'g', 'r', 'o', 'u', 'p', 'i', 'd',
+		0, 0, 0, 0, 0, 0, 31, 64, // producer ID
+		0, 1, // producer epoch
+		0, 0, 0, 1, // 1 topic
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 0, 0, 1, // 1 partition
+		0, 0, 0, 2, // partition no 2
+		0, 0, 0, 0, 0, 0, 0, 123,
+		255, 255, // no meta data
+	}
+)
+
+func TestTxnOffsetCommitRequest(t *testing.T) {
+	req := &TxnOffsetCommitRequest{
+		TransactionalID: "txn",
+		GroupID:         "groupid",
+		ProducerID:      8000,
+		ProducerEpoch:   1,
+		Topics: map[string][]*PartitionOffsetMetadata{
+			"topic": []*PartitionOffsetMetadata{{
+				Offset:    123,
+				Partition: 2,
+			}},
+		},
+	}
+
+	testRequest(t, "", req, txnOffsetCommitRequest)
+}

+ 83 - 0
txn_offset_commit_response.go

@@ -0,0 +1,83 @@
+package sarama
+
+import (
+	"time"
+)
+
+type TxnOffsetCommitResponse struct {
+	ThrottleTime time.Duration
+	Topics       map[string][]*PartitionError
+}
+
+func (t *TxnOffsetCommitResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(t.ThrottleTime / time.Millisecond))
+	if err := pe.putArrayLength(len(t.Topics)); err != nil {
+		return err
+	}
+
+	for topic, e := range t.Topics {
+		if err := pe.putString(topic); err != nil {
+			return err
+		}
+		if err := pe.putArrayLength(len(e)); err != nil {
+			return err
+		}
+		for _, partitionError := range e {
+			if err := partitionError.encode(pe); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (t *TxnOffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
+	throttleTime, err := pd.getInt32()
+	if err != nil {
+		return err
+	}
+	t.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+
+	n, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
+
+	t.Topics = make(map[string][]*PartitionError)
+
+	for i := 0; i < n; i++ {
+		topic, err := pd.getString()
+		if err != nil {
+			return err
+		}
+
+		m, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+
+		t.Topics[topic] = make([]*PartitionError, m)
+
+		for j := 0; j < m; j++ {
+			t.Topics[topic][j] = new(PartitionError)
+			if err := t.Topics[topic][j].decode(pd, version); err != nil {
+				return err
+			}
+		}
+	}
+
+	return nil
+}
+
+func (a *TxnOffsetCommitResponse) key() int16 {
+	return 28
+}
+
+func (a *TxnOffsetCommitResponse) version() int16 {
+	return 0
+}
+
+func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion {
+	return V0_11_0_0
+}

+ 31 - 0
txn_offset_commit_response_test.go

@@ -0,0 +1,31 @@
+package sarama
+
+import (
+	"testing"
+	"time"
+)
+
+var (
+	txnOffsetCommitResponse = []byte{
+		0, 0, 0, 100,
+		0, 0, 0, 1, // 1 topic
+		0, 5, 't', 'o', 'p', 'i', 'c',
+		0, 0, 0, 1, // 1 partition response
+		0, 0, 0, 2, // partition number 2
+		0, 47, // err
+	}
+)
+
+func TestTxnOffsetCommitResponse(t *testing.T) {
+	resp := &TxnOffsetCommitResponse{
+		ThrottleTime: 100 * time.Millisecond,
+		Topics: map[string][]*PartitionError{
+			"topic": []*PartitionError{{
+				Partition: 2,
+				Err:       ErrInvalidProducerEpoch,
+			}},
+		},
+	}
+
+	testResponse(t, "", resp, txnOffsetCommitResponse)
+}