Просмотр исходного кода

another pull request to please lint gods

Varun 5 лет назад
Родитель
Сommit
3328055d57

+ 3 - 0
acl_bindings.go

@@ -1,5 +1,6 @@
 package sarama
 
+//Resource holds information about acl resource type
 type Resource struct {
 	ResourceType       AclResourceType
 	ResourceName       string
@@ -45,6 +46,7 @@ func (r *Resource) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
+//Acl holds information about acl type
 type Acl struct {
 	Principal      string
 	Host           string
@@ -91,6 +93,7 @@ func (a *Acl) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
+//ResourceAcls is an acl resource type
 type ResourceAcls struct {
 	Resource
 	Acls []*Acl

+ 7 - 5
acl_create_request.go

@@ -1,5 +1,6 @@
 package sarama
 
+//CreateAclsRequest is an acl creation request
 type CreateAclsRequest struct {
 	Version      int16
 	AclCreations []*AclCreation
@@ -38,16 +39,16 @@ func (c *CreateAclsRequest) decode(pd packetDecoder, version int16) (err error)
 	return nil
 }
 
-func (d *CreateAclsRequest) key() int16 {
+func (c *CreateAclsRequest) key() int16 {
 	return 30
 }
 
-func (d *CreateAclsRequest) version() int16 {
-	return d.Version
+func (c *CreateAclsRequest) version() int16 {
+	return c.Version
 }
 
-func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
-	switch d.Version {
+func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
+	switch c.Version {
 	case 1:
 		return V2_0_0_0
 	default:
@@ -55,6 +56,7 @@ func (d *CreateAclsRequest) requiredVersion() KafkaVersion {
 	}
 }
 
+//AclCreation is a wrapper around Resource and Acl type
 type AclCreation struct {
 	Resource
 	Acl

+ 5 - 3
acl_create_response.go

@@ -2,6 +2,7 @@ package sarama
 
 import "time"
 
+//CreateAclsResponse is a an acl reponse creation type
 type CreateAclsResponse struct {
 	ThrottleTime         time.Duration
 	AclCreationResponses []*AclCreationResponse
@@ -46,18 +47,19 @@ func (c *CreateAclsResponse) decode(pd packetDecoder, version int16) (err error)
 	return nil
 }
 
-func (d *CreateAclsResponse) key() int16 {
+func (c *CreateAclsResponse) key() int16 {
 	return 30
 }
 
-func (d *CreateAclsResponse) version() int16 {
+func (c *CreateAclsResponse) version() int16 {
 	return 0
 }
 
-func (d *CreateAclsResponse) requiredVersion() KafkaVersion {
+func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }
 
+//AclCreationResponse is an acl creation response type
 type AclCreationResponse struct {
 	Err    KError
 	ErrMsg *string

+ 1 - 0
acl_delete_request.go

@@ -1,5 +1,6 @@
 package sarama
 
+//DeleteAclsRequest is a delete acl request
 type DeleteAclsRequest struct {
 	Version int
 	Filters []*AclFilter

+ 13 - 10
acl_delete_response.go

@@ -2,21 +2,22 @@ package sarama
 
 import "time"
 
+//DeleteAclsResponse is a delete acl response
 type DeleteAclsResponse struct {
 	Version         int16
 	ThrottleTime    time.Duration
 	FilterResponses []*FilterResponse
 }
 
-func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
-	pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
+func (d *DeleteAclsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(d.ThrottleTime / time.Millisecond))
 
-	if err := pe.putArrayLength(len(a.FilterResponses)); err != nil {
+	if err := pe.putArrayLength(len(d.FilterResponses)); err != nil {
 		return err
 	}
 
-	for _, filterResponse := range a.FilterResponses {
-		if err := filterResponse.encode(pe, a.Version); err != nil {
+	for _, filterResponse := range d.FilterResponses {
+		if err := filterResponse.encode(pe, d.Version); err != nil {
 			return err
 		}
 	}
@@ -24,22 +25,22 @@ func (a *DeleteAclsResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (a *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
+func (d *DeleteAclsResponse) decode(pd packetDecoder, version int16) (err error) {
 	throttleTime, err := pd.getInt32()
 	if err != nil {
 		return err
 	}
-	a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+	d.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
 
 	n, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
-	a.FilterResponses = make([]*FilterResponse, n)
+	d.FilterResponses = make([]*FilterResponse, n)
 
 	for i := 0; i < n; i++ {
-		a.FilterResponses[i] = new(FilterResponse)
-		if err := a.FilterResponses[i].decode(pd, version); err != nil {
+		d.FilterResponses[i] = new(FilterResponse)
+		if err := d.FilterResponses[i].decode(pd, version); err != nil {
 			return err
 		}
 	}
@@ -59,6 +60,7 @@ func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }
 
+//FilterResponse is a filter response type
 type FilterResponse struct {
 	Err          KError
 	ErrMsg       *string
@@ -109,6 +111,7 @@ func (f *FilterResponse) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
+//MatchingAcl is a matching acl type
 type MatchingAcl struct {
 	Err    KError
 	ErrMsg *string

+ 1 - 0
acl_describe_request.go

@@ -1,5 +1,6 @@
 package sarama
 
+//DescribeAclsRequest is a secribe acl request type
 type DescribeAclsRequest struct {
 	Version int
 	AclFilter

+ 1 - 0
acl_describe_response.go

@@ -2,6 +2,7 @@ package sarama
 
 import "time"
 
+//DescribeAclsResponse is a describe acl response type
 type DescribeAclsResponse struct {
 	Version      int16
 	ThrottleTime time.Duration

+ 1 - 0
add_offsets_to_txn_request.go

@@ -1,5 +1,6 @@
 package sarama
 
+//AddOffsetsToTxnRequest adds offsets to a transaction request
 type AddOffsetsToTxnRequest struct {
 	TransactionalID string
 	ProducerID      int64

+ 1 - 0
add_offsets_to_txn_response.go

@@ -4,6 +4,7 @@ import (
 	"time"
 )
 
+//AddOffsetsToTxnResponse is a response type for adding offsets to txns
 type AddOffsetsToTxnResponse struct {
 	ThrottleTime time.Duration
 	Err          KError

+ 1 - 0
add_partitions_to_txn_request.go

@@ -1,5 +1,6 @@
 package sarama
 
+//AddPartitionsToTxnRequest is a add paartition request
 type AddPartitionsToTxnRequest struct {
 	TransactionalID string
 	ProducerID      int64

+ 2 - 0
add_partitions_to_txn_response.go

@@ -4,6 +4,7 @@ import (
 	"time"
 )
 
+//AddPartitionsToTxnResponse is a partition errors to transaction type
 type AddPartitionsToTxnResponse struct {
 	ThrottleTime time.Duration
 	Errors       map[string][]*PartitionError
@@ -82,6 +83,7 @@ func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }
 
+//PartitionError is a partition error type
 type PartitionError struct {
 	Partition int32
 	Err       KError

+ 24 - 22
alter_configs_request.go

@@ -1,45 +1,47 @@
 package sarama
 
+//AlterConfigsRequest is an alter config request type
 type AlterConfigsRequest struct {
 	Resources    []*AlterConfigsResource
 	ValidateOnly bool
 }
 
+//AlterConfigsResource is an alter config resource type
 type AlterConfigsResource struct {
 	Type          ConfigResourceType
 	Name          string
 	ConfigEntries map[string]*string
 }
 
-func (acr *AlterConfigsRequest) encode(pe packetEncoder) error {
-	if err := pe.putArrayLength(len(acr.Resources)); err != nil {
+func (a *AlterConfigsRequest) encode(pe packetEncoder) error {
+	if err := pe.putArrayLength(len(a.Resources)); err != nil {
 		return err
 	}
 
-	for _, r := range acr.Resources {
+	for _, r := range a.Resources {
 		if err := r.encode(pe); err != nil {
 			return err
 		}
 	}
 
-	pe.putBool(acr.ValidateOnly)
+	pe.putBool(a.ValidateOnly)
 	return nil
 }
 
-func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
+func (a *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
 	resourceCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
-	acr.Resources = make([]*AlterConfigsResource, resourceCount)
-	for i := range acr.Resources {
+	a.Resources = make([]*AlterConfigsResource, resourceCount)
+	for i := range a.Resources {
 		r := &AlterConfigsResource{}
 		err = r.decode(pd, version)
 		if err != nil {
 			return err
 		}
-		acr.Resources[i] = r
+		a.Resources[i] = r
 	}
 
 	validateOnly, err := pd.getBool()
@@ -47,22 +49,22 @@ func (acr *AlterConfigsRequest) decode(pd packetDecoder, version int16) error {
 		return err
 	}
 
-	acr.ValidateOnly = validateOnly
+	a.ValidateOnly = validateOnly
 
 	return nil
 }
 
-func (ac *AlterConfigsResource) encode(pe packetEncoder) error {
-	pe.putInt8(int8(ac.Type))
+func (a *AlterConfigsResource) encode(pe packetEncoder) error {
+	pe.putInt8(int8(a.Type))
 
-	if err := pe.putString(ac.Name); err != nil {
+	if err := pe.putString(a.Name); err != nil {
 		return err
 	}
 
-	if err := pe.putArrayLength(len(ac.ConfigEntries)); err != nil {
+	if err := pe.putArrayLength(len(a.ConfigEntries)); err != nil {
 		return err
 	}
-	for configKey, configValue := range ac.ConfigEntries {
+	for configKey, configValue := range a.ConfigEntries {
 		if err := pe.putString(configKey); err != nil {
 			return err
 		}
@@ -74,18 +76,18 @@ func (ac *AlterConfigsResource) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
+func (a *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
 	t, err := pd.getInt8()
 	if err != nil {
 		return err
 	}
-	ac.Type = ConfigResourceType(t)
+	a.Type = ConfigResourceType(t)
 
 	name, err := pd.getString()
 	if err != nil {
 		return err
 	}
-	ac.Name = name
+	a.Name = name
 
 	n, err := pd.getArrayLength()
 	if err != nil {
@@ -93,13 +95,13 @@ func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
 	}
 
 	if n > 0 {
-		ac.ConfigEntries = make(map[string]*string, n)
+		a.ConfigEntries = make(map[string]*string, n)
 		for i := 0; i < n; i++ {
 			configKey, err := pd.getString()
 			if err != nil {
 				return err
 			}
-			if ac.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
+			if a.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
 				return err
 			}
 		}
@@ -107,14 +109,14 @@ func (ac *AlterConfigsResource) decode(pd packetDecoder, version int16) error {
 	return err
 }
 
-func (acr *AlterConfigsRequest) key() int16 {
+func (a *AlterConfigsRequest) key() int16 {
 	return 33
 }
 
-func (acr *AlterConfigsRequest) version() int16 {
+func (a *AlterConfigsRequest) version() int16 {
 	return 0
 }
 
-func (acr *AlterConfigsRequest) requiredVersion() KafkaVersion {
+func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 22 - 20
alter_configs_response.go

@@ -2,11 +2,13 @@ package sarama
 
 import "time"
 
+//AlterConfigsResponse is a reponse type for alter config
 type AlterConfigsResponse struct {
 	ThrottleTime time.Duration
 	Resources    []*AlterConfigsResourceResponse
 }
 
+//AlterConfigsResourceResponse is a reponse type for alter config resource
 type AlterConfigsResourceResponse struct {
 	ErrorCode int16
 	ErrorMsg  string
@@ -14,21 +16,21 @@ type AlterConfigsResourceResponse struct {
 	Name      string
 }
 
-func (ct *AlterConfigsResponse) encode(pe packetEncoder) error {
-	pe.putInt32(int32(ct.ThrottleTime / time.Millisecond))
+func (a *AlterConfigsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(int32(a.ThrottleTime / time.Millisecond))
 
-	if err := pe.putArrayLength(len(ct.Resources)); err != nil {
+	if err := pe.putArrayLength(len(a.Resources)); err != nil {
 		return err
 	}
 
-	for i := range ct.Resources {
-		pe.putInt16(ct.Resources[i].ErrorCode)
-		err := pe.putString(ct.Resources[i].ErrorMsg)
+	for i := range a.Resources {
+		pe.putInt16(a.Resources[i].ErrorCode)
+		err := pe.putString(a.Resources[i].ErrorMsg)
 		if err != nil {
 			return nil
 		}
-		pe.putInt8(int8(ct.Resources[i].Type))
-		err = pe.putString(ct.Resources[i].Name)
+		pe.putInt8(int8(a.Resources[i].Type))
+		err = pe.putString(a.Resources[i].Name)
 		if err != nil {
 			return nil
 		}
@@ -37,59 +39,59 @@ func (ct *AlterConfigsResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (acr *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
+func (a *AlterConfigsResponse) decode(pd packetDecoder, version int16) error {
 	throttleTime, err := pd.getInt32()
 	if err != nil {
 		return err
 	}
-	acr.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
+	a.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
 
 	responseCount, err := pd.getArrayLength()
 	if err != nil {
 		return err
 	}
 
-	acr.Resources = make([]*AlterConfigsResourceResponse, responseCount)
+	a.Resources = make([]*AlterConfigsResourceResponse, responseCount)
 
-	for i := range acr.Resources {
-		acr.Resources[i] = new(AlterConfigsResourceResponse)
+	for i := range a.Resources {
+		a.Resources[i] = new(AlterConfigsResourceResponse)
 
 		errCode, err := pd.getInt16()
 		if err != nil {
 			return err
 		}
-		acr.Resources[i].ErrorCode = errCode
+		a.Resources[i].ErrorCode = errCode
 
 		e, err := pd.getString()
 		if err != nil {
 			return err
 		}
-		acr.Resources[i].ErrorMsg = e
+		a.Resources[i].ErrorMsg = e
 
 		t, err := pd.getInt8()
 		if err != nil {
 			return err
 		}
-		acr.Resources[i].Type = ConfigResourceType(t)
+		a.Resources[i].Type = ConfigResourceType(t)
 
 		name, err := pd.getString()
 		if err != nil {
 			return err
 		}
-		acr.Resources[i].Name = name
+		a.Resources[i].Name = name
 	}
 
 	return nil
 }
 
-func (r *AlterConfigsResponse) key() int16 {
+func (a *AlterConfigsResponse) key() int16 {
 	return 32
 }
 
-func (r *AlterConfigsResponse) version() int16 {
+func (a *AlterConfigsResponse) version() int16 {
 	return 0
 }
 
-func (r *AlterConfigsResponse) requiredVersion() KafkaVersion {
+func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 6 - 5
api_versions_request.go

@@ -1,24 +1,25 @@
 package sarama
 
+//ApiVersionsRequest ...
 type ApiVersionsRequest struct {
 }
 
-func (r *ApiVersionsRequest) encode(pe packetEncoder) error {
+func (a *ApiVersionsRequest) encode(pe packetEncoder) error {
 	return nil
 }
 
-func (r *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
+func (a *ApiVersionsRequest) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 }
 
-func (r *ApiVersionsRequest) key() int16 {
+func (a *ApiVersionsRequest) key() int16 {
 	return 18
 }
 
-func (r *ApiVersionsRequest) version() int16 {
+func (a *ApiVersionsRequest) version() int16 {
 	return 0
 }
 
-func (r *ApiVersionsRequest) requiredVersion() KafkaVersion {
+func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
 	return V0_10_0_0
 }

+ 2 - 0
api_versions_response.go

@@ -1,5 +1,6 @@
 package sarama
 
+//ApiVersionsResponseBlock is an api version reponse block type
 type ApiVersionsResponseBlock struct {
 	ApiKey     int16
 	MinVersion int16
@@ -31,6 +32,7 @@ func (b *ApiVersionsResponseBlock) decode(pd packetDecoder) error {
 	return nil
 }
 
+//ApiVersionsResponse is an api version response type
 type ApiVersionsResponse struct {
 	Err         KError
 	ApiVersions []*ApiVersionsResponseBlock

+ 28 - 33
async_producer_test.go

@@ -1001,7 +1001,6 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 				batchSize := len(batch.Records)
 
 				if lastSequenceWrittenToDisk == batchFirstSeq-1 { //in sequence append
-
 					if lastBatchFirstSeq == batchFirstSeq { //is a batch retry
 						if lastBatchSize == batchSize { //good retry
 							// mock write to disk
@@ -1010,43 +1009,39 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 						}
 						t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
 						return prodOutOfSeq
-					} else { // not a retry
-						// save batch just received for future check
-						lastBatchFirstSeq = batchFirstSeq
-						lastBatchSize = batchSize
-
-						if prodCounter%2 == 1 {
-							if test.failAfterWrite {
-								// mock write to disk
-								lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
-							}
-							return prodNotLeaderResponse
+					} // not a retry
+					// save batch just received for future check
+					lastBatchFirstSeq = batchFirstSeq
+					lastBatchSize = batchSize
+
+					if prodCounter%2 == 1 {
+						if test.failAfterWrite {
+							// mock write to disk
+							lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
 						}
-						// mock write to disk
-						lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
-						return prodSuccessResponse
+						return prodNotLeaderResponse
 					}
-				} else {
-					if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
-						if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
-							return prodDuplicate
-						}
-						// mock write to disk
-						lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
-						return prodSuccessResponse
-					} else { //out of sequence / bad retried batch
-						if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
-							t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
-						} else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
-							t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
-						} else {
-							t.Errorf("[%s] Unexpected error", test.name)
-						}
-
-						return prodOutOfSeq
+					// mock write to disk
+					lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
+					return prodSuccessResponse
+				}
+				if lastBatchFirstSeq == batchFirstSeq && lastBatchSize == batchSize { // is a good batch retry
+					if lastSequenceWrittenToDisk == (batchFirstSeq + batchSize - 1) { // we already have the messages
+						return prodDuplicate
 					}
+					// mock write to disk
+					lastSequenceWrittenToDisk = batchFirstSeq + batchSize - 1
+					return prodSuccessResponse
+				} //out of sequence / bad retried batch
+				if lastBatchFirstSeq == batchFirstSeq && lastBatchSize != batchSize {
+					t.Errorf("[%s] Retried Batch firstSeq=%d with different size old=%d new=%d", test.name, batchFirstSeq, lastBatchSize, batchSize)
+				} else if lastSequenceWrittenToDisk+1 != batchFirstSeq {
+					t.Errorf("[%s] Out of sequence message lastSequence=%d new batch starts at=%d", test.name, lastSequenceWrittenToDisk, batchFirstSeq)
+				} else {
+					t.Errorf("[%s] Unexpected error", test.name)
 				}
 
+				return prodOutOfSeq
 			}
 			return nil
 		}

+ 1 - 2
client.go

@@ -877,9 +877,8 @@ func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
 		maxRetries := client.conf.Metadata.Retry.Max
 		retries := maxRetries - attemptsRemaining
 		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
-	} else {
-		return client.conf.Metadata.Retry.Backoff
 	}
+	return client.conf.Metadata.Retry.Backoff
 }
 
 func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {

+ 7 - 0
config_resource_type.go

@@ -1,15 +1,22 @@
 package sarama
 
+//ConfigResourceType is a type for config resource
 type ConfigResourceType int8
 
 // Taken from :
 // https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
 
 const (
+	//UnknownResource constant type
 	UnknownResource ConfigResourceType = iota
+	//AnyResource constant type
 	AnyResource
+	//TopicResource constant type
 	TopicResource
+	//GroupResource constant type
 	GroupResource
+	//ClusterResource constant type
 	ClusterResource
+	//BrokerResource constant type
 	BrokerResource
 )

+ 35 - 33
control_record.go

@@ -1,10 +1,14 @@
 package sarama
 
+//ControlRecordType ...
 type ControlRecordType int
 
 const (
+	//ControlRecordAbort is a control record for abort
 	ControlRecordAbort ControlRecordType = iota
+	//ControlRecordCommit is a control record for commit
 	ControlRecordCommit
+	//ControlRecordUnknown is a control record of unknown type
 	ControlRecordUnknown
 )
 
@@ -18,40 +22,38 @@ type ControlRecord struct {
 }
 
 func (cr *ControlRecord) decode(key, value packetDecoder) error {
-	{
-		var err error
-		cr.Version, err = value.getInt16()
-		if err != nil {
-			return err
-		}
-		cr.CoordinatorEpoch, err = value.getInt32()
-		if err != nil {
-			return err
-		}
+	var err error
+	cr.Version, err = value.getInt16()
+	if err != nil {
+		return err
 	}
-	{
-		var err error
-		// There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
-		// Either way, all these version can only be 0 for now
-		cr.Version, err = key.getInt16()
-		if err != nil {
-			return err
-		}
 
-		recordType, err := key.getInt16()
-		if err != nil {
-			return err
-		}
-		switch recordType {
-		case 0:
-			cr.Type = ControlRecordAbort
-		case 1:
-			cr.Type = ControlRecordCommit
-		default:
-			// from JAVA implementation:
-			// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
-			cr.Type = ControlRecordUnknown
-		}
+	cr.CoordinatorEpoch, err = value.getInt32()
+	if err != nil {
+		return err
+	}
+
+	// There a version for the value part AND the key part. And I have no idea if they are supposed to match or not
+	// Either way, all these version can only be 0 for now
+	cr.Version, err = key.getInt16()
+	if err != nil {
+		return err
+	}
+
+	recordType, err := key.getInt16()
+	if err != nil {
+		return err
+	}
+
+	switch recordType {
+	case 0:
+		cr.Type = ControlRecordAbort
+	case 1:
+		cr.Type = ControlRecordCommit
+	default:
+		// from JAVA implementation:
+		// UNKNOWN is used to indicate a control type which the client is not aware of and should be ignored
+		cr.Type = ControlRecordUnknown
 	}
 	return nil
 }
@@ -59,8 +61,8 @@ func (cr *ControlRecord) decode(key, value packetDecoder) error {
 func (cr *ControlRecord) encode(key, value packetEncoder) {
 	value.putInt16(cr.Version)
 	value.putInt32(cr.CoordinatorEpoch)
-
 	key.putInt16(cr.Version)
+
 	switch cr.Type {
 	case ControlRecordAbort:
 		key.putInt16(0)

+ 1 - 0
go.sum

@@ -30,6 +30,7 @@ github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
 github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
 golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
 golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
 golang.org/x/net v0.0.0-20190327091125-710a502c58a2 h1:17UhVDrPb40BH5k6cyeb2V/7QlBNdo/a0+r0dtK+Utw=
 golang.org/x/net v0.0.0-20190327091125-710a502c58a2/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=