Browse Source

Merge branch 'master' of https://github.com/Shopify/sarama

AJ Yoo 7 years ago
parent
commit
5a1cd7b885

+ 30 - 0
CHANGELOG.md

@@ -1,5 +1,35 @@
 # Changelog
 # Changelog
 
 
+#### Version 1.20.0 (2018-12-10)
+
+New Features:
+ - Add support for zstd compression
+   ([#1170](https://github.com/Shopify/sarama/pull/1170)).
+ - Add support for Idempotent Producer
+   ([#1152](https://github.com/Shopify/sarama/pull/1152)).
+ - Add support support for Kafka 2.1.0
+   ([#1229](https://github.com/Shopify/sarama/pull/1229)).
+ - Add support support for OffsetCommit request/response pairs versions v1 to v5
+   ([#1201](https://github.com/Shopify/sarama/pull/1201)).
+ - Add support support for OffsetFetch request/response pair up to version v5
+   ([#1198](https://github.com/Shopify/sarama/pull/1198)).
+
+Improvements:
+ - Export broker's Rack setting
+   ([#1173](https://github.com/Shopify/sarama/pull/1173)).
+ - Always use latest patch version of Go on CI
+   ([#1202](https://github.com/Shopify/sarama/pull/1202)).
+ - Add error codes 61 to 72
+   ([#1195](https://github.com/Shopify/sarama/pull/1195)).
+
+Bug Fixes:
+ - Fix build without cgo
+   ([#1182](https://github.com/Shopify/sarama/pull/1182)).
+ - Fix go vet suggestion in consumer group file
+   ([#1209](https://github.com/Shopify/sarama/pull/1209)).
+ - Fix typos in code and comments
+   ([#1228](https://github.com/Shopify/sarama/pull/1228)).
+
 #### Version 1.19.0 (2018-09-27)
 #### Version 1.19.0 (2018-09-27)
 
 
 New Features:
 New Features:

+ 27 - 6
describe_configs_request.go

@@ -1,15 +1,17 @@
 package sarama
 package sarama
 
 
+type DescribeConfigsRequest struct {
+	Version         int16
+	Resources       []*ConfigResource
+	IncludeSynonyms bool
+}
+
 type ConfigResource struct {
 type ConfigResource struct {
 	Type        ConfigResourceType
 	Type        ConfigResourceType
 	Name        string
 	Name        string
 	ConfigNames []string
 	ConfigNames []string
 }
 }
 
 
-type DescribeConfigsRequest struct {
-	Resources []*ConfigResource
-}
-
 func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
 func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
 	if err := pe.putArrayLength(len(r.Resources)); err != nil {
 	if err := pe.putArrayLength(len(r.Resources)); err != nil {
 		return err
 		return err
@@ -30,6 +32,10 @@ func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
 		}
 		}
 	}
 	}
 
 
+	if r.Version >= 1 {
+		pe.putBool(r.IncludeSynonyms)
+	}
+
 	return nil
 	return nil
 }
 }
 
 
@@ -74,6 +80,14 @@ func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err er
 		}
 		}
 		r.Resources[i].ConfigNames = cfnames
 		r.Resources[i].ConfigNames = cfnames
 	}
 	}
+	r.Version = version
+	if r.Version >= 1 {
+		b, err := pd.getBool()
+		if err != nil {
+			return err
+		}
+		r.IncludeSynonyms = b
+	}
 
 
 	return nil
 	return nil
 }
 }
@@ -83,9 +97,16 @@ func (r *DescribeConfigsRequest) key() int16 {
 }
 }
 
 
 func (r *DescribeConfigsRequest) version() int16 {
 func (r *DescribeConfigsRequest) version() int16 {
-	return 0
+	return r.Version
 }
 }
 
 
 func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
 func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch r.Version {
+	case 1:
+		return V1_0_0_0
+	case 2:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }
 }

+ 30 - 1
describe_configs_request_test.go

@@ -33,23 +33,33 @@ var (
 	}
 	}
 
 
 	singleDescribeConfigsRequestAllConfigs = []byte{
 	singleDescribeConfigsRequestAllConfigs = []byte{
+		0, 0, 0, 1, // 1 config
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		255, 255, 255, 255, // all configs
+	}
+
+	singleDescribeConfigsRequestAllConfigsv1 = []byte{
 		0, 0, 0, 1, // 1 config
 		0, 0, 0, 1, // 1 config
 		2,                   // a topic
 		2,                   // a topic
 		0, 3, 'f', 'o', 'o', // topic name: foo
 		0, 3, 'f', 'o', 'o', // topic name: foo
 		255, 255, 255, 255, // no configs
 		255, 255, 255, 255, // no configs
+		1, //synoms
 	}
 	}
 )
 )
 
 
-func TestDescribeConfigsRequest(t *testing.T) {
+func TestDescribeConfigsRequestv0(t *testing.T) {
 	var request *DescribeConfigsRequest
 	var request *DescribeConfigsRequest
 
 
 	request = &DescribeConfigsRequest{
 	request = &DescribeConfigsRequest{
+		Version:   0,
 		Resources: []*ConfigResource{},
 		Resources: []*ConfigResource{},
 	}
 	}
 	testRequest(t, "no requests", request, emptyDescribeConfigsRequest)
 	testRequest(t, "no requests", request, emptyDescribeConfigsRequest)
 
 
 	configs := []string{"segment.ms"}
 	configs := []string{"segment.ms"}
 	request = &DescribeConfigsRequest{
 	request = &DescribeConfigsRequest{
+		Version: 0,
 		Resources: []*ConfigResource{
 		Resources: []*ConfigResource{
 			&ConfigResource{
 			&ConfigResource{
 				Type:        TopicResource,
 				Type:        TopicResource,
@@ -62,6 +72,7 @@ func TestDescribeConfigsRequest(t *testing.T) {
 	testRequest(t, "one config", request, singleDescribeConfigsRequest)
 	testRequest(t, "one config", request, singleDescribeConfigsRequest)
 
 
 	request = &DescribeConfigsRequest{
 	request = &DescribeConfigsRequest{
+		Version: 0,
 		Resources: []*ConfigResource{
 		Resources: []*ConfigResource{
 			&ConfigResource{
 			&ConfigResource{
 				Type:        TopicResource,
 				Type:        TopicResource,
@@ -78,6 +89,7 @@ func TestDescribeConfigsRequest(t *testing.T) {
 	testRequest(t, "two configs", request, doubleDescribeConfigsRequest)
 	testRequest(t, "two configs", request, doubleDescribeConfigsRequest)
 
 
 	request = &DescribeConfigsRequest{
 	request = &DescribeConfigsRequest{
+		Version: 0,
 		Resources: []*ConfigResource{
 		Resources: []*ConfigResource{
 			&ConfigResource{
 			&ConfigResource{
 				Type: TopicResource,
 				Type: TopicResource,
@@ -88,3 +100,20 @@ func TestDescribeConfigsRequest(t *testing.T) {
 
 
 	testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs)
 	testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs)
 }
 }
+
+func TestDescribeConfigsRequestv1(t *testing.T) {
+	var request *DescribeConfigsRequest
+
+	request = &DescribeConfigsRequest{
+		Version: 1,
+		Resources: []*ConfigResource{
+			{
+				Type: TopicResource,
+				Name: "foo",
+			},
+		},
+		IncludeSynonyms: true,
+	}
+
+	testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigsv1)
+}

+ 145 - 13
describe_configs_response.go

@@ -1,8 +1,41 @@
 package sarama
 package sarama
 
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
+
+type ConfigSource int8
+
+func (s ConfigSource) String() string {
+	switch s {
+	case SourceUnknown:
+		return "Unknown"
+	case SourceTopic:
+		return "Topic"
+	case SourceDynamicBroker:
+		return "DynamicBroker"
+	case SourceDynamicDefaultBroker:
+		return "DynamicDefaultBroker"
+	case SourceStaticBroker:
+		return "StaticBroker"
+	case SourceDefault:
+		return "Default"
+	}
+	return fmt.Sprintf("Source Invalid: %d", int(s))
+}
+
+const (
+	SourceUnknown              ConfigSource = 0
+	SourceTopic                ConfigSource = 1
+	SourceDynamicBroker        ConfigSource = 2
+	SourceDynamicDefaultBroker ConfigSource = 3
+	SourceStaticBroker         ConfigSource = 4
+	SourceDefault              ConfigSource = 5
+)
 
 
 type DescribeConfigsResponse struct {
 type DescribeConfigsResponse struct {
+	Version      int16
 	ThrottleTime time.Duration
 	ThrottleTime time.Duration
 	Resources    []*ResourceResponse
 	Resources    []*ResourceResponse
 }
 }
@@ -20,7 +53,15 @@ type ConfigEntry struct {
 	Value     string
 	Value     string
 	ReadOnly  bool
 	ReadOnly  bool
 	Default   bool
 	Default   bool
+	Source    ConfigSource
 	Sensitive bool
 	Sensitive bool
+	Synonyms  []*ConfigSynonym
+}
+
+type ConfigSynonym struct {
+	ConfigName  string
+	ConfigValue string
+	Source      ConfigSource
 }
 }
 
 
 func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
 func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
@@ -30,14 +71,16 @@ func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
 	}
 	}
 
 
 	for _, c := range r.Resources {
 	for _, c := range r.Resources {
-		if err = c.encode(pe); err != nil {
+		if err = c.encode(pe, r.Version); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
+
 	return nil
 	return nil
 }
 }
 
 
 func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
 func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
 	throttleTime, err := pd.getInt32()
 	throttleTime, err := pd.getInt32()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -66,14 +109,21 @@ func (r *DescribeConfigsResponse) key() int16 {
 }
 }
 
 
 func (r *DescribeConfigsResponse) version() int16 {
 func (r *DescribeConfigsResponse) version() int16 {
-	return 0
+	return r.Version
 }
 }
 
 
 func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
 func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch r.Version {
+	case 1:
+		return V1_0_0_0
+	case 2:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }
 }
 
 
-func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
+func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt16(r.ErrorCode)
 	pe.putInt16(r.ErrorCode)
 
 
 	if err = pe.putString(r.ErrorMsg); err != nil {
 	if err = pe.putString(r.ErrorMsg); err != nil {
@@ -91,7 +141,7 @@ func (r *ResourceResponse) encode(pe packetEncoder) (err error) {
 	}
 	}
 
 
 	for _, c := range r.Configs {
 	for _, c := range r.Configs {
-		if err = c.encode(pe); err != nil {
+		if err = c.encode(pe, version); err != nil {
 			return err
 			return err
 		}
 		}
 	}
 	}
@@ -139,7 +189,7 @@ func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
 	return nil
 	return nil
 }
 }
 
 
-func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
+func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
 	if err = pe.putString(r.Name); err != nil {
 	if err = pe.putString(r.Name); err != nil {
 		return err
 		return err
 	}
 	}
@@ -149,12 +199,32 @@ func (r *ConfigEntry) encode(pe packetEncoder) (err error) {
 	}
 	}
 
 
 	pe.putBool(r.ReadOnly)
 	pe.putBool(r.ReadOnly)
-	pe.putBool(r.Default)
-	pe.putBool(r.Sensitive)
+
+	if version <= 0 {
+		pe.putBool(r.Default)
+		pe.putBool(r.Sensitive)
+	} else {
+		pe.putInt8(int8(r.Source))
+		pe.putBool(r.Sensitive)
+
+		if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
+			return err
+		}
+		for _, c := range r.Synonyms {
+			if err = c.encode(pe, version); err != nil {
+				return err
+			}
+		}
+	}
+
 	return nil
 	return nil
 }
 }
 
 
+//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
 func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
 func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
+	if version == 0 {
+		r.Source = SourceUnknown
+	}
 	name, err := pd.getString()
 	name, err := pd.getString()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -173,16 +243,78 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
 	}
 	}
 	r.ReadOnly = read
 	r.ReadOnly = read
 
 
-	de, err := pd.getBool()
-	if err != nil {
-		return err
+	if version == 0 {
+		defaultB, err := pd.getBool()
+		if err != nil {
+			return err
+		}
+		r.Default = defaultB
+	} else {
+		source, err := pd.getInt8()
+		if err != nil {
+			return err
+		}
+		r.Source = ConfigSource(source)
 	}
 	}
-	r.Default = de
 
 
 	sensitive, err := pd.getBool()
 	sensitive, err := pd.getBool()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 	r.Sensitive = sensitive
 	r.Sensitive = sensitive
+
+	if version > 0 {
+		n, err := pd.getArrayLength()
+		if err != nil {
+			return err
+		}
+		r.Synonyms = make([]*ConfigSynonym, n)
+
+		for i := 0; i < n; i++ {
+			s := &ConfigSynonym{}
+			if err := s.decode(pd, version); err != nil {
+				return err
+			}
+			r.Synonyms[i] = s
+		}
+
+	}
+	return nil
+}
+
+func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
+	err = pe.putString(c.ConfigName)
+	if err != nil {
+		return err
+	}
+
+	err = pe.putString(c.ConfigValue)
+	if err != nil {
+		return err
+	}
+
+	pe.putInt8(int8(c.Source))
+
+	return nil
+}
+
+func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
+	name, err := pd.getString()
+	if err != nil {
+		return nil
+	}
+	c.ConfigName = name
+
+	value, err := pd.getString()
+	if err != nil {
+		return nil
+	}
+	c.ConfigValue = value
+
+	source, err := pd.getInt8()
+	if err != nil {
+		return nil
+	}
+	c.Source = ConfigSource(source)
 	return nil
 	return nil
 }
 }

+ 115 - 4
describe_configs_response_test.go

@@ -10,7 +10,7 @@ var (
 		0, 0, 0, 0, // no configs
 		0, 0, 0, 0, // no configs
 	}
 	}
 
 
-	describeConfigsResponsePopulated = []byte{
+	describeConfigsResponsePopulatedv0 = []byte{
 		0, 0, 0, 0, //throttle
 		0, 0, 0, 0, //throttle
 		0, 0, 0, 1, // response
 		0, 0, 0, 1, // response
 		0, 0, //errorcode
 		0, 0, //errorcode
@@ -24,9 +24,44 @@ var (
 		0, // Default
 		0, // Default
 		0, // Sensitive
 		0, // Sensitive
 	}
 	}
+
+	describeConfigsResponsePopulatedv1 = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0,          // ReadOnly
+		4,          // Source
+		0,          // Sensitive
+		0, 0, 0, 0, // No Synonym
+	}
+
+	describeConfigsResponseWithSynonymv1 = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0,          // ReadOnly
+		4,          // Source
+		0,          // Sensitive
+		0, 0, 0, 1, // 1 Synonym
+		0, 14, 'l', 'o', 'g', '.', 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		4, // Source
+	}
 )
 )
 
 
-func TestDescribeConfigsResponse(t *testing.T) {
+func TestDescribeConfigsResponsev0(t *testing.T) {
 	var response *DescribeConfigsResponse
 	var response *DescribeConfigsResponse
 
 
 	response = &DescribeConfigsResponse{
 	response = &DescribeConfigsResponse{
@@ -38,7 +73,7 @@ func TestDescribeConfigsResponse(t *testing.T) {
 	}
 	}
 
 
 	response = &DescribeConfigsResponse{
 	response = &DescribeConfigsResponse{
-		Resources: []*ResourceResponse{
+		Version: 0, Resources: []*ResourceResponse{
 			&ResourceResponse{
 			&ResourceResponse{
 				ErrorCode: 0,
 				ErrorCode: 0,
 				ErrorMsg:  "",
 				ErrorMsg:  "",
@@ -56,5 +91,81 @@ func TestDescribeConfigsResponse(t *testing.T) {
 			},
 			},
 		},
 		},
 	}
 	}
-	testResponse(t, "response with error", response, describeConfigsResponsePopulated)
+	testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0)
+}
+
+func TestDescribeConfigsResponsev1(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Version: 1,
+		Resources: []*ResourceResponse{
+			&ResourceResponse{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					&ConfigEntry{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Source:    SourceStaticBroker,
+						Sensitive: false,
+						Synonyms:  []*ConfigSynonym{},
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with error", response, describeConfigsResponsePopulatedv1)
+}
+
+func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Version: 1,
+		Resources: []*ResourceResponse{
+			&ResourceResponse{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					&ConfigEntry{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Source:    SourceStaticBroker,
+						Sensitive: false,
+						Synonyms: []*ConfigSynonym{
+							{
+								ConfigName:  "log.segment.ms",
+								ConfigValue: "1000",
+								Source:      SourceStaticBroker,
+							},
+						},
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1)
 }
 }

+ 1 - 1
mockresponses.go

@@ -523,7 +523,7 @@ func (mr *MockOffsetFetchResponse) SetOffset(group, topic string, partition int3
 		partitions = make(map[int32]*OffsetFetchResponseBlock)
 		partitions = make(map[int32]*OffsetFetchResponseBlock)
 		topics[topic] = partitions
 		topics[topic] = partitions
 	}
 	}
-	partitions[partition] = &OffsetFetchResponseBlock{offset, metadata, kerror}
+	partitions[partition] = &OffsetFetchResponseBlock{offset, 0, metadata, kerror}
 	return mr
 	return mr
 }
 }
 
 

+ 7 - 1
offset_commit_request.go

@@ -52,12 +52,14 @@ type OffsetCommitRequest struct {
 	// - 0 (kafka 0.8.1 and later)
 	// - 0 (kafka 0.8.1 and later)
 	// - 1 (kafka 0.8.2 and later)
 	// - 1 (kafka 0.8.2 and later)
 	// - 2 (kafka 0.9.0 and later)
 	// - 2 (kafka 0.9.0 and later)
+	// - 3 (kafka 0.11.0 and later)
+	// - 4 (kafka 2.0.0 and later)
 	Version int16
 	Version int16
 	blocks  map[string]map[int32]*offsetCommitRequestBlock
 	blocks  map[string]map[int32]*offsetCommitRequestBlock
 }
 }
 
 
 func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
 func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
-	if r.Version < 0 || r.Version > 2 {
+	if r.Version < 0 || r.Version > 4 {
 		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
 		return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
 	}
 	}
 
 
@@ -174,6 +176,10 @@ func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
 		return V0_8_2_0
 		return V0_8_2_0
 	case 2:
 	case 2:
 		return V0_9_0_0
 		return V0_9_0_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
 	default:
 	default:
 		return MinVersion
 		return MinVersion
 	}
 	}

+ 16 - 11
offset_commit_request_test.go

@@ -1,6 +1,9 @@
 package sarama
 package sarama
 
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 
 var (
 var (
 	offsetCommitRequestNoBlocksV0 = []byte{
 	offsetCommitRequestNoBlocksV0 = []byte{
@@ -76,15 +79,17 @@ func TestOffsetCommitRequestV1(t *testing.T) {
 	testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1)
 	testRequest(t, "one block v1", request, offsetCommitRequestOneBlockV1)
 }
 }
 
 
-func TestOffsetCommitRequestV2(t *testing.T) {
-	request := new(OffsetCommitRequest)
-	request.ConsumerGroup = "foobar"
-	request.ConsumerID = "cons"
-	request.ConsumerGroupGeneration = 0x1122
-	request.RetentionTime = 0x4433
-	request.Version = 2
-	testRequest(t, "no blocks v2", request, offsetCommitRequestNoBlocksV2)
+func TestOffsetCommitRequestV2ToV4(t *testing.T) {
+	for version := 2; version <= 4; version++ {
+		request := new(OffsetCommitRequest)
+		request.ConsumerGroup = "foobar"
+		request.ConsumerID = "cons"
+		request.ConsumerGroupGeneration = 0x1122
+		request.RetentionTime = 0x4433
+		request.Version = int16(version)
+		testRequest(t, fmt.Sprintf("no blocks v%d", version), request, offsetCommitRequestNoBlocksV2)
 
 
-	request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
-	testRequest(t, "one block v2", request, offsetCommitRequestOneBlockV2)
+		request.AddBlock("topic", 0x5221, 0xDEADBEEF, 0, "metadata")
+		testRequest(t, fmt.Sprintf("one block v%d", version), request, offsetCommitRequestOneBlockV2)
+	}
 }
 }

+ 28 - 3
offset_commit_response.go

@@ -1,7 +1,9 @@
 package sarama
 package sarama
 
 
 type OffsetCommitResponse struct {
 type OffsetCommitResponse struct {
-	Errors map[string]map[int32]KError
+	Version        int16
+	ThrottleTimeMs int32
+	Errors         map[string]map[int32]KError
 }
 }
 
 
 func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
 func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
@@ -17,6 +19,9 @@ func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KE
 }
 }
 
 
 func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
 func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
 	if err := pe.putArrayLength(len(r.Errors)); err != nil {
 	if err := pe.putArrayLength(len(r.Errors)); err != nil {
 		return err
 		return err
 	}
 	}
@@ -36,6 +41,15 @@ func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
 }
 }
 
 
 func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
 func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
+	r.Version = version
+
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	numTopics, err := pd.getArrayLength()
 	numTopics, err := pd.getArrayLength()
 	if err != nil || numTopics == 0 {
 	if err != nil || numTopics == 0 {
 		return err
 		return err
@@ -77,9 +91,20 @@ func (r *OffsetCommitResponse) key() int16 {
 }
 }
 
 
 func (r *OffsetCommitResponse) version() int16 {
 func (r *OffsetCommitResponse) version() int16 {
-	return 0
+	return r.Version
 }
 }
 
 
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
 func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_9_0_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	default:
+		return MinVersion
+	}
 }
 }

+ 15 - 0
offset_commit_response_test.go

@@ -1,6 +1,7 @@
 package sarama
 package sarama
 
 
 import (
 import (
+	"fmt"
 	"testing"
 	"testing"
 )
 )
 
 
@@ -22,3 +23,17 @@ func TestNormalOffsetCommitResponse(t *testing.T) {
 	// unpredictable map traversal order.
 	// unpredictable map traversal order.
 	testResponse(t, "normal", &response, nil)
 	testResponse(t, "normal", &response, nil)
 }
 }
+
+func TestOffsetCommitResponseWithThrottleTime(t *testing.T) {
+	for version := 3; version <= 4; version++ {
+		response := OffsetCommitResponse{
+			Version:        int16(version),
+			ThrottleTimeMs: 123,
+		}
+		response.AddError("t", 0, ErrNotLeaderForPartition)
+		response.Errors["m"] = make(map[int32]KError)
+		// The response encoded form cannot be checked for it varies due to
+		// unpredictable map traversal order.
+		testResponse(t, fmt.Sprintf("v%d with throttle time", version), &response, nil)
+	}
+}

+ 29 - 10
offset_fetch_request.go

@@ -1,28 +1,33 @@
 package sarama
 package sarama
 
 
 type OffsetFetchRequest struct {
 type OffsetFetchRequest struct {
-	ConsumerGroup string
 	Version       int16
 	Version       int16
+	ConsumerGroup string
 	partitions    map[string][]int32
 	partitions    map[string][]int32
 }
 }
 
 
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
 func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
-	if r.Version < 0 || r.Version > 1 {
+	if r.Version < 0 || r.Version > 5 {
 		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
 		return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
 	}
 	}
 
 
 	if err = pe.putString(r.ConsumerGroup); err != nil {
 	if err = pe.putString(r.ConsumerGroup); err != nil {
 		return err
 		return err
 	}
 	}
-	if err = pe.putArrayLength(len(r.partitions)); err != nil {
-		return err
-	}
-	for topic, partitions := range r.partitions {
-		if err = pe.putString(topic); err != nil {
+
+	if r.Version >= 2 && r.partitions == nil {
+		pe.putInt32(-1)
+	} else {
+		if err = pe.putArrayLength(len(r.partitions)); err != nil {
 			return err
 			return err
 		}
 		}
-		if err = pe.putInt32Array(partitions); err != nil {
-			return err
+		for topic, partitions := range r.partitions {
+			if err = pe.putString(topic); err != nil {
+				return err
+			}
+			if err = pe.putInt32Array(partitions); err != nil {
+				return err
+			}
 		}
 		}
 	}
 	}
 	return nil
 	return nil
@@ -37,7 +42,7 @@ func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	if partitionCount == 0 {
+	if (partitionCount == 0 && version < 2) || partitionCount < 0 {
 		return nil
 		return nil
 	}
 	}
 	r.partitions = make(map[string][]int32)
 	r.partitions = make(map[string][]int32)
@@ -67,11 +72,25 @@ func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
 	switch r.Version {
 	switch r.Version {
 	case 1:
 	case 1:
 		return V0_8_2_0
 		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
 	default:
 	default:
 		return MinVersion
 		return MinVersion
 	}
 	}
 }
 }
 
 
+func (r *OffsetFetchRequest) ZeroPartitions() {
+	if r.partitions == nil && r.Version >= 2 {
+		r.partitions = make(map[string][]int32)
+	}
+}
+
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
 func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
 	if r.partitions == nil {
 	if r.partitions == nil {
 		r.partitions = make(map[string][]int32)
 		r.partitions = make(map[string][]int32)

+ 32 - 8
offset_fetch_request_test.go

@@ -1,6 +1,9 @@
 package sarama
 package sarama
 
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 
 var (
 var (
 	offsetFetchRequestNoGroupNoPartitions = []byte{
 	offsetFetchRequestNoGroupNoPartitions = []byte{
@@ -17,15 +20,36 @@ var (
 		0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
 		0x00, 0x0D, 't', 'o', 'p', 'i', 'c', 'T', 'h', 'e', 'F', 'i', 'r', 's', 't',
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x01,
 		0x4F, 0x4F, 0x4F, 0x4F}
 		0x4F, 0x4F, 0x4F, 0x4F}
+
+	offsetFetchRequestAllPartitions = []byte{
+		0x00, 0x04, 'b', 'l', 'a', 'h',
+		0xff, 0xff, 0xff, 0xff}
 )
 )
 
 
-func TestOffsetFetchRequest(t *testing.T) {
-	request := new(OffsetFetchRequest)
-	testRequest(t, "no group, no partitions", request, offsetFetchRequestNoGroupNoPartitions)
+func TestOffsetFetchRequestNoPartitions(t *testing.T) {
+	for version := 0; version <= 5; version++ {
+		request := new(OffsetFetchRequest)
+		request.Version = int16(version)
+		request.ZeroPartitions()
+		testRequest(t, fmt.Sprintf("no group, no partitions %d", version), request, offsetFetchRequestNoGroupNoPartitions)
 
 
-	request.ConsumerGroup = "blah"
-	testRequest(t, "no partitions", request, offsetFetchRequestNoPartitions)
+		request.ConsumerGroup = "blah"
+		testRequest(t, fmt.Sprintf("no partitions %d", version), request, offsetFetchRequestNoPartitions)
+	}
+}
+func TestOffsetFetchRequest(t *testing.T) {
+	for version := 0; version <= 5; version++ {
+		request := new(OffsetFetchRequest)
+		request.Version = int16(version)
+		request.ConsumerGroup = "blah"
+		request.AddPartition("topicTheFirst", 0x4F4F4F4F)
+		testRequest(t, fmt.Sprintf("one partition %d", version), request, offsetFetchRequestOnePartition)
+	}
+}
 
 
-	request.AddPartition("topicTheFirst", 0x4F4F4F4F)
-	testRequest(t, "one partition", request, offsetFetchRequestOnePartition)
+func TestOffsetFetchRequestAllPartitions(t *testing.T) {
+	for version := 2; version <= 5; version++ {
+		request := &OffsetFetchRequest{Version: int16(version), ConsumerGroup: "blah"}
+		testRequest(t, fmt.Sprintf("all partitions %d", version), request, offsetFetchRequestAllPartitions)
+	}
 }
 }

+ 85 - 31
offset_fetch_response.go

@@ -1,17 +1,25 @@
 package sarama
 package sarama
 
 
 type OffsetFetchResponseBlock struct {
 type OffsetFetchResponseBlock struct {
-	Offset   int64
-	Metadata string
-	Err      KError
+	Offset      int64
+	LeaderEpoch int32
+	Metadata    string
+	Err         KError
 }
 }
 
 
-func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
+func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
 	b.Offset, err = pd.getInt64()
 	b.Offset, err = pd.getInt64()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
 
 
+	if version >= 5 {
+		b.LeaderEpoch, err = pd.getInt32()
+		if err != nil {
+			return err
+		}
+	}
+
 	b.Metadata, err = pd.getString()
 	b.Metadata, err = pd.getString()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -26,9 +34,13 @@ func (b *OffsetFetchResponseBlock) decode(pd packetDecoder) (err error) {
 	return nil
 	return nil
 }
 }
 
 
-func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
+func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
 	pe.putInt64(b.Offset)
 	pe.putInt64(b.Offset)
 
 
+	if version >= 5 {
+		pe.putInt32(b.LeaderEpoch)
+	}
+
 	err = pe.putString(b.Metadata)
 	err = pe.putString(b.Metadata)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -40,10 +52,17 @@ func (b *OffsetFetchResponseBlock) encode(pe packetEncoder) (err error) {
 }
 }
 
 
 type OffsetFetchResponse struct {
 type OffsetFetchResponse struct {
-	Blocks map[string]map[int32]*OffsetFetchResponseBlock
+	Version        int16
+	ThrottleTimeMs int32
+	Blocks         map[string]map[int32]*OffsetFetchResponseBlock
+	Err            KError
 }
 }
 
 
 func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+	if r.Version >= 3 {
+		pe.putInt32(r.ThrottleTimeMs)
+	}
+
 	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
 	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
 		return err
 		return err
 	}
 	}
@@ -56,51 +75,73 @@ func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
 		}
 		}
 		for partition, block := range partitions {
 		for partition, block := range partitions {
 			pe.putInt32(partition)
 			pe.putInt32(partition)
-			if err := block.encode(pe); err != nil {
+			if err := block.encode(pe, r.Version); err != nil {
 				return err
 				return err
 			}
 			}
 		}
 		}
 	}
 	}
+	if r.Version >= 2 {
+		pe.putInt16(int16(r.Err))
+	}
 	return nil
 	return nil
 }
 }
 
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
-	numTopics, err := pd.getArrayLength()
-	if err != nil || numTopics == 0 {
-		return err
-	}
-
-	r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
-	for i := 0; i < numTopics; i++ {
-		name, err := pd.getString()
-		if err != nil {
-			return err
-		}
+	r.Version = version
 
 
-		numBlocks, err := pd.getArrayLength()
+	if version >= 3 {
+		r.ThrottleTimeMs, err = pd.getInt32()
 		if err != nil {
 		if err != nil {
 			return err
 			return err
 		}
 		}
+	}
 
 
-		if numBlocks == 0 {
-			r.Blocks[name] = nil
-			continue
-		}
-		r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+	numTopics, err := pd.getArrayLength()
+	if err != nil {
+		return err
+	}
 
 
-		for j := 0; j < numBlocks; j++ {
-			id, err := pd.getInt32()
+	if numTopics > 0 {
+		r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
+		for i := 0; i < numTopics; i++ {
+			name, err := pd.getString()
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
 
 
-			block := new(OffsetFetchResponseBlock)
-			err = block.decode(pd)
+			numBlocks, err := pd.getArrayLength()
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			r.Blocks[name][id] = block
+
+			if numBlocks == 0 {
+				r.Blocks[name] = nil
+				continue
+			}
+			r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
+
+			for j := 0; j < numBlocks; j++ {
+				id, err := pd.getInt32()
+				if err != nil {
+					return err
+				}
+
+				block := new(OffsetFetchResponseBlock)
+				err = block.decode(pd, version)
+				if err != nil {
+					return err
+				}
+				r.Blocks[name][id] = block
+			}
+		}
+	}
+
+	if version >= 2 {
+		kerr, err := pd.getInt16()
+		if err != nil {
+			return err
 		}
 		}
+		r.Err = KError(kerr)
 	}
 	}
 
 
 	return nil
 	return nil
@@ -111,11 +152,24 @@ func (r *OffsetFetchResponse) key() int16 {
 }
 }
 
 
 func (r *OffsetFetchResponse) version() int16 {
 func (r *OffsetFetchResponse) version() int16 {
-	return 0
+	return r.Version
 }
 }
 
 
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
-	return MinVersion
+	switch r.Version {
+	case 1:
+		return V0_8_2_0
+	case 2:
+		return V0_10_2_0
+	case 3:
+		return V0_11_0_0
+	case 4:
+		return V2_0_0_0
+	case 5:
+		return V2_1_0_0
+	default:
+		return MinVersion
+	}
 }
 }
 
 
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
 func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {

+ 50 - 7
offset_fetch_response_test.go

@@ -1,22 +1,65 @@
 package sarama
 package sarama
 
 
-import "testing"
+import (
+	"fmt"
+	"testing"
+)
 
 
 var (
 var (
 	emptyOffsetFetchResponse = []byte{
 	emptyOffsetFetchResponse = []byte{
 		0x00, 0x00, 0x00, 0x00}
 		0x00, 0x00, 0x00, 0x00}
+
+	emptyOffsetFetchResponseV2 = []byte{
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x2A}
+
+	emptyOffsetFetchResponseV3 = []byte{
+		0x00, 0x00, 0x00, 0x09,
+		0x00, 0x00, 0x00, 0x00,
+		0x00, 0x2A}
 )
 )
 
 
 func TestEmptyOffsetFetchResponse(t *testing.T) {
 func TestEmptyOffsetFetchResponse(t *testing.T) {
-	response := OffsetFetchResponse{}
-	testResponse(t, "empty", &response, emptyOffsetFetchResponse)
+	for version := 0; version <= 1; version++ {
+		response := OffsetFetchResponse{Version: int16(version)}
+		testResponse(t, fmt.Sprintf("empty v%d", version), &response, emptyOffsetFetchResponse)
+	}
+
+	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
+	testResponse(t, "empty V2", &responseV2, emptyOffsetFetchResponseV2)
+
+	for version := 3; version <= 5; version++ {
+		responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+		testResponse(t, fmt.Sprintf("empty v%d", version), &responseV3, emptyOffsetFetchResponseV3)
+	}
 }
 }
 
 
 func TestNormalOffsetFetchResponse(t *testing.T) {
 func TestNormalOffsetFetchResponse(t *testing.T) {
-	response := OffsetFetchResponse{}
-	response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, "md", ErrRequestTimedOut})
-	response.Blocks["m"] = nil
 	// The response encoded form cannot be checked for it varies due to
 	// The response encoded form cannot be checked for it varies due to
 	// unpredictable map traversal order.
 	// unpredictable map traversal order.
-	testResponse(t, "normal", &response, nil)
+	// Hence the 'nil' as byte[] parameter in the 'testResponse(..)' calls
+
+	for version := 0; version <= 1; version++ {
+		response := OffsetFetchResponse{Version: int16(version)}
+		response.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+		response.Blocks["m"] = nil
+		testResponse(t, fmt.Sprintf("Normal v%d", version), &response, nil)
+	}
+
+	responseV2 := OffsetFetchResponse{Version: 2, Err: ErrInvalidRequest}
+	responseV2.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+	responseV2.Blocks["m"] = nil
+	testResponse(t, "normal V2", &responseV2, nil)
+
+	for version := 3; version <= 4; version++ {
+		responseV3 := OffsetFetchResponse{Version: int16(version), Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+		responseV3.AddBlock("t", 0, &OffsetFetchResponseBlock{0, 0, "md", ErrRequestTimedOut})
+		responseV3.Blocks["m"] = nil
+		testResponse(t, fmt.Sprintf("Normal v%d", version), &responseV3, nil)
+	}
+
+	responseV5 := OffsetFetchResponse{Version: 5, Err: ErrInvalidRequest, ThrottleTimeMs: 9}
+	responseV5.AddBlock("t", 0, &OffsetFetchResponseBlock{Offset: 10, LeaderEpoch: 100, Metadata: "md", Err: ErrRequestTimedOut})
+	responseV5.Blocks["m"] = nil
+	testResponse(t, "normal V5", &responseV5, nil)
 }
 }

+ 29 - 5
offset_request.go

@@ -27,12 +27,20 @@ func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error)
 }
 }
 
 
 type OffsetRequest struct {
 type OffsetRequest struct {
-	Version int16
-	blocks  map[string]map[int32]*offsetRequestBlock
+	Version        int16
+	replicaID      int32
+	isReplicaIDSet bool
+	blocks         map[string]map[int32]*offsetRequestBlock
 }
 }
 
 
 func (r *OffsetRequest) encode(pe packetEncoder) error {
 func (r *OffsetRequest) encode(pe packetEncoder) error {
-	pe.putInt32(-1) // replica ID is always -1 for clients
+	if r.isReplicaIDSet {
+		pe.putInt32(r.replicaID)
+	} else {
+		// default replica ID is always -1 for clients
+		pe.putInt32(-1)
+	}
+
 	err := pe.putArrayLength(len(r.blocks))
 	err := pe.putArrayLength(len(r.blocks))
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -59,10 +67,14 @@ func (r *OffsetRequest) encode(pe packetEncoder) error {
 func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
 	r.Version = version
 	r.Version = version
 
 
-	// Ignore replica ID
-	if _, err := pd.getInt32(); err != nil {
+	replicaID, err := pd.getInt32()
+	if err != nil {
 		return err
 		return err
 	}
 	}
+	if replicaID >= 0 {
+		r.SetReplicaID(replicaID)
+	}
+
 	blockCount, err := pd.getArrayLength()
 	blockCount, err := pd.getArrayLength()
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -113,6 +125,18 @@ func (r *OffsetRequest) requiredVersion() KafkaVersion {
 	}
 	}
 }
 }
 
 
+func (r *OffsetRequest) SetReplicaID(id int32) {
+	r.replicaID = id
+	r.isReplicaIDSet = true
+}
+
+func (r *OffsetRequest) ReplicaID() int32 {
+	if r.isReplicaIDSet {
+		return r.replicaID
+	}
+	return -1
+}
+
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
 	if r.blocks == nil {
 	if r.blocks == nil {
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)
 		r.blocks = make(map[string]map[int32]*offsetRequestBlock)

+ 16 - 0
offset_request_test.go

@@ -23,6 +23,10 @@ var (
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x01,
 		0x00, 0x00, 0x00, 0x04,
 		0x00, 0x00, 0x00, 0x04,
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
 		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01}
+
+	offsetRequestReplicaID = []byte{
+		0x00, 0x00, 0x00, 0x2a,
+		0x00, 0x00, 0x00, 0x00}
 )
 )
 
 
 func TestOffsetRequest(t *testing.T) {
 func TestOffsetRequest(t *testing.T) {
@@ -41,3 +45,15 @@ func TestOffsetRequestV1(t *testing.T) {
 	request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
 	request.AddBlock("bar", 4, 1, 2) // Last argument is ignored for V1
 	testRequest(t, "one block", request, offsetRequestOneBlockV1)
 	testRequest(t, "one block", request, offsetRequestOneBlockV1)
 }
 }
+
+func TestOffsetRequestReplicaID(t *testing.T) {
+	request := new(OffsetRequest)
+	replicaID := int32(42)
+	request.SetReplicaID(replicaID)
+
+	if found := request.ReplicaID(); found != replicaID {
+		t.Errorf("replicaID: expected %v, found %v", replicaID, found)
+	}
+
+	testRequest(t, "with replica ID", request, offsetRequestReplicaID)
+}