Browse Source

Implement DescribeConfigsRequest v1 & v2

Conor Mongey 7 years ago
parent
commit
fbd8338718
2 changed files with 57 additions and 7 deletions
  1. 27 6
      describe_configs_request.go
  2. 30 1
      describe_configs_request_test.go

+ 27 - 6
describe_configs_request.go

@@ -1,15 +1,17 @@
 package sarama
 
+type DescribeConfigsRequest struct {
+	Version         int16
+	Resources       []*ConfigResource
+	IncludeSynonyms bool
+}
+
 type ConfigResource struct {
 	Type        ConfigResourceType
 	Name        string
 	ConfigNames []string
 }
 
-type DescribeConfigsRequest struct {
-	Resources []*ConfigResource
-}
-
 func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
 	if err := pe.putArrayLength(len(r.Resources)); err != nil {
 		return err
@@ -30,6 +32,10 @@ func (r *DescribeConfigsRequest) encode(pe packetEncoder) error {
 		}
 	}
 
+	if r.Version >= 1 {
+		pe.putBool(r.IncludeSynonyms)
+	}
+
 	return nil
 }
 
@@ -74,6 +80,14 @@ func (r *DescribeConfigsRequest) decode(pd packetDecoder, version int16) (err er
 		}
 		r.Resources[i].ConfigNames = cfnames
 	}
+	r.Version = version
+	if r.Version >= 1 {
+		b, err := pd.getBool()
+		if err != nil {
+			return err
+		}
+		r.IncludeSynonyms = b
+	}
 
 	return nil
 }
@@ -83,9 +97,16 @@ func (r *DescribeConfigsRequest) key() int16 {
 }
 
 func (r *DescribeConfigsRequest) version() int16 {
-	return 0
+	return r.Version
 }
 
 func (r *DescribeConfigsRequest) requiredVersion() KafkaVersion {
-	return V0_11_0_0
+	switch r.Version {
+	case 1:
+		return V1_0_0_0
+	case 2:
+		return V2_0_0_0
+	default:
+		return V0_11_0_0
+	}
 }

+ 30 - 1
describe_configs_request_test.go

@@ -33,23 +33,33 @@ var (
 	}
 
 	singleDescribeConfigsRequestAllConfigs = []byte{
+		0, 0, 0, 1, // 1 config
+		2,                   // a topic
+		0, 3, 'f', 'o', 'o', // topic name: foo
+		255, 255, 255, 255, // all configs
+	}
+
+	singleDescribeConfigsRequestAllConfigsv1 = []byte{
 		0, 0, 0, 1, // 1 config
 		2,                   // a topic
 		0, 3, 'f', 'o', 'o', // topic name: foo
 		255, 255, 255, 255, // no configs
+		1, //synoms
 	}
 )
 
-func TestDescribeConfigsRequest(t *testing.T) {
+func TestDescribeConfigsRequestv0(t *testing.T) {
 	var request *DescribeConfigsRequest
 
 	request = &DescribeConfigsRequest{
+		Version:   0,
 		Resources: []*ConfigResource{},
 	}
 	testRequest(t, "no requests", request, emptyDescribeConfigsRequest)
 
 	configs := []string{"segment.ms"}
 	request = &DescribeConfigsRequest{
+		Version: 0,
 		Resources: []*ConfigResource{
 			&ConfigResource{
 				Type:        TopicResource,
@@ -62,6 +72,7 @@ func TestDescribeConfigsRequest(t *testing.T) {
 	testRequest(t, "one config", request, singleDescribeConfigsRequest)
 
 	request = &DescribeConfigsRequest{
+		Version: 0,
 		Resources: []*ConfigResource{
 			&ConfigResource{
 				Type:        TopicResource,
@@ -78,6 +89,7 @@ func TestDescribeConfigsRequest(t *testing.T) {
 	testRequest(t, "two configs", request, doubleDescribeConfigsRequest)
 
 	request = &DescribeConfigsRequest{
+		Version: 0,
 		Resources: []*ConfigResource{
 			&ConfigResource{
 				Type: TopicResource,
@@ -88,3 +100,20 @@ func TestDescribeConfigsRequest(t *testing.T) {
 
 	testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigs)
 }
+
+func TestDescribeConfigsRequestv1(t *testing.T) {
+	var request *DescribeConfigsRequest
+
+	request = &DescribeConfigsRequest{
+		Version: 1,
+		Resources: []*ConfigResource{
+			{
+				Type: TopicResource,
+				Name: "foo",
+			},
+		},
+		IncludeSynonyms: true,
+	}
+
+	testRequest(t, "one topic, all configs", request, singleDescribeConfigsRequestAllConfigsv1)
+}