Просмотр исходного кода

feat: Add alter and list partition reassignments

Implementation of KIP-455. Also includes work to make Sarama protocol
support the new optional tagged fields functionality from KIP-482

- add headerVersion for all requests (Ref: KIP-482)
- implement AlterPartitionReassignmentsRequest/Response protocol
- add tests for alter_partition_reassignments
- pretty print partition reassignment errors
- add ListPartitionReassignmentsRequest/Response protocol
- decode empty tagged fields in response header v1
- make sure mockbroker can handle different response header versions
- make sure partition reassignment can be aborted
- add Alter/ListPartitionReassignments to admin client api

https://cwiki.apache.org/confluence/display/KAFKA/KIP-455%3A+Create+an+Administrative+API+for+Replica+Reassignment
https://cwiki.apache.org/confluence/display/KAFKA/KIP-482%3A+The+Kafka+Protocol+should+Support+Optional+Tagged+Fields

Co-authored-by: Dirk Wilden <dirk.wilden@device-insight.com>
Co-authored-by: Leonid Koftun <leonid.koftun@gmail.com>
Co-authored-by: iyacontrol <gaohj2015@yeah.net>
Leonid Koftun 4 года назад
Родитель
Commit
2d2326efcf
93 измененных файлов с 1692 добавлено и 60 удалено
  1. 4 0
      acl_create_request.go
  2. 4 0
      acl_create_response.go
  3. 4 0
      acl_delete_request.go
  4. 4 0
      acl_delete_response.go
  5. 4 0
      acl_describe_request.go
  6. 4 0
      acl_describe_response.go
  7. 4 0
      add_offsets_to_txn_request.go
  8. 4 0
      add_offsets_to_txn_response.go
  9. 4 0
      add_partitions_to_txn_request.go
  10. 4 0
      add_partitions_to_txn_response.go
  11. 84 0
      admin.go
  12. 161 0
      admin_test.go
  13. 4 0
      alter_configs_request.go
  14. 4 0
      alter_configs_response.go
  15. 130 0
      alter_partition_reassignments_request.go
  16. 56 0
      alter_partition_reassignments_request_test.go
  17. 157 0
      alter_partition_reassignments_response.go
  18. 45 0
      alter_partition_reassignments_response_test.go
  19. 4 0
      api_versions_request.go
  20. 4 0
      api_versions_response.go
  21. 1 1
      async_producer_test.go
  22. 52 9
      broker.go
  23. 4 0
      broker_test.go
  24. 4 0
      consumer_metadata_request.go
  25. 4 0
      consumer_metadata_response.go
  26. 4 0
      create_partitions_request.go
  27. 4 0
      create_partitions_response.go
  28. 4 0
      create_topics_request.go
  29. 4 0
      create_topics_response.go
  30. 4 0
      delete_groups_request.go
  31. 4 0
      delete_groups_response.go
  32. 4 0
      delete_records_request.go
  33. 4 0
      delete_records_response.go
  34. 4 0
      delete_topics_request.go
  35. 4 0
      delete_topics_response.go
  36. 4 0
      describe_configs_request.go
  37. 4 0
      describe_configs_response.go
  38. 4 0
      describe_groups_request.go
  39. 4 0
      describe_groups_response.go
  40. 4 0
      describe_log_dirs_request.go
  41. 4 0
      describe_log_dirs_response.go
  42. 5 0
      encoder_decoder.go
  43. 4 0
      end_txn_request.go
  44. 4 0
      end_txn_response.go
  45. 16 0
      errors.go
  46. 4 0
      fetch_request.go
  47. 4 0
      fetch_response.go
  48. 4 0
      find_coordinator_request.go
  49. 4 0
      find_coordinator_response.go
  50. 4 0
      heartbeat_request.go
  51. 4 0
      heartbeat_response.go
  52. 4 0
      init_producer_id_request.go
  53. 4 0
      init_producer_id_response.go
  54. 4 0
      join_group_request.go
  55. 4 0
      join_group_response.go
  56. 4 0
      leave_group_request.go
  57. 4 0
      leave_group_response.go
  58. 4 0
      list_groups_request.go
  59. 4 0
      list_groups_response.go
  60. 98 0
      list_partition_reassignments_request.go
  61. 31 0
      list_partition_reassignments_request_test.go
  62. 169 0
      list_partition_reassignments_response.go
  63. 32 0
      list_partition_reassignments_response_test.go
  64. 4 0
      metadata_request.go
  65. 4 0
      metadata_response.go
  66. 25 9
      mockbroker.go
  67. 65 28
      mockresponses.go
  68. 4 0
      offset_commit_request.go
  69. 4 0
      offset_commit_response.go
  70. 4 0
      offset_fetch_request.go
  71. 4 0
      offset_fetch_response.go
  72. 3 3
      offset_manager_test.go
  73. 4 0
      offset_request.go
  74. 4 0
      offset_response.go
  75. 6 0
      packet_decoder.go
  76. 7 0
      packet_encoder.go
  77. 49 0
      prep_encoder.go
  78. 4 0
      produce_request.go
  79. 16 0
      produce_response.go
  80. 96 0
      real_decoder.go
  81. 52 0
      real_encoder.go
  82. 24 4
      request.go
  83. 21 2
      request_test.go
  84. 8 1
      response_header.go
  85. 19 3
      response_header_test.go
  86. 4 0
      sasl_authenticate_request.go
  87. 4 0
      sasl_authenticate_response.go
  88. 4 0
      sasl_handshake_request.go
  89. 4 0
      sasl_handshake_response.go
  90. 4 0
      sync_group_request.go
  91. 4 0
      sync_group_response.go
  92. 4 0
      txn_offset_commit_request.go
  93. 4 0
      txn_offset_commit_response.go

+ 4 - 0
acl_create_request.go

@@ -47,6 +47,10 @@ func (c *CreateAclsRequest) version() int16 {
 	return c.Version
 }
 
+func (c *CreateAclsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (c *CreateAclsRequest) requiredVersion() KafkaVersion {
 	switch c.Version {
 	case 1:

+ 4 - 0
acl_create_response.go

@@ -55,6 +55,10 @@ func (c *CreateAclsResponse) version() int16 {
 	return 0
 }
 
+func (c *CreateAclsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (c *CreateAclsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
acl_delete_request.go

@@ -48,6 +48,10 @@ func (d *DeleteAclsRequest) version() int16 {
 	return int16(d.Version)
 }
 
+func (c *DeleteAclsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (d *DeleteAclsRequest) requiredVersion() KafkaVersion {
 	switch d.Version {
 	case 1:

+ 4 - 0
acl_delete_response.go

@@ -56,6 +56,10 @@ func (d *DeleteAclsResponse) version() int16 {
 	return d.Version
 }
 
+func (d *DeleteAclsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (d *DeleteAclsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
acl_describe_request.go

@@ -25,6 +25,10 @@ func (d *DescribeAclsRequest) version() int16 {
 	return int16(d.Version)
 }
 
+func (d *DescribeAclsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (d *DescribeAclsRequest) requiredVersion() KafkaVersion {
 	switch d.Version {
 	case 1:

+ 4 - 0
acl_describe_response.go

@@ -77,6 +77,10 @@ func (d *DescribeAclsResponse) version() int16 {
 	return d.Version
 }
 
+func (d *DescribeAclsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (d *DescribeAclsResponse) requiredVersion() KafkaVersion {
 	switch d.Version {
 	case 1:

+ 4 - 0
add_offsets_to_txn_request.go

@@ -48,6 +48,10 @@ func (a *AddOffsetsToTxnRequest) version() int16 {
 	return 0
 }
 
+func (a *AddOffsetsToTxnRequest) headerVersion() int16 {
+	return 1
+}
+
 func (a *AddOffsetsToTxnRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
add_offsets_to_txn_response.go

@@ -40,6 +40,10 @@ func (a *AddOffsetsToTxnResponse) version() int16 {
 	return 0
 }
 
+func (a *AddOffsetsToTxnResponse) headerVersion() int16 {
+	return 0
+}
+
 func (a *AddOffsetsToTxnResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
add_partitions_to_txn_request.go

@@ -72,6 +72,10 @@ func (a *AddPartitionsToTxnRequest) version() int16 {
 	return 0
 }
 
+func (a *AddPartitionsToTxnRequest) headerVersion() int16 {
+	return 1
+}
+
 func (a *AddPartitionsToTxnRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
add_partitions_to_txn_response.go

@@ -79,6 +79,10 @@ func (a *AddPartitionsToTxnResponse) version() int16 {
 	return 0
 }
 
+func (a *AddPartitionsToTxnResponse) headerVersion() int16 {
+	return 0
+}
+
 func (a *AddPartitionsToTxnResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 84 - 0
admin.go

@@ -42,6 +42,14 @@ type ClusterAdmin interface {
 	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
 	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
 
+	// Alter the replica assignment for partitions.
+	// This operation is supported by brokers with version 2.4.0.0 or higher.
+	AlterPartitionReassignments(topic string, assignment [][]int32) error
+
+	// Provides info on ongoing partitions replica reassignments.
+	// This operation is supported by brokers with version 2.4.0.0 or higher.
+	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
+
 	// Delete records whose offset is smaller than the given offset of the corresponding partition.
 	// This operation is supported by brokers with version 0.11.0.0 or higher.
 	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
@@ -452,6 +460,82 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
 	})
 }
 
+func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	request := &AlterPartitionReassignmentsRequest{
+		TimeoutMs: int32(60000),
+		Version:   int16(0),
+	}
+
+	for i := 0; i < len(assignment); i++ {
+		request.AddBlock(topic, int32(i), assignment[i])
+	}
+
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		errs := make([]error, 0)
+
+		rsp, err := b.AlterPartitionReassignments(request)
+
+		if err != nil {
+			errs = append(errs, err)
+		} else {
+			if rsp.ErrorCode > 0 {
+				errs = append(errs, errors.New(rsp.ErrorCode.Error()))
+			}
+
+			for topic, topicErrors := range rsp.Errors {
+				for partition, partitionError := range topicErrors {
+					if partitionError.errorCode != ErrNoError {
+						errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
+						errs = append(errs, errors.New(errStr))
+					}
+				}
+			}
+		}
+
+		if len(errs) > 0 {
+			return ErrReassignPartitions{MultiError{&errs}}
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
+	if topic == "" {
+		return nil, ErrInvalidTopic
+	}
+
+	request := &ListPartitionReassignmentsRequest{
+		TimeoutMs: int32(60000),
+		Version:   int16(0),
+	}
+
+	request.AddBlock(topic, partitions)
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+	_ = b.Open(ca.client.Config())
+
+	rsp, err := b.ListPartitionReassignments(request)
+
+	if err == nil && rsp != nil {
+		return rsp.TopicStatus, nil
+	} else {
+		return nil, err
+	}
+}
+
 func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
 	if topic == "" {
 		return ErrInvalidTopic

+ 161 - 0
admin_test.go

@@ -332,6 +332,167 @@ func TestClusterAdminCreatePartitionsWithoutAuthorization(t *testing.T) {
 	}
 }
 
+func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	secondBroker := NewMockBroker(t, 2)
+	defer secondBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(secondBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
+	})
+
+	secondBroker.SetHandlerByMap(map[string]MockResponse{
+		"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V2_4_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var topicAssignment = make([][]int32, 0, 3)
+
+	err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminAlterPartitionReassignmentsWithDiffVersion(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	secondBroker := NewMockBroker(t, 2)
+	defer secondBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(secondBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
+	})
+
+	secondBroker.SetHandlerByMap(map[string]MockResponse{
+		"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V2_3_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var topicAssignment = make([][]int32, 0, 3)
+
+	err = admin.AlterPartitionReassignments("my_topic", topicAssignment)
+
+	if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminListPartitionReassignments(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	secondBroker := NewMockBroker(t, 2)
+	defer secondBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(secondBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
+	})
+
+	secondBroker.SetHandlerByMap(map[string]MockResponse{
+		"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V2_4_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	response, err := admin.ListPartitionReassignments("my_topic", []int32{0, 1})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	partitionStatus, ok := response["my_topic"]
+	if !ok {
+		t.Fatalf("topic missing in response")
+	} else {
+		if len(partitionStatus) != 2 {
+			t.Fatalf("partition missing in response")
+		}
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestClusterAdminListPartitionReassignmentsWithDiffVersion(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	secondBroker := NewMockBroker(t, 2)
+	defer secondBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(secondBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
+	})
+
+	secondBroker.SetHandlerByMap(map[string]MockResponse{
+		"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V2_3_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	var partitions = make([]int32, 0)
+
+	_, err = admin.ListPartitionReassignments("my_topic", partitions)
+
+	if !strings.ContainsAny(err.Error(), ErrUnsupportedVersion.Error()) {
+		t.Fatal(err)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminDeleteRecords(t *testing.T) {
 	topicName := "my_topic"
 	seedBroker := NewMockBroker(t, 1)

+ 4 - 0
alter_configs_request.go

@@ -117,6 +117,10 @@ func (a *AlterConfigsRequest) version() int16 {
 	return 0
 }
 
+func (a *AlterConfigsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (a *AlterConfigsRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
alter_configs_response.go

@@ -92,6 +92,10 @@ func (a *AlterConfigsResponse) version() int16 {
 	return 0
 }
 
+func (a *AlterConfigsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (a *AlterConfigsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 130 - 0
alter_partition_reassignments_request.go

@@ -0,0 +1,130 @@
+package sarama
+
+type alterPartitionReassignmentsBlock struct {
+	replicas []int32
+}
+
+func (b *alterPartitionReassignmentsBlock) encode(pe packetEncoder) error {
+	if err := pe.putNullableCompactInt32Array(b.replicas); err != nil {
+		return err
+	}
+
+	pe.putEmptyTaggedFieldArray()
+	return nil
+}
+
+func (b *alterPartitionReassignmentsBlock) decode(pd packetDecoder) (err error) {
+	if b.replicas, err = pd.getCompactInt32Array(); err != nil {
+		return err
+	}
+	return nil
+}
+
+type AlterPartitionReassignmentsRequest struct {
+	TimeoutMs int32
+	blocks    map[string]map[int32]*alterPartitionReassignmentsBlock
+	Version   int16
+}
+
+func (r *AlterPartitionReassignmentsRequest) encode(pe packetEncoder) error {
+	pe.putInt32(r.TimeoutMs)
+
+	pe.putCompactArrayLength(len(r.blocks))
+
+	for topic, partitions := range r.blocks {
+		if err := pe.putCompactString(topic); err != nil {
+			return err
+		}
+		pe.putCompactArrayLength(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+			if err := block.encode(pe); err != nil {
+				return err
+			}
+		}
+		pe.putEmptyTaggedFieldArray()
+	}
+
+	pe.putEmptyTaggedFieldArray()
+
+	return nil
+}
+
+func (r *AlterPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if r.TimeoutMs, err = pd.getInt32(); err != nil {
+		return err
+	}
+
+	topicCount, err := pd.getCompactArrayLength()
+	if err != nil {
+		return err
+	}
+	if topicCount > 0 {
+		r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock)
+		for i := 0; i < topicCount; i++ {
+			topic, err := pd.getCompactString()
+			if err != nil {
+				return err
+			}
+			partitionCount, err := pd.getCompactArrayLength()
+			if err != nil {
+				return err
+			}
+			r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock)
+			for j := 0; j < partitionCount; j++ {
+				partition, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+				block := &alterPartitionReassignmentsBlock{}
+				if err := block.decode(pd); err != nil {
+					return err
+				}
+				r.blocks[topic][partition] = block
+
+				if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+					return err
+				}
+			}
+			if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+				return err
+			}
+		}
+	}
+
+	if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+		return err
+	}
+
+	return
+}
+
+func (r *AlterPartitionReassignmentsRequest) key() int16 {
+	return 45
+}
+
+func (r *AlterPartitionReassignmentsRequest) version() int16 {
+	return r.Version
+}
+
+func (r *AlterPartitionReassignmentsRequest) headerVersion() int16 {
+	return 2
+}
+
+func (r *AlterPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
+	return V2_4_0_0
+}
+
+func (r *AlterPartitionReassignmentsRequest) AddBlock(topic string, partitionID int32, replicas []int32) {
+	if r.blocks == nil {
+		r.blocks = make(map[string]map[int32]*alterPartitionReassignmentsBlock)
+	}
+
+	if r.blocks[topic] == nil {
+		r.blocks[topic] = make(map[int32]*alterPartitionReassignmentsBlock)
+	}
+
+	r.blocks[topic][partitionID] = &alterPartitionReassignmentsBlock{replicas}
+}

+ 56 - 0
alter_partition_reassignments_request_test.go

@@ -0,0 +1,56 @@
+package sarama
+
+import "testing"
+
+var (
+	alterPartitionReassignmentsRequestNoBlock = []byte{
+		0, 0, 39, 16, // timout 10000
+		1, // 1-1=0 blocks
+		0, // empty tagged fields
+	}
+
+	alterPartitionReassignmentsRequestOneBlock = []byte{
+		0, 0, 39, 16, // timout 10000
+		2,                         // 2-1=1 block
+		6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
+		2,          // 2-1=1 partitions
+		0, 0, 0, 0, // partitionId
+		3,            // 3-1=2 replica array size
+		0, 0, 3, 232, // replica 1000
+		0, 0, 3, 233, // replica 1001
+		0, 0, 0, // empty tagged fields
+	}
+
+	alterPartitionReassignmentsAbortRequest = []byte{
+		0, 0, 39, 16, // timout 10000
+		2,                         // 2-1=1 block
+		6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
+		2,          // 2-1=1 partitions
+		0, 0, 0, 0, // partitionId
+		0,       // replica array is null (indicates that a pending reassignment should be aborted)
+		0, 0, 0, // empty tagged fields
+	}
+)
+
+func TestAlterPartitionReassignmentRequest(t *testing.T) {
+	var request *AlterPartitionReassignmentsRequest
+
+	request = &AlterPartitionReassignmentsRequest{
+		TimeoutMs: int32(10000),
+		Version:   int16(0),
+	}
+
+	testRequest(t, "no block", request, alterPartitionReassignmentsRequestNoBlock)
+
+	request.AddBlock("topic", 0, []int32{1000, 1001})
+
+	testRequest(t, "one block", request, alterPartitionReassignmentsRequestOneBlock)
+
+	request = &AlterPartitionReassignmentsRequest{
+		TimeoutMs: int32(10000),
+		Version:   int16(0),
+	}
+	request.AddBlock("topic", 0, nil)
+
+	testRequest(t, "abort assignment", request, alterPartitionReassignmentsAbortRequest)
+}

+ 157 - 0
alter_partition_reassignments_response.go

@@ -0,0 +1,157 @@
+package sarama
+
+type alterPartitionReassignmentsErrorBlock struct {
+	errorCode    KError
+	errorMessage *string
+}
+
+func (b *alterPartitionReassignmentsErrorBlock) encode(pe packetEncoder) error {
+	pe.putInt16(int16(b.errorCode))
+	if err := pe.putNullableCompactString(b.errorMessage); err != nil {
+		return err
+	}
+	pe.putEmptyTaggedFieldArray()
+
+	return nil
+}
+
+func (b *alterPartitionReassignmentsErrorBlock) decode(pd packetDecoder) (err error) {
+	errorCode, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+	b.errorCode = KError(errorCode)
+	b.errorMessage, err = pd.getCompactNullableString()
+
+	if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+		return err
+	}
+	return err
+}
+
+type AlterPartitionReassignmentsResponse struct {
+	Version        int16
+	ThrottleTimeMs int32
+	ErrorCode      KError
+	ErrorMessage   *string
+	Errors         map[string]map[int32]*alterPartitionReassignmentsErrorBlock
+}
+
+func (r *AlterPartitionReassignmentsResponse) AddError(topic string, partition int32, kerror KError, message *string) {
+	if r.Errors == nil {
+		r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock)
+	}
+	partitions := r.Errors[topic]
+	if partitions == nil {
+		partitions = make(map[int32]*alterPartitionReassignmentsErrorBlock)
+		r.Errors[topic] = partitions
+	}
+
+	partitions[partition] = &alterPartitionReassignmentsErrorBlock{errorCode: kerror, errorMessage: message}
+}
+
+func (r *AlterPartitionReassignmentsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(r.ThrottleTimeMs)
+	pe.putInt16(int16(r.ErrorCode))
+	if err := pe.putNullableCompactString(r.ErrorMessage); err != nil {
+		return err
+	}
+
+	pe.putCompactArrayLength(len(r.Errors))
+	for topic, partitions := range r.Errors {
+		if err := pe.putCompactString(topic); err != nil {
+			return err
+		}
+		pe.putCompactArrayLength(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+
+			if err := block.encode(pe); err != nil {
+				return err
+			}
+		}
+		pe.putEmptyTaggedFieldArray()
+	}
+
+	pe.putEmptyTaggedFieldArray()
+	return nil
+}
+
+func (r *AlterPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
+		return err
+	}
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+
+	r.ErrorCode = KError(kerr)
+
+	if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil {
+		return err
+	}
+
+	numTopics, err := pd.getCompactArrayLength()
+	if err != nil {
+		return err
+	}
+
+	if numTopics > 0 {
+		r.Errors = make(map[string]map[int32]*alterPartitionReassignmentsErrorBlock, numTopics)
+		for i := 0; i < numTopics; i++ {
+			topic, err := pd.getCompactString()
+			if err != nil {
+				return err
+			}
+
+			ongoingPartitionReassignments, err := pd.getCompactArrayLength()
+			if err != nil {
+				return err
+			}
+
+			r.Errors[topic] = make(map[int32]*alterPartitionReassignmentsErrorBlock, ongoingPartitionReassignments)
+
+			for j := 0; j < ongoingPartitionReassignments; j++ {
+				partition, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+				block := &alterPartitionReassignmentsErrorBlock{}
+				if err := block.decode(pd); err != nil {
+					return err
+				}
+
+				r.Errors[topic][partition] = block
+			}
+			if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
+				return err
+			}
+		}
+	}
+
+	if _, err = pd.getEmptyTaggedFieldArray(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *AlterPartitionReassignmentsResponse) key() int16 {
+	return 45
+}
+
+func (r *AlterPartitionReassignmentsResponse) version() int16 {
+	return r.Version
+}
+
+func (r *AlterPartitionReassignmentsResponse) headerVersion() int16 {
+	return 1
+}
+
+func (r *AlterPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
+	return V2_4_0_0
+}

+ 45 - 0
alter_partition_reassignments_response_test.go

@@ -0,0 +1,45 @@
+package sarama
+
+import "testing"
+
+var (
+	alterPartitionReassignmentsResponseNoError = []byte{
+		0, 0, 39, 16, // ThrottleTimeMs 10000
+		0, 0, // errorcode
+		0, // null string
+		1, // empty errors array
+		0, // empty tagged fields
+	}
+
+	alterPartitionReassignmentsResponseWithError = []byte{
+		0, 0, 39, 16, // ThrottleTimeMs 10000
+		0, 12, // errorcode
+		6, 101, 114, 114, 111, 114, // error string "error"
+		2,                         // errors array length 1
+		6, 116, 111, 112, 105, 99, // topic name "topic"
+		2,          // partition array length 1
+		0, 0, 0, 1, // partitionId
+		0, 3, //kerror
+		7, 101, 114, 114, 111, 114, 50, // error string "error2"
+		0, 0, 0, // empty tagged fields
+	}
+)
+
+func TestAlterPartitionReassignmentResponse(t *testing.T) {
+	var response *AlterPartitionReassignmentsResponse
+
+	response = &AlterPartitionReassignmentsResponse{
+		ThrottleTimeMs: int32(10000),
+		Version:        int16(0),
+	}
+
+	testResponse(t, "no error", response, alterPartitionReassignmentsResponseNoError)
+
+	errorMessage := "error"
+	partitionError := "error2"
+	response.ErrorCode = 12
+	response.ErrorMessage = &errorMessage
+	response.AddError("topic", 1, 3, &partitionError)
+
+	testResponse(t, "with error", response, alterPartitionReassignmentsResponseWithError)
+}

+ 4 - 0
api_versions_request.go

@@ -20,6 +20,10 @@ func (a *ApiVersionsRequest) version() int16 {
 	return 0
 }
 
+func (a *ApiVersionsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (a *ApiVersionsRequest) requiredVersion() KafkaVersion {
 	return V0_10_0_0
 }

+ 4 - 0
api_versions_response.go

@@ -84,6 +84,10 @@ func (r *ApiVersionsResponse) version() int16 {
 	return 0
 }
 
+func (a *ApiVersionsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *ApiVersionsResponse) requiredVersion() KafkaVersion {
 	return V0_10_0_0
 }

+ 1 - 1
async_producer_test.go

@@ -988,7 +988,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) {
 		lastBatchFirstSeq := -1
 		lastBatchSize := -1
 		lastSequenceWrittenToDisk := -1
-		handlerFailBeforeWrite := func(req *request) (res encoder) {
+		handlerFailBeforeWrite := func(req *request) (res encoderWithHeader) {
 			switch req.body.key() {
 			case 3:
 				return metadataResponse

+ 52 - 9
broker.go

@@ -119,6 +119,7 @@ type SCRAMClient interface {
 type responsePromise struct {
 	requestTime   time.Time
 	correlationID int32
+	headerVersion int16
 	packets       chan []byte
 	errors        chan error
 }
@@ -513,6 +514,32 @@ func (b *Broker) CreatePartitions(request *CreatePartitionsRequest) (*CreatePart
 	return response, nil
 }
 
+//AlterPartitionReassignments sends a alter partition reassignments request and
+//returns alter partition reassignments response
+func (b *Broker) AlterPartitionReassignments(request *AlterPartitionReassignmentsRequest) (*AlterPartitionReassignmentsResponse, error) {
+	response := new(AlterPartitionReassignmentsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
+//ListPartitionReassignments sends a list partition reassignments request and
+//returns list partition reassignments response
+func (b *Broker) ListPartitionReassignments(request *ListPartitionReassignmentsRequest) (*ListPartitionReassignmentsResponse, error) {
+	response := new(ListPartitionReassignmentsResponse)
+
+	err := b.sendAndReceive(request, response)
+	if err != nil {
+		return nil, err
+	}
+
+	return response, nil
+}
+
 //DeleteRecords send a request to delete records and return delete record
 //response or error
 func (b *Broker) DeleteRecords(request *DeleteRecordsRequest) (*DeleteRecordsResponse, error) {
@@ -693,7 +720,7 @@ func (b *Broker) write(buf []byte) (n int, err error) {
 	return b.conn.Write(buf)
 }
 
-func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise, error) {
+func (b *Broker) send(rb protocolBody, promiseResponse bool, responseHeaderVersion int16) (*responsePromise, error) {
 	b.lock.Lock()
 	defer b.lock.Unlock()
 
@@ -731,14 +758,19 @@ func (b *Broker) send(rb protocolBody, promiseResponse bool) (*responsePromise,
 		return nil, nil
 	}
 
-	promise := responsePromise{requestTime, req.correlationID, make(chan []byte), make(chan error)}
+	promise := responsePromise{requestTime, req.correlationID, responseHeaderVersion, make(chan []byte), make(chan error)}
 	b.responses <- promise
 
 	return &promise, nil
 }
 
-func (b *Broker) sendAndReceive(req protocolBody, res versionedDecoder) error {
-	promise, err := b.send(req, res != nil)
+func (b *Broker) sendAndReceive(req protocolBody, res protocolBody) error {
+	responseHeaderVersion := int16(-1)
+	if res != nil {
+		responseHeaderVersion = res.headerVersion()
+	}
+
+	promise, err := b.send(req, res != nil, responseHeaderVersion)
 	if err != nil {
 		return err
 	}
@@ -818,7 +850,6 @@ func (b *Broker) encode(pe packetEncoder, version int16) (err error) {
 
 func (b *Broker) responseReceiver() {
 	var dead error
-	header := make([]byte, 8)
 
 	for response := range b.responses {
 		if dead != nil {
@@ -829,6 +860,9 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 
+		var headerLength = getHeaderLength(response.headerVersion)
+		header := make([]byte, headerLength)
+
 		bytesReadHeader, err := b.readFull(header)
 		requestLatency := time.Since(response.requestTime)
 		if err != nil {
@@ -839,7 +873,7 @@ func (b *Broker) responseReceiver() {
 		}
 
 		decodedHeader := responseHeader{}
-		err = decode(header, &decodedHeader)
+		err = versionedDecode(header, &decodedHeader, response.headerVersion)
 		if err != nil {
 			b.updateIncomingCommunicationMetrics(bytesReadHeader, requestLatency)
 			dead = err
@@ -855,7 +889,7 @@ func (b *Broker) responseReceiver() {
 			continue
 		}
 
-		buf := make([]byte, decodedHeader.length-4)
+		buf := make([]byte, decodedHeader.length-int32(headerLength)+4)
 		bytesReadBody, err := b.readFull(buf)
 		b.updateIncomingCommunicationMetrics(bytesReadHeader+bytesReadBody, requestLatency)
 		if err != nil {
@@ -869,6 +903,15 @@ func (b *Broker) responseReceiver() {
 	close(b.done)
 }
 
+func getHeaderLength(headerVersion int16) int8 {
+	if headerVersion < 1 {
+		return 8
+	} else {
+		// header contains additional tagged field length (0), we don't support actual tags yet.
+		return 9
+	}
+}
+
 func (b *Broker) authenticateViaSASL() error {
 	switch b.conf.Net.SASL.Mechanism {
 	case SASLTypeOAuth:
@@ -1180,7 +1223,7 @@ func (b *Broker) receiveSaslAuthenticateResponse(correlationID int32) ([]byte, e
 	}
 
 	header := responseHeader{}
-	err = decode(buf, &header)
+	err = versionedDecode(buf, &header, 0)
 	if err != nil {
 		return nil, err
 	}
@@ -1269,7 +1312,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
 	}
 
 	header := responseHeader{}
-	err = decode(buf, &header)
+	err = versionedDecode(buf, &header, 0)
 	if err != nil {
 		return bytesRead, err
 	}

+ 4 - 0
broker_test.go

@@ -42,6 +42,10 @@ func (m mockEncoder) encode(pe packetEncoder) error {
 	return pe.putRawBytes(m.bytes)
 }
 
+func (m mockEncoder) headerVersion() int16 {
+	return 0
+}
+
 type brokerMetrics struct {
 	bytesRead    int
 	bytesWritten int

+ 4 - 0
consumer_metadata_request.go

@@ -29,6 +29,10 @@ func (r *ConsumerMetadataRequest) version() int16 {
 	return 0
 }
 
+func (r *ConsumerMetadataRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *ConsumerMetadataRequest) requiredVersion() KafkaVersion {
 	return V0_8_2_0
 }

+ 4 - 0
consumer_metadata_response.go

@@ -73,6 +73,10 @@ func (r *ConsumerMetadataResponse) version() int16 {
 	return 0
 }
 
+func (r *ConsumerMetadataResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *ConsumerMetadataResponse) requiredVersion() KafkaVersion {
 	return V0_8_2_0
 }

+ 4 - 0
create_partitions_request.go

@@ -67,6 +67,10 @@ func (r *CreatePartitionsRequest) version() int16 {
 	return 0
 }
 
+func (r *CreatePartitionsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
 	return V1_0_0_0
 }

+ 4 - 0
create_partitions_response.go

@@ -63,6 +63,10 @@ func (r *CreatePartitionsResponse) version() int16 {
 	return 0
 }
 
+func (r *CreatePartitionsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
 	return V1_0_0_0
 }

+ 4 - 0
create_topics_request.go

@@ -79,6 +79,10 @@ func (c *CreateTopicsRequest) version() int16 {
 	return c.Version
 }
 
+func (r *CreateTopicsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
 	switch c.Version {
 	case 2:

+ 4 - 0
create_topics_response.go

@@ -70,6 +70,10 @@ func (c *CreateTopicsResponse) version() int16 {
 	return c.Version
 }
 
+func (c *CreateTopicsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
 	switch c.Version {
 	case 2:

+ 4 - 0
delete_groups_request.go

@@ -21,6 +21,10 @@ func (r *DeleteGroupsRequest) version() int16 {
 	return 0
 }
 
+func (r *DeleteGroupsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *DeleteGroupsRequest) requiredVersion() KafkaVersion {
 	return V1_1_0_0
 }

+ 4 - 0
delete_groups_response.go

@@ -65,6 +65,10 @@ func (r *DeleteGroupsResponse) version() int16 {
 	return 0
 }
 
+func (r *DeleteGroupsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *DeleteGroupsResponse) requiredVersion() KafkaVersion {
 	return V1_1_0_0
 }

+ 4 - 0
delete_records_request.go

@@ -77,6 +77,10 @@ func (d *DeleteRecordsRequest) version() int16 {
 	return 0
 }
 
+func (d *DeleteRecordsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (d *DeleteRecordsRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
delete_records_response.go

@@ -80,6 +80,10 @@ func (d *DeleteRecordsResponse) version() int16 {
 	return 0
 }
 
+func (d *DeleteRecordsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (d *DeleteRecordsResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
delete_topics_request.go

@@ -38,6 +38,10 @@ func (d *DeleteTopicsRequest) version() int16 {
 	return d.Version
 }
 
+func (d *DeleteTopicsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (d *DeleteTopicsRequest) requiredVersion() KafkaVersion {
 	switch d.Version {
 	case 1:

+ 4 - 0
delete_topics_response.go

@@ -68,6 +68,10 @@ func (d *DeleteTopicsResponse) version() int16 {
 	return d.Version
 }
 
+func (d *DeleteTopicsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (d *DeleteTopicsResponse) requiredVersion() KafkaVersion {
 	switch d.Version {
 	case 1:

+ 4 - 0
describe_configs_request.go

@@ -100,6 +100,10 @@ func (r *DescribeConfigsRequest) version() int16 {
 	return r.Version
 }
 
+func (r *DescribeConfigsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
describe_configs_response.go

@@ -112,6 +112,10 @@ func (r *DescribeConfigsResponse) version() int16 {
 	return r.Version
 }
 
+func (r *DescribeConfigsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
describe_groups_request.go

@@ -21,6 +21,10 @@ func (r *DescribeGroupsRequest) version() int16 {
 	return 0
 }
 
+func (r *DescribeGroupsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *DescribeGroupsRequest) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
describe_groups_response.go

@@ -43,6 +43,10 @@ func (r *DescribeGroupsResponse) version() int16 {
 	return 0
 }
 
+func (r *DescribeGroupsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *DescribeGroupsResponse) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
describe_log_dirs_request.go

@@ -78,6 +78,10 @@ func (r *DescribeLogDirsRequest) version() int16 {
 	return r.Version
 }
 
+func (r *DescribeLogDirsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *DescribeLogDirsRequest) requiredVersion() KafkaVersion {
 	return V1_0_0_0
 }

+ 4 - 0
describe_log_dirs_response.go

@@ -61,6 +61,10 @@ func (r *DescribeLogDirsResponse) version() int16 {
 	return r.Version
 }
 
+func (r *DescribeLogDirsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *DescribeLogDirsResponse) requiredVersion() KafkaVersion {
 	return V1_0_0_0
 }

+ 5 - 0
encoder_decoder.go

@@ -12,6 +12,11 @@ type encoder interface {
 	encode(pe packetEncoder) error
 }
 
+type encoderWithHeader interface {
+	encoder
+	headerVersion() int16
+}
+
 // Encode takes an Encoder and turns it into bytes while potentially recording metrics.
 func encode(e encoder, metricRegistry metrics.Registry) ([]byte, error) {
 	if e == nil {

+ 4 - 0
end_txn_request.go

@@ -45,6 +45,10 @@ func (a *EndTxnRequest) version() int16 {
 	return 0
 }
 
+func (r *EndTxnRequest) headerVersion() int16 {
+	return 1
+}
+
 func (a *EndTxnRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
end_txn_response.go

@@ -39,6 +39,10 @@ func (e *EndTxnResponse) version() int16 {
 	return 0
 }
 
+func (r *EndTxnResponse) headerVersion() int16 {
+	return 0
+}
+
 func (e *EndTxnResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 16 - 0
errors.go

@@ -94,6 +94,14 @@ func (mErr MultiError) Error() string {
 	return errString
 }
 
+func (mErr MultiError) PrettyError() string {
+	var errString = ""
+	for _, err := range *mErr.Errors {
+		errString += err.Error() + "\n"
+	}
+	return errString
+}
+
 // ErrDeleteRecords is the type of error returned when fail to delete the required records
 type ErrDeleteRecords struct {
 	MultiError
@@ -103,6 +111,14 @@ func (err ErrDeleteRecords) Error() string {
 	return "kafka server: failed to delete records " + err.MultiError.Error()
 }
 
+type ErrReassignPartitions struct {
+	MultiError
+}
+
+func (err ErrReassignPartitions) Error() string {
+	return fmt.Sprintf("failed to reassign partitions for topic: \n%s", err.MultiError.PrettyError())
+}
+
 // Numeric error codes returned by the Kafka server.
 const (
 	ErrNoError                            KError = 0

+ 4 - 0
fetch_request.go

@@ -239,6 +239,10 @@ func (r *FetchRequest) version() int16 {
 	return r.Version
 }
 
+func (r *FetchRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *FetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 0:

+ 4 - 0
fetch_response.go

@@ -335,6 +335,10 @@ func (r *FetchResponse) version() int16 {
 	return r.Version
 }
 
+func (r *FetchResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *FetchResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 0:

+ 4 - 0
find_coordinator_request.go

@@ -51,6 +51,10 @@ func (f *FindCoordinatorRequest) version() int16 {
 	return f.Version
 }
 
+func (r *FindCoordinatorRequest) headerVersion() int16 {
+	return 1
+}
+
 func (f *FindCoordinatorRequest) requiredVersion() KafkaVersion {
 	switch f.Version {
 	case 1:

+ 4 - 0
find_coordinator_response.go

@@ -82,6 +82,10 @@ func (f *FindCoordinatorResponse) version() int16 {
 	return f.Version
 }
 
+func (r *FindCoordinatorResponse) headerVersion() int16 {
+	return 0
+}
+
 func (f *FindCoordinatorResponse) requiredVersion() KafkaVersion {
 	switch f.Version {
 	case 1:

+ 4 - 0
heartbeat_request.go

@@ -42,6 +42,10 @@ func (r *HeartbeatRequest) version() int16 {
 	return 0
 }
 
+func (r *HeartbeatRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *HeartbeatRequest) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
heartbeat_response.go

@@ -27,6 +27,10 @@ func (r *HeartbeatResponse) version() int16 {
 	return 0
 }
 
+func (r *HeartbeatResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *HeartbeatResponse) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
init_producer_id_request.go

@@ -38,6 +38,10 @@ func (i *InitProducerIDRequest) version() int16 {
 	return 0
 }
 
+func (i *InitProducerIDRequest) headerVersion() int16 {
+	return 1
+}
+
 func (i *InitProducerIDRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
init_producer_id_response.go

@@ -50,6 +50,10 @@ func (i *InitProducerIDResponse) version() int16 {
 	return 0
 }
 
+func (i *InitProducerIDResponse) headerVersion() int16 {
+	return 0
+}
+
 func (i *InitProducerIDResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
join_group_request.go

@@ -134,6 +134,10 @@ func (r *JoinGroupRequest) version() int16 {
 	return r.Version
 }
 
+func (r *JoinGroupRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *JoinGroupRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 2:

+ 4 - 0
join_group_response.go

@@ -123,6 +123,10 @@ func (r *JoinGroupResponse) version() int16 {
 	return r.Version
 }
 
+func (r *JoinGroupResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *JoinGroupResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 2:

+ 4 - 0
leave_group_request.go

@@ -35,6 +35,10 @@ func (r *LeaveGroupRequest) version() int16 {
 	return 0
 }
 
+func (r *LeaveGroupRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *LeaveGroupRequest) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
leave_group_response.go

@@ -27,6 +27,10 @@ func (r *LeaveGroupResponse) version() int16 {
 	return 0
 }
 
+func (r *LeaveGroupResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *LeaveGroupResponse) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
list_groups_request.go

@@ -19,6 +19,10 @@ func (r *ListGroupsRequest) version() int16 {
 	return 0
 }
 
+func (r *ListGroupsRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *ListGroupsRequest) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
list_groups_response.go

@@ -64,6 +64,10 @@ func (r *ListGroupsResponse) version() int16 {
 	return 0
 }
 
+func (r *ListGroupsResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *ListGroupsResponse) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 98 - 0
list_partition_reassignments_request.go

@@ -0,0 +1,98 @@
+package sarama
+
+type ListPartitionReassignmentsRequest struct {
+	TimeoutMs int32
+	blocks    map[string][]int32
+	Version   int16
+}
+
+func (r *ListPartitionReassignmentsRequest) encode(pe packetEncoder) error {
+	pe.putInt32(r.TimeoutMs)
+
+	pe.putCompactArrayLength(len(r.blocks))
+
+	for topic, partitions := range r.blocks {
+		if err := pe.putCompactString(topic); err != nil {
+			return err
+		}
+
+		if err := pe.putCompactInt32Array(partitions); err != nil {
+			return err
+		}
+
+		pe.putEmptyTaggedFieldArray()
+	}
+
+	pe.putEmptyTaggedFieldArray()
+
+	return nil
+}
+
+func (r *ListPartitionReassignmentsRequest) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if r.TimeoutMs, err = pd.getInt32(); err != nil {
+		return err
+	}
+
+	topicCount, err := pd.getCompactArrayLength()
+	if err != nil {
+		return err
+	}
+	if topicCount > 0 {
+		r.blocks = make(map[string][]int32)
+		for i := 0; i < topicCount; i++ {
+			topic, err := pd.getCompactString()
+			if err != nil {
+				return err
+			}
+			partitionCount, err := pd.getCompactArrayLength()
+			if err != nil {
+				return err
+			}
+			r.blocks[topic] = make([]int32, partitionCount)
+			for j := 0; j < partitionCount; j++ {
+				partition, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+				r.blocks[topic][j] = partition
+			}
+			if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+				return err
+			}
+		}
+	}
+
+	if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+		return err
+	}
+
+	return
+}
+
+func (r *ListPartitionReassignmentsRequest) key() int16 {
+	return 46
+}
+
+func (r *ListPartitionReassignmentsRequest) version() int16 {
+	return r.Version
+}
+
+func (r *ListPartitionReassignmentsRequest) headerVersion() int16 {
+	return 2
+}
+
+func (r *ListPartitionReassignmentsRequest) requiredVersion() KafkaVersion {
+	return V2_4_0_0
+}
+
+func (r *ListPartitionReassignmentsRequest) AddBlock(topic string, partitionIDs []int32) {
+	if r.blocks == nil {
+		r.blocks = make(map[string][]int32)
+	}
+
+	if r.blocks[topic] == nil {
+		r.blocks[topic] = partitionIDs
+	}
+}

+ 31 - 0
list_partition_reassignments_request_test.go

@@ -0,0 +1,31 @@
+package sarama
+
+import "testing"
+
+var (
+	listPartitionReassignmentsRequestOneBlock = []byte{
+		0, 0, 39, 16, // timeout 10000
+		2,                         // 2-1=1 block
+		6, 116, 111, 112, 105, 99, // topic name "topic" as compact string
+		2,          // 2-1=1 partitions
+		0, 0, 0, 0, // partitionId
+		0, 0, // empty tagged fields
+	}
+)
+
+func TestListPartitionReassignmentRequest(t *testing.T) {
+	var request *ListPartitionReassignmentsRequest
+
+	request = &ListPartitionReassignmentsRequest{
+		TimeoutMs: int32(10000),
+		Version:   int16(0),
+	}
+
+	request.AddBlock("topic", []int32{0})
+
+	testRequest(t, "one block", request, listPartitionReassignmentsRequestOneBlock)
+
+	request.AddBlock("topic2", []int32{1, 2})
+
+	testRequestWithoutByteComparison(t, "two blocks", request)
+}

+ 169 - 0
list_partition_reassignments_response.go

@@ -0,0 +1,169 @@
+package sarama
+
+type PartitionReplicaReassignmentsStatus struct {
+	replicas         []int32
+	addingReplicas   []int32
+	removingReplicas []int32
+}
+
+func (b *PartitionReplicaReassignmentsStatus) encode(pe packetEncoder) error {
+	if err := pe.putCompactInt32Array(b.replicas); err != nil {
+		return err
+	}
+	if err := pe.putCompactInt32Array(b.addingReplicas); err != nil {
+		return err
+	}
+	if err := pe.putCompactInt32Array(b.removingReplicas); err != nil {
+		return err
+	}
+
+	pe.putEmptyTaggedFieldArray()
+
+	return nil
+}
+
+func (b *PartitionReplicaReassignmentsStatus) decode(pd packetDecoder) (err error) {
+	if b.replicas, err = pd.getCompactInt32Array(); err != nil {
+		return err
+	}
+
+	if b.addingReplicas, err = pd.getCompactInt32Array(); err != nil {
+		return err
+	}
+
+	if b.removingReplicas, err = pd.getCompactInt32Array(); err != nil {
+		return err
+	}
+
+	if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+		return err
+	}
+
+	return err
+}
+
+type ListPartitionReassignmentsResponse struct {
+	Version        int16
+	ThrottleTimeMs int32
+	ErrorCode      KError
+	ErrorMessage   *string
+	TopicStatus    map[string]map[int32]*PartitionReplicaReassignmentsStatus
+}
+
+func (r *ListPartitionReassignmentsResponse) AddBlock(topic string, partition int32, replicas, addingReplicas, removingReplicas []int32) {
+	if r.TopicStatus == nil {
+		r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus)
+	}
+	partitions := r.TopicStatus[topic]
+	if partitions == nil {
+		partitions = make(map[int32]*PartitionReplicaReassignmentsStatus)
+		r.TopicStatus[topic] = partitions
+	}
+
+	partitions[partition] = &PartitionReplicaReassignmentsStatus{replicas: replicas, addingReplicas: addingReplicas, removingReplicas: removingReplicas}
+}
+
+func (r *ListPartitionReassignmentsResponse) encode(pe packetEncoder) error {
+	pe.putInt32(r.ThrottleTimeMs)
+	pe.putInt16(int16(r.ErrorCode))
+	if err := pe.putNullableCompactString(r.ErrorMessage); err != nil {
+		return err
+	}
+
+	pe.putCompactArrayLength(len(r.TopicStatus))
+	for topic, partitions := range r.TopicStatus {
+		if err := pe.putCompactString(topic); err != nil {
+			return err
+		}
+		pe.putCompactArrayLength(len(partitions))
+		for partition, block := range partitions {
+			pe.putInt32(partition)
+
+			if err := block.encode(pe); err != nil {
+				return err
+			}
+		}
+		pe.putEmptyTaggedFieldArray()
+	}
+
+	pe.putEmptyTaggedFieldArray()
+
+	return nil
+}
+
+func (r *ListPartitionReassignmentsResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if r.ThrottleTimeMs, err = pd.getInt32(); err != nil {
+		return err
+	}
+
+	kerr, err := pd.getInt16()
+	if err != nil {
+		return err
+	}
+
+	r.ErrorCode = KError(kerr)
+
+	if r.ErrorMessage, err = pd.getCompactNullableString(); err != nil {
+		return err
+	}
+
+	numTopics, err := pd.getCompactArrayLength()
+	if err != nil || numTopics == 0 {
+		return err
+	}
+
+	r.TopicStatus = make(map[string]map[int32]*PartitionReplicaReassignmentsStatus, numTopics)
+	for i := 0; i < numTopics; i++ {
+		topic, err := pd.getCompactString()
+		if err != nil {
+			return err
+		}
+
+		ongoingPartitionReassignments, err := pd.getCompactArrayLength()
+		if err != nil {
+			return err
+		}
+
+		r.TopicStatus[topic] = make(map[int32]*PartitionReplicaReassignmentsStatus, ongoingPartitionReassignments)
+
+		for j := 0; j < ongoingPartitionReassignments; j++ {
+			partition, err := pd.getInt32()
+			if err != nil {
+				return err
+			}
+
+			block := &PartitionReplicaReassignmentsStatus{}
+			if err := block.decode(pd); err != nil {
+				return err
+			}
+			r.TopicStatus[topic][partition] = block
+		}
+
+		if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+			return err
+		}
+	}
+	if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func (r *ListPartitionReassignmentsResponse) key() int16 {
+	return 46
+}
+
+func (r *ListPartitionReassignmentsResponse) version() int16 {
+	return r.Version
+}
+
+func (r *ListPartitionReassignmentsResponse) headerVersion() int16 {
+	return 1
+}
+
+func (r *ListPartitionReassignmentsResponse) requiredVersion() KafkaVersion {
+	return V2_4_0_0
+}

+ 32 - 0
list_partition_reassignments_response_test.go

@@ -0,0 +1,32 @@
+package sarama
+
+import "testing"
+
+var (
+	listPartitionReassignmentsResponse = []byte{
+		0, 0, 39, 16, // ThrottleTimeMs 10000
+		0, 0, // errorcode
+		0,                         // null string
+		2,                         // block array length 2-1=1
+		6, 116, 111, 112, 105, 99, // topic name "topic"
+		2,          // partition array length 2-1=1
+		0, 0, 0, 1, // partitionId
+		3, 0, 0, 3, 232, 0, 0, 3, 233, // replicas [1000, 1001]
+		3, 0, 0, 3, 234, 0, 0, 3, 235, // addingReplicas [1002, 1003]
+		3, 0, 0, 3, 236, 0, 0, 3, 237, // removingReplicas [1004, 1005]
+		0, 0, 0, // empty tagged fields
+	}
+)
+
+func TestListPartitionReassignmentResponse(t *testing.T) {
+	var response *ListPartitionReassignmentsResponse
+
+	response = &ListPartitionReassignmentsResponse{
+		ThrottleTimeMs: int32(10000),
+		Version:        int16(0),
+	}
+
+	response.AddBlock("topic", 1, []int32{1000, 1001}, []int32{1002, 1003}, []int32{1004, 1005})
+
+	testResponse(t, "one topic", response, listPartitionReassignmentsResponse)
+}

+ 4 - 0
metadata_request.go

@@ -65,6 +65,10 @@ func (r *MetadataRequest) version() int16 {
 	return r.Version
 }
 
+func (r *MetadataRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *MetadataRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
metadata_response.go

@@ -255,6 +255,10 @@ func (r *MetadataResponse) version() int16 {
 	return r.Version
 }
 
+func (r *MetadataResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *MetadataResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 25 - 9
mockbroker.go

@@ -20,7 +20,7 @@ const (
 
 type GSSApiHandlerFunc func([]byte) []byte
 
-type requestHandlerFunc func(req *request) (res encoder)
+type requestHandlerFunc func(req *request) (res encoderWithHeader)
 
 // RequestNotifierFunc is invoked when a mock broker processes a request successfully
 // and will provides the number of bytes read and written.
@@ -55,7 +55,7 @@ type MockBroker struct {
 	port          int32
 	closing       chan none
 	stopper       chan none
-	expectations  chan encoder
+	expectations  chan encoderWithHeader
 	listener      net.Listener
 	t             TestReporter
 	latency       time.Duration
@@ -83,7 +83,7 @@ func (b *MockBroker) SetLatency(latency time.Duration) {
 // and uses the found MockResponse instance to generate an appropriate reply.
 // If the request type is not found in the map then nothing is sent.
 func (b *MockBroker) SetHandlerByMap(handlerMap map[string]MockResponse) {
-	b.setHandler(func(req *request) (res encoder) {
+	b.setHandler(func(req *request) (res encoderWithHeader) {
 		reqTypeName := reflect.TypeOf(req.body).Elem().Name()
 		mockResponse := handlerMap[reqTypeName]
 		if mockResponse == nil {
@@ -231,7 +231,6 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
 		}
 	}()
 
-	resHeader := make([]byte, 8)
 	var bytesWritten int
 	var bytesRead int
 	for {
@@ -281,8 +280,7 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
 				continue
 			}
 
-			binary.BigEndian.PutUint32(resHeader, uint32(len(encodedRes)+4))
-			binary.BigEndian.PutUint32(resHeader[4:], uint32(req.correlationID))
+			resHeader := b.encodeHeader(res.headerVersion(), req.correlationID, uint32(len(encodedRes)))
 			if _, err = conn.Write(resHeader); err != nil {
 				b.serverError(err)
 				break
@@ -318,7 +316,25 @@ func (b *MockBroker) handleRequests(conn io.ReadWriteCloser, idx int, wg *sync.W
 	Logger.Printf("*** mockbroker/%d/%d: connection closed, err=%v", b.BrokerID(), idx, err)
 }
 
-func (b *MockBroker) defaultRequestHandler(req *request) (res encoder) {
+func (b *MockBroker) encodeHeader(headerVersion int16, correlationId int32, payloadLength uint32) []byte {
+	headerLength := uint32(8)
+
+	if headerVersion >= 1 {
+		headerLength = 9
+	}
+
+	resHeader := make([]byte, headerLength)
+	binary.BigEndian.PutUint32(resHeader, payloadLength+headerLength-4)
+	binary.BigEndian.PutUint32(resHeader[4:], uint32(correlationId))
+
+	if headerVersion >= 1 {
+		binary.PutUvarint(resHeader[8:], 0)
+	}
+
+	return resHeader
+}
+
+func (b *MockBroker) defaultRequestHandler(req *request) (res encoderWithHeader) {
 	select {
 	case res, ok := <-b.expectations:
 		if !ok {
@@ -373,7 +389,7 @@ func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener
 		stopper:      make(chan none),
 		t:            t,
 		brokerID:     brokerID,
-		expectations: make(chan encoder, 512),
+		expectations: make(chan encoderWithHeader, 512),
 		listener:     listener,
 	}
 	broker.handler = broker.defaultRequestHandler
@@ -394,6 +410,6 @@ func NewMockBrokerListener(t TestReporter, brokerID int32, listener net.Listener
 	return broker
 }
 
-func (b *MockBroker) Returns(e encoder) {
+func (b *MockBroker) Returns(e encoderWithHeader) {
 	b.expectations <- e
 }

+ 65 - 28
mockresponses.go

@@ -18,20 +18,20 @@ type TestReporter interface {
 // allows generating a response based on a request body. MockResponses are used
 // to program behavior of MockBroker in tests.
 type MockResponse interface {
-	For(reqBody versionedDecoder) (res encoder)
+	For(reqBody versionedDecoder) (res encoderWithHeader)
 }
 
 // MockWrapper is a mock response builder that returns a particular concrete
 // response regardless of the actual request passed to the `For` method.
 type MockWrapper struct {
-	res encoder
+	res encoderWithHeader
 }
 
-func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoder) {
+func (mw *MockWrapper) For(reqBody versionedDecoder) (res encoderWithHeader) {
 	return mw.res
 }
 
-func NewMockWrapper(res encoder) *MockWrapper {
+func NewMockWrapper(res encoderWithHeader) *MockWrapper {
 	return &MockWrapper{res: res}
 }
 
@@ -50,7 +50,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence {
 		switch res := res.(type) {
 		case MockResponse:
 			ms.responses[i] = res
-		case encoder:
+		case encoderWithHeader:
 			ms.responses[i] = NewMockWrapper(res)
 		default:
 			panic(fmt.Sprintf("Unexpected response type: %T", res))
@@ -59,7 +59,7 @@ func NewMockSequence(responses ...interface{}) *MockSequence {
 	return ms
 }
 
-func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
+func (mc *MockSequence) For(reqBody versionedDecoder) (res encoderWithHeader) {
 	res = mc.responses[0].For(reqBody)
 	if len(mc.responses) > 1 {
 		mc.responses = mc.responses[1:]
@@ -79,7 +79,7 @@ func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
 	}
 }
 
-func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	request := reqBody.(*ListGroupsRequest)
 	_ = request
 	response := &ListGroupsResponse{
@@ -110,7 +110,7 @@ func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, descrip
 	return m
 }
 
-func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	request := reqBody.(*DescribeGroupsRequest)
 
 	response := &DescribeGroupsResponse{}
@@ -166,7 +166,7 @@ func (mmr *MockMetadataResponse) SetController(brokerID int32) *MockMetadataResp
 	return mmr
 }
 
-func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
+func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	metadataRequest := reqBody.(*MetadataRequest)
 	metadataResponse := &MetadataResponse{
 		Version:      metadataRequest.version(),
@@ -233,7 +233,7 @@ func (mor *MockOffsetResponse) SetOffset(topic string, partition int32, time, of
 	return mor
 }
 
-func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoder {
+func (mor *MockOffsetResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	offsetRequest := reqBody.(*OffsetRequest)
 	offsetResponse := &OffsetResponse{Version: mor.version}
 	for topic, partitions := range offsetRequest.blocks {
@@ -309,7 +309,7 @@ func (mfr *MockFetchResponse) SetHighWaterMark(topic string, partition int32, of
 	return mfr
 }
 
-func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoder {
+func (mfr *MockFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	fetchRequest := reqBody.(*FetchRequest)
 	res := &FetchResponse{
 		Version: mfr.version,
@@ -393,7 +393,7 @@ func (mr *MockConsumerMetadataResponse) SetError(group string, kerror KError) *M
 	return mr
 }
 
-func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockConsumerMetadataResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*ConsumerMetadataRequest)
 	group := req.ConsumerGroup
 	res := &ConsumerMetadataResponse{}
@@ -442,7 +442,7 @@ func (mr *MockFindCoordinatorResponse) SetError(coordinatorType CoordinatorType,
 	return mr
 }
 
-func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockFindCoordinatorResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*FindCoordinatorRequest)
 	res := &FindCoordinatorResponse{}
 	var v interface{}
@@ -489,7 +489,7 @@ func (mr *MockOffsetCommitResponse) SetError(group, topic string, partition int3
 	return mr
 }
 
-func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockOffsetCommitResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*OffsetCommitRequest)
 	group := req.ConsumerGroup
 	res := &OffsetCommitResponse{}
@@ -546,7 +546,7 @@ func (mr *MockProduceResponse) SetError(topic string, partition int32, kerror KE
 	return mr
 }
 
-func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockProduceResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*ProduceRequest)
 	res := &ProduceResponse{
 		Version: mr.version,
@@ -605,7 +605,7 @@ func (mr *MockOffsetFetchResponse) SetError(kerror KError) *MockOffsetFetchRespo
 	return mr
 }
 
-func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockOffsetFetchResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*OffsetFetchRequest)
 	group := req.ConsumerGroup
 	res := &OffsetFetchResponse{Version: req.Version}
@@ -630,7 +630,7 @@ func NewMockCreateTopicsResponse(t TestReporter) *MockCreateTopicsResponse {
 	return &MockCreateTopicsResponse{t: t}
 }
 
-func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreateTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*CreateTopicsRequest)
 	res := &CreateTopicsResponse{
 		Version: req.Version,
@@ -659,7 +659,7 @@ func NewMockDeleteTopicsResponse(t TestReporter) *MockDeleteTopicsResponse {
 	return &MockDeleteTopicsResponse{t: t}
 }
 
-func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteTopicsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DeleteTopicsRequest)
 	res := &DeleteTopicsResponse{}
 	res.TopicErrorCodes = make(map[string]KError)
@@ -679,7 +679,7 @@ func NewMockCreatePartitionsResponse(t TestReporter) *MockCreatePartitionsRespon
 	return &MockCreatePartitionsResponse{t: t}
 }
 
-func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*CreatePartitionsRequest)
 	res := &CreatePartitionsResponse{}
 	res.TopicPartitionErrors = make(map[string]*TopicPartitionError)
@@ -698,6 +698,43 @@ func (mr *MockCreatePartitionsResponse) For(reqBody versionedDecoder) encoder {
 	return res
 }
 
+type MockAlterPartitionReassignmentsResponse struct {
+	t TestReporter
+}
+
+func NewMockAlterPartitionReassignmentsResponse(t TestReporter) *MockAlterPartitionReassignmentsResponse {
+	return &MockAlterPartitionReassignmentsResponse{t: t}
+}
+
+func (mr *MockAlterPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	req := reqBody.(*AlterPartitionReassignmentsRequest)
+	_ = req
+	res := &AlterPartitionReassignmentsResponse{}
+	return res
+}
+
+type MockListPartitionReassignmentsResponse struct {
+	t TestReporter
+}
+
+func NewMockListPartitionReassignmentsResponse(t TestReporter) *MockListPartitionReassignmentsResponse {
+	return &MockListPartitionReassignmentsResponse{t: t}
+}
+
+func (mr *MockListPartitionReassignmentsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	req := reqBody.(*ListPartitionReassignmentsRequest)
+	_ = req
+	res := &ListPartitionReassignmentsResponse{}
+
+	for topic, partitions := range req.blocks {
+		for _, partition := range partitions {
+			res.AddBlock(topic, partition, []int32{0}, []int32{1}, []int32{2})
+		}
+	}
+
+	return res
+}
+
 type MockDeleteRecordsResponse struct {
 	t TestReporter
 }
@@ -706,7 +743,7 @@ func NewMockDeleteRecordsResponse(t TestReporter) *MockDeleteRecordsResponse {
 	return &MockDeleteRecordsResponse{t: t}
 }
 
-func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteRecordsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DeleteRecordsRequest)
 	res := &DeleteRecordsResponse{}
 	res.Topics = make(map[string]*DeleteRecordsResponseTopic)
@@ -729,7 +766,7 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse
 	return &MockDescribeConfigsResponse{t: t}
 }
 
-func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DescribeConfigsRequest)
 	res := &DescribeConfigsResponse{
 		Version: req.Version,
@@ -824,7 +861,7 @@ func NewMockAlterConfigsResponse(t TestReporter) *MockAlterConfigsResponse {
 	return &MockAlterConfigsResponse{t: t}
 }
 
-func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*AlterConfigsRequest)
 	res := &AlterConfigsResponse{}
 
@@ -845,7 +882,7 @@ func NewMockCreateAclsResponse(t TestReporter) *MockCreateAclsResponse {
 	return &MockCreateAclsResponse{t: t}
 }
 
-func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockCreateAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*CreateAclsRequest)
 	res := &CreateAclsResponse{}
 
@@ -863,7 +900,7 @@ func NewMockListAclsResponse(t TestReporter) *MockListAclsResponse {
 	return &MockListAclsResponse{t: t}
 }
 
-func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockListAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DescribeAclsRequest)
 	res := &DescribeAclsResponse{}
 	res.Err = ErrNoError
@@ -905,7 +942,7 @@ func NewMockSaslAuthenticateResponse(t TestReporter) *MockSaslAuthenticateRespon
 	return &MockSaslAuthenticateResponse{t: t}
 }
 
-func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoder {
+func (msar *MockSaslAuthenticateResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	res := &SaslAuthenticateResponse{}
 	res.Err = msar.kerror
 	res.SaslAuthBytes = msar.saslAuthBytes
@@ -936,7 +973,7 @@ func NewMockSaslHandshakeResponse(t TestReporter) *MockSaslHandshakeResponse {
 	return &MockSaslHandshakeResponse{t: t}
 }
 
-func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoder {
+func (mshr *MockSaslHandshakeResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	res := &SaslHandshakeResponse{}
 	res.Err = mshr.kerror
 	res.EnabledMechanisms = mshr.enabledMechanisms
@@ -957,7 +994,7 @@ func NewMockDeleteAclsResponse(t TestReporter) *MockDeleteAclsResponse {
 	return &MockDeleteAclsResponse{t: t}
 }
 
-func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoder {
+func (mr *MockDeleteAclsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	req := reqBody.(*DeleteAclsRequest)
 	res := &DeleteAclsResponse{}
 
@@ -983,7 +1020,7 @@ func (m *MockDeleteGroupsResponse) SetDeletedGroups(groups []string) *MockDelete
 	return m
 }
 
-func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoder {
+func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHeader {
 	resp := &DeleteGroupsResponse{
 		GroupErrorCodes: map[string]KError{},
 	}

+ 4 - 0
offset_commit_request.go

@@ -170,6 +170,10 @@ func (r *OffsetCommitRequest) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetCommitRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
offset_commit_response.go

@@ -94,6 +94,10 @@ func (r *OffsetCommitResponse) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetCommitResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
offset_fetch_request.go

@@ -68,6 +68,10 @@ func (r *OffsetFetchRequest) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetFetchRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
offset_fetch_response.go

@@ -155,6 +155,10 @@ func (r *OffsetFetchResponse) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetFetchResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 3 - 3
offset_manager_test.go

@@ -134,7 +134,7 @@ func TestNewOffsetManagerOffsetsAutoCommit(t *testing.T) {
 
 			ocResponse := new(OffsetCommitResponse)
 			ocResponse.AddError("my_topic", 0, ErrNoError)
-			handler := func(req *request) (res encoder) {
+			handler := func(req *request) (res encoderWithHeader) {
 				close(called)
 				return ocResponse
 			}
@@ -329,7 +329,7 @@ func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
 
 	ocResponse := new(OffsetCommitResponse)
 	ocResponse.AddError("my_topic", 0, ErrNoError)
-	handler := func(req *request) (res encoder) {
+	handler := func(req *request) (res encoderWithHeader) {
 		if req.body.version() != 2 {
 			t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
 		}
@@ -390,7 +390,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
 
 	ocResponse := new(OffsetCommitResponse)
 	ocResponse.AddError("my_topic", 0, ErrNoError)
-	handler := func(req *request) (res encoder) {
+	handler := func(req *request) (res encoderWithHeader) {
 		if req.body.version() != 2 {
 			t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
 		}

+ 4 - 0
offset_request.go

@@ -116,6 +116,10 @@ func (r *OffsetRequest) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 4 - 0
offset_response.go

@@ -150,6 +150,10 @@ func (r *OffsetResponse) version() int16 {
 	return r.Version
 }
 
+func (r *OffsetResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *OffsetResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 6 - 0
packet_decoder.go

@@ -10,8 +10,11 @@ type packetDecoder interface {
 	getInt32() (int32, error)
 	getInt64() (int64, error)
 	getVarint() (int64, error)
+	getUVarint() (uint64, error)
 	getArrayLength() (int, error)
+	getCompactArrayLength() (int, error)
 	getBool() (bool, error)
+	getEmptyTaggedFieldArray() (int, error)
 
 	// Collections
 	getBytes() ([]byte, error)
@@ -19,6 +22,9 @@ type packetDecoder interface {
 	getRawBytes(length int) ([]byte, error)
 	getString() (string, error)
 	getNullableString() (*string, error)
+	getCompactString() (string, error)
+	getCompactNullableString() (*string, error)
+	getCompactInt32Array() ([]int32, error)
 	getInt32Array() ([]int32, error)
 	getInt64Array() ([]int64, error)
 	getStringArray() ([]string, error)

+ 7 - 0
packet_encoder.go

@@ -12,6 +12,8 @@ type packetEncoder interface {
 	putInt32(in int32)
 	putInt64(in int64)
 	putVarint(in int64)
+	putUVarint(in uint64)
+	putCompactArrayLength(in int)
 	putArrayLength(in int) error
 	putBool(in bool)
 
@@ -19,11 +21,16 @@ type packetEncoder interface {
 	putBytes(in []byte) error
 	putVarintBytes(in []byte) error
 	putRawBytes(in []byte) error
+	putCompactString(in string) error
+	putNullableCompactString(in *string) error
 	putString(in string) error
 	putNullableString(in *string) error
 	putStringArray(in []string) error
+	putCompactInt32Array(in []int32) error
+	putNullableCompactInt32Array(in []int32) error
 	putInt32Array(in []int32) error
 	putInt64Array(in []int64) error
+	putEmptyTaggedFieldArray()
 
 	// Provide the current offset to record the batch size metric
 	offset() int

+ 49 - 0
prep_encoder.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"encoding/binary"
+	"errors"
 	"fmt"
 	"math"
 
@@ -36,6 +37,11 @@ func (pe *prepEncoder) putVarint(in int64) {
 	pe.length += binary.PutVarint(buf[:], in)
 }
 
+func (pe *prepEncoder) putUVarint(in uint64) {
+	var buf [binary.MaxVarintLen64]byte
+	pe.length += binary.PutUvarint(buf[:], in)
+}
+
 func (pe *prepEncoder) putArrayLength(in int) error {
 	if in > math.MaxInt32 {
 		return PacketEncodingError{fmt.Sprintf("array too long (%d)", in)}
@@ -44,6 +50,10 @@ func (pe *prepEncoder) putArrayLength(in int) error {
 	return nil
 }
 
+func (pe *prepEncoder) putCompactArrayLength(in int) {
+	pe.putUVarint(uint64(in + 1))
+}
+
 func (pe *prepEncoder) putBool(in bool) {
 	pe.length++
 }
@@ -67,6 +77,20 @@ func (pe *prepEncoder) putVarintBytes(in []byte) error {
 	return pe.putRawBytes(in)
 }
 
+func (pe *prepEncoder) putCompactString(in string) error {
+	pe.putCompactArrayLength(len(in))
+	return pe.putRawBytes([]byte(in))
+}
+
+func (pe *prepEncoder) putNullableCompactString(in *string) error {
+	if in == nil {
+		pe.putUVarint(0)
+		return nil
+	} else {
+		return pe.putCompactString(*in)
+	}
+}
+
 func (pe *prepEncoder) putRawBytes(in []byte) error {
 	if len(in) > math.MaxInt32 {
 		return PacketEncodingError{fmt.Sprintf("byteslice too long (%d)", len(in))}
@@ -107,6 +131,27 @@ func (pe *prepEncoder) putStringArray(in []string) error {
 	return nil
 }
 
+func (pe *prepEncoder) putCompactInt32Array(in []int32) error {
+	if in == nil {
+		return errors.New("expected int32 array to be non null")
+	}
+
+	pe.putUVarint(uint64(len(in)) + 1)
+	pe.length += 4 * len(in)
+	return nil
+}
+
+func (pe *prepEncoder) putNullableCompactInt32Array(in []int32) error {
+	if in == nil {
+		pe.putUVarint(0)
+		return nil
+	}
+
+	pe.putUVarint(uint64(len(in)) + 1)
+	pe.length += 4 * len(in)
+	return nil
+}
+
 func (pe *prepEncoder) putInt32Array(in []int32) error {
 	err := pe.putArrayLength(len(in))
 	if err != nil {
@@ -125,6 +170,10 @@ func (pe *prepEncoder) putInt64Array(in []int64) error {
 	return nil
 }
 
+func (pe *prepEncoder) putEmptyTaggedFieldArray() {
+	pe.putUVarint(0)
+}
+
 func (pe *prepEncoder) offset() int {
 	return pe.length
 }

+ 4 - 0
produce_request.go

@@ -206,6 +206,10 @@ func (r *ProduceRequest) version() int16 {
 	return r.Version
 }
 
+func (r *ProduceRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *ProduceRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:

+ 16 - 0
produce_response.go

@@ -163,6 +163,22 @@ func (r *ProduceResponse) encode(pe packetEncoder) error {
 	return nil
 }
 
+func (r *ProduceResponse) key() int16 {
+	return 0
+}
+
+func (r *ProduceResponse) version() int16 {
+	return r.Version
+}
+
+func (r *ProduceResponse) headerVersion() int16 {
+	return 0
+}
+
+func (r *ProduceResponse) requiredVersion() KafkaVersion {
+	return MinVersion
+}
+
 func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {
 	if r.Blocks == nil {
 		return nil

+ 96 - 0
real_decoder.go

@@ -9,7 +9,9 @@ var errInvalidArrayLength = PacketDecodingError{"invalid array length"}
 var errInvalidByteSliceLength = PacketDecodingError{"invalid byteslice length"}
 var errInvalidStringLength = PacketDecodingError{"invalid string length"}
 var errVarintOverflow = PacketDecodingError{"varint overflow"}
+var errUVarintOverflow = PacketDecodingError{"uvarint overflow"}
 var errInvalidBool = PacketDecodingError{"invalid bool"}
+var errUnsupportedTaggedFields = PacketDecodingError{"non-empty tagged fields are not supported yet"}
 
 type realDecoder struct {
 	raw   []byte
@@ -73,6 +75,22 @@ func (rd *realDecoder) getVarint() (int64, error) {
 	return tmp, nil
 }
 
+func (rd *realDecoder) getUVarint() (uint64, error) {
+	tmp, n := binary.Uvarint(rd.raw[rd.off:])
+	if n == 0 {
+		rd.off = len(rd.raw)
+		return 0, ErrInsufficientData
+	}
+
+	if n < 0 {
+		rd.off -= n
+		return 0, errUVarintOverflow
+	}
+
+	rd.off += n
+	return tmp, nil
+}
+
 func (rd *realDecoder) getArrayLength() (int, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)
@@ -89,6 +107,19 @@ func (rd *realDecoder) getArrayLength() (int, error) {
 	return tmp, nil
 }
 
+func (rd *realDecoder) getCompactArrayLength() (int, error) {
+	n, err := rd.getUVarint()
+	if err != nil {
+		return 0, err
+	}
+
+	if n == 0 {
+		return 0, nil
+	}
+
+	return int(n) - 1, nil
+}
+
 func (rd *realDecoder) getBool() (bool, error) {
 	b, err := rd.getInt8()
 	if err != nil || b == 0 {
@@ -100,6 +131,19 @@ func (rd *realDecoder) getBool() (bool, error) {
 	return true, nil
 }
 
+func (rd *realDecoder) getEmptyTaggedFieldArray() (int, error) {
+	tagCount, err := rd.getUVarint()
+	if err != nil {
+		return 0, err
+	}
+
+	if tagCount != 0 {
+		return 0, errUnsupportedTaggedFields
+	}
+
+	return 0, nil
+}
+
 // collections
 
 func (rd *realDecoder) getBytes() ([]byte, error) {
@@ -167,6 +211,58 @@ func (rd *realDecoder) getNullableString() (*string, error) {
 	return &tmpStr, err
 }
 
+func (rd *realDecoder) getCompactString() (string, error) {
+	n, err := rd.getUVarint()
+	if err != nil {
+		return "", err
+	}
+
+	var length = int(n - 1)
+
+	tmpStr := string(rd.raw[rd.off : rd.off+length])
+	rd.off += length
+	return tmpStr, nil
+}
+
+func (rd *realDecoder) getCompactNullableString() (*string, error) {
+	n, err := rd.getUVarint()
+
+	if err != nil {
+		return nil, err
+	}
+
+	var length = int(n - 1)
+
+	if length < 0 {
+		return nil, err
+	}
+
+	tmpStr := string(rd.raw[rd.off : rd.off+length])
+	rd.off += length
+	return &tmpStr, err
+}
+
+func (rd *realDecoder) getCompactInt32Array() ([]int32, error) {
+	n, err := rd.getUVarint()
+	if err != nil {
+		return nil, err
+	}
+
+	if n == 0 {
+		return nil, nil
+	}
+
+	arrayLength := int(n) - 1
+
+	ret := make([]int32, arrayLength)
+
+	for i := range ret {
+		ret[i] = int32(binary.BigEndian.Uint32(rd.raw[rd.off:]))
+		rd.off += 4
+	}
+	return ret, nil
+}
+
 func (rd *realDecoder) getInt32Array() ([]int32, error) {
 	if rd.remaining() < 4 {
 		rd.off = len(rd.raw)

+ 52 - 0
real_encoder.go

@@ -2,6 +2,7 @@ package sarama
 
 import (
 	"encoding/binary"
+	"errors"
 
 	"github.com/rcrowley/go-metrics"
 )
@@ -39,11 +40,20 @@ func (re *realEncoder) putVarint(in int64) {
 	re.off += binary.PutVarint(re.raw[re.off:], in)
 }
 
+func (re *realEncoder) putUVarint(in uint64) {
+	re.off += binary.PutUvarint(re.raw[re.off:], in)
+}
+
 func (re *realEncoder) putArrayLength(in int) error {
 	re.putInt32(int32(in))
 	return nil
 }
 
+func (re *realEncoder) putCompactArrayLength(in int) {
+	// 0 represents a null array, so +1 has to be added
+	re.putUVarint(uint64(in + 1))
+}
+
 func (re *realEncoder) putBool(in bool) {
 	if in {
 		re.putInt8(1)
@@ -78,6 +88,19 @@ func (re *realEncoder) putVarintBytes(in []byte) error {
 	return re.putRawBytes(in)
 }
 
+func (re *realEncoder) putCompactString(in string) error {
+	re.putCompactArrayLength(len(in))
+	return re.putRawBytes([]byte(in))
+}
+
+func (re *realEncoder) putNullableCompactString(in *string) error {
+	if in == nil {
+		re.putInt8(0)
+		return nil
+	}
+	return re.putCompactString(*in)
+}
+
 func (re *realEncoder) putString(in string) error {
 	re.putInt16(int16(len(in)))
 	copy(re.raw[re.off:], in)
@@ -108,6 +131,31 @@ func (re *realEncoder) putStringArray(in []string) error {
 	return nil
 }
 
+func (re *realEncoder) putCompactInt32Array(in []int32) error {
+	if in == nil {
+		return errors.New("expected int32 array to be non null")
+	}
+	// 0 represents a null array, so +1 has to be added
+	re.putUVarint(uint64(len(in)) + 1)
+	for _, val := range in {
+		re.putInt32(val)
+	}
+	return nil
+}
+
+func (re *realEncoder) putNullableCompactInt32Array(in []int32) error {
+	if in == nil {
+		re.putUVarint(0)
+		return nil
+	}
+	// 0 represents a null array, so +1 has to be added
+	re.putUVarint(uint64(len(in)) + 1)
+	for _, val := range in {
+		re.putInt32(val)
+	}
+	return nil
+}
+
 func (re *realEncoder) putInt32Array(in []int32) error {
 	err := re.putArrayLength(len(in))
 	if err != nil {
@@ -130,6 +178,10 @@ func (re *realEncoder) putInt64Array(in []int64) error {
 	return nil
 }
 
+func (re *realEncoder) putEmptyTaggedFieldArray() {
+	re.putUVarint(0)
+}
+
 func (re *realEncoder) offset() int {
 	return re.off
 }

+ 24 - 4
request.go

@@ -11,6 +11,7 @@ type protocolBody interface {
 	versionedDecoder
 	key() int16
 	version() int16
+	headerVersion() int16
 	requiredVersion() KafkaVersion
 }
 
@@ -26,12 +27,19 @@ func (r *request) encode(pe packetEncoder) error {
 	pe.putInt16(r.body.version())
 	pe.putInt32(r.correlationID)
 
-	err := pe.putString(r.clientID)
-	if err != nil {
-		return err
+	if r.body.headerVersion() >= 1 {
+		err := pe.putString(r.clientID)
+		if err != nil {
+			return err
+		}
+	}
+
+	if r.body.headerVersion() >= 2 {
+		// we don't use tagged fields at the moment, so we just encode an empty tagged field array (count 0)
+		pe.putUVarint(0)
 	}
 
-	err = r.body.encode(pe)
+	err := r.body.encode(pe)
 	if err != nil {
 		return err
 	}
@@ -65,6 +73,14 @@ func (r *request) decode(pd packetDecoder) (err error) {
 		return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
 	}
 
+	if r.body.headerVersion() >= 2 {
+		// read and discard the tagged field count (tagged fields are not yet supported)
+		_, err = pd.getUVarint()
+		if err != nil {
+			return err
+		}
+	}
+
 	return r.body.decode(pd, version)
 }
 
@@ -166,6 +182,10 @@ func allocateBody(key, version int16) protocolBody {
 		return &CreatePartitionsRequest{}
 	case 42:
 		return &DeleteGroupsRequest{}
+	case 45:
+		return &AlterPartitionReassignmentsRequest{}
+	case 46:
+		return &ListPartitionReassignmentsRequest{}
 	}
 	return nil
 }

+ 21 - 2
request_test.go

@@ -42,13 +42,32 @@ func testRequest(t *testing.T, name string, rb protocolBody, expected []byte) {
 	testRequestDecode(t, name, rb, packet)
 }
 
+func testRequestWithoutByteComparison(t *testing.T, name string, rb protocolBody) {
+	if !rb.requiredVersion().IsAtLeast(MinVersion) {
+		t.Errorf("Request %s has invalid required version", name)
+	}
+	packet := testRequestEncode(t, name, rb, nil)
+	testRequestDecode(t, name, rb, packet)
+}
+
 func testRequestEncode(t *testing.T, name string, rb protocolBody, expected []byte) []byte {
 	req := &request{correlationID: 123, clientID: "foo", body: rb}
 	packet, err := encode(req, nil)
-	headerSize := 14 + len("foo")
+
+	headerSize := 0
+
+	switch rb.headerVersion() {
+	case 1:
+		headerSize = 14 + len("foo")
+	case 2:
+		headerSize = 14 + len("foo") + 1
+	default:
+		t.Error("Encoding", name, "failed\nheaderVersion", rb.headerVersion(), "not implemented")
+	}
+
 	if err != nil {
 		t.Error(err)
-	} else if !bytes.Equal(packet[headerSize:], expected) {
+	} else if expected != nil && !bytes.Equal(packet[headerSize:], expected) {
 		t.Error("Encoding", name, "failed\ngot ", packet[headerSize:], "\nwant", expected)
 	}
 	return packet

+ 8 - 1
response_header.go

@@ -10,7 +10,7 @@ type responseHeader struct {
 	correlationID int32
 }
 
-func (r *responseHeader) decode(pd packetDecoder) (err error) {
+func (r *responseHeader) decode(pd packetDecoder, version int16) (err error) {
 	r.length, err = pd.getInt32()
 	if err != nil {
 		return err
@@ -20,5 +20,12 @@ func (r *responseHeader) decode(pd packetDecoder) (err error) {
 	}
 
 	r.correlationID, err = pd.getInt32()
+
+	if version >= 1 {
+		if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+			return err
+		}
+	}
+
 	return err
 }

+ 19 - 3
response_header_test.go

@@ -3,15 +3,31 @@ package sarama
 import "testing"
 
 var (
-	responseHeaderBytes = []byte{
+	responseHeaderBytesV0 = []byte{
 		0x00, 0x00, 0x0f, 0x00,
 		0x0a, 0xbb, 0xcc, 0xff}
+
+	responseHeaderBytesV1 = []byte{
+		0x00, 0x00, 0x0f, 0x00,
+		0x0a, 0xbb, 0xcc, 0xff, 0x00}
 )
 
-func TestResponseHeader(t *testing.T) {
+func TestResponseHeaderV0(t *testing.T) {
+	header := responseHeader{}
+
+	testVersionDecodable(t, "response header", &header, responseHeaderBytesV0, 0)
+	if header.length != 0xf00 {
+		t.Error("Decoding header length failed, got", header.length)
+	}
+	if header.correlationID != 0x0abbccff {
+		t.Error("Decoding header correlation id failed, got", header.correlationID)
+	}
+}
+
+func TestResponseHeaderV1(t *testing.T) {
 	header := responseHeader{}
 
-	testDecodable(t, "response header", &header, responseHeaderBytes)
+	testVersionDecodable(t, "response header", &header, responseHeaderBytesV1, 1)
 	if header.length != 0xf00 {
 		t.Error("Decoding header length failed, got", header.length)
 	}

+ 4 - 0
sasl_authenticate_request.go

@@ -24,6 +24,10 @@ func (r *SaslAuthenticateRequest) version() int16 {
 	return 0
 }
 
+func (r *SaslAuthenticateRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *SaslAuthenticateRequest) requiredVersion() KafkaVersion {
 	return V1_0_0_0
 }

+ 4 - 0
sasl_authenticate_response.go

@@ -39,6 +39,10 @@ func (r *SaslAuthenticateResponse) version() int16 {
 	return 0
 }
 
+func (r *SaslAuthenticateResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *SaslAuthenticateResponse) requiredVersion() KafkaVersion {
 	return V1_0_0_0
 }

+ 4 - 0
sasl_handshake_request.go

@@ -29,6 +29,10 @@ func (r *SaslHandshakeRequest) version() int16 {
 	return r.Version
 }
 
+func (r *SaslHandshakeRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *SaslHandshakeRequest) requiredVersion() KafkaVersion {
 	return V0_10_0_0
 }

+ 4 - 0
sasl_handshake_response.go

@@ -33,6 +33,10 @@ func (r *SaslHandshakeResponse) version() int16 {
 	return 0
 }
 
+func (r *SaslHandshakeResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *SaslHandshakeResponse) requiredVersion() KafkaVersion {
 	return V0_10_0_0
 }

+ 4 - 0
sync_group_request.go

@@ -77,6 +77,10 @@ func (r *SyncGroupRequest) version() int16 {
 	return 0
 }
 
+func (r *SyncGroupRequest) headerVersion() int16 {
+	return 1
+}
+
 func (r *SyncGroupRequest) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
sync_group_response.go

@@ -36,6 +36,10 @@ func (r *SyncGroupResponse) version() int16 {
 	return 0
 }
 
+func (r *SyncGroupResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *SyncGroupResponse) requiredVersion() KafkaVersion {
 	return V0_9_0_0
 }

+ 4 - 0
txn_offset_commit_request.go

@@ -91,6 +91,10 @@ func (a *TxnOffsetCommitRequest) version() int16 {
 	return 0
 }
 
+func (a *TxnOffsetCommitRequest) headerVersion() int16 {
+	return 1
+}
+
 func (a *TxnOffsetCommitRequest) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }

+ 4 - 0
txn_offset_commit_response.go

@@ -78,6 +78,10 @@ func (a *TxnOffsetCommitResponse) version() int16 {
 	return 0
 }
 
+func (a *TxnOffsetCommitResponse) headerVersion() int16 {
+	return 0
+}
+
 func (a *TxnOffsetCommitResponse) requiredVersion() KafkaVersion {
 	return V0_11_0_0
 }