瀏覽代碼

Sets ConfigEntry.Default flag in addition to the ConfigEntry.Source for Kafka versions > V1_1_0_0 (#1594)

* Set describeConfigsRequest.Version in ListTopics for consistency with DescribeConfig

This breaks the output of ListTopics for newer request versions, it now includes default configuration settings.

* Set ConfigEntry.Default for KafkaVersions > 0

Clients can now rely on the `Default` flag again and don't have to check the `Source` for higher Kafka versions.

* Set ConfigEntry.Source to default for KafkaVersions <= 0 when applicable

* Add tests for default flag/source
Leonid 4 年之前
父節點
當前提交
6d92277014
共有 5 個文件被更改,包括 123 次插入2 次删除
  1. 9 0
      admin.go
  2. 1 1
      admin_test.go
  3. 4 0
      describe_configs_response.go
  4. 104 0
      describe_configs_response_test.go
  5. 5 1
      mockresponses.go

+ 9 - 0
admin.go

@@ -338,6 +338,15 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
 	describeConfigsReq := &DescribeConfigsRequest{
 		Resources: describeConfigsResources,
 	}
+
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		describeConfigsReq.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		describeConfigsReq.Version = 2
+	}
+
 	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
 	if err != nil {
 		return nil, err

+ 1 - 1
admin_test.go

@@ -149,7 +149,7 @@ func TestClusterAdminListTopics(t *testing.T) {
 	})
 
 	config := NewConfig()
-	config.Version = V1_0_0_0
+	config.Version = V1_1_0_0
 	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
 	if err != nil {
 		t.Fatal(err)

+ 4 - 0
describe_configs_response.go

@@ -249,12 +249,16 @@ func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
 			return err
 		}
 		r.Default = defaultB
+		if defaultB {
+			r.Source = SourceDefault
+		}
 	} else {
 		source, err := pd.getInt8()
 		if err != nil {
 			return err
 		}
 		r.Source = ConfigSource(source)
+		r.Default = r.Source == SourceDefault
 	}
 
 	sensitive, err := pd.getBool()

+ 104 - 0
describe_configs_response_test.go

@@ -25,6 +25,21 @@ var (
 		0, // Sensitive
 	}
 
+	describeConfigsResponseWithDefaultv0 = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0, // ReadOnly
+		1, // Default
+		0, // Sensitive
+	}
+
 	describeConfigsResponsePopulatedv1 = []byte{
 		0, 0, 0, 0, //throttle
 		0, 0, 0, 1, // response
@@ -59,6 +74,22 @@ var (
 		0, 4, '1', '0', '0', '0',
 		4, // Source
 	}
+
+	describeConfigsResponseWithDefaultv1 = []byte{
+		0, 0, 0, 0, //throttle
+		0, 0, 0, 1, // response
+		0, 0, //errorcode
+		0, 0, //string
+		2, // topic
+		0, 3, 'f', 'o', 'o',
+		0, 0, 0, 1, //configs
+		0, 10, 's', 'e', 'g', 'm', 'e', 'n', 't', '.', 'm', 's',
+		0, 4, '1', '0', '0', '0',
+		0,          // ReadOnly
+		5,          // Source
+		0,          // Sensitive
+		0, 0, 0, 0, // No Synonym
+	}
 )
 
 func TestDescribeConfigsResponsev0(t *testing.T) {
@@ -86,6 +117,7 @@ func TestDescribeConfigsResponsev0(t *testing.T) {
 						ReadOnly:  false,
 						Default:   false,
 						Sensitive: false,
+						Source:    SourceUnknown,
 					},
 				},
 			},
@@ -94,6 +126,40 @@ func TestDescribeConfigsResponsev0(t *testing.T) {
 	testResponse(t, "response with error", response, describeConfigsResponsePopulatedv0)
 }
 
+func TestDescribeConfigsResponseWithDefaultv0(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Version: 0, Resources: []*ResourceResponse{
+			{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Default:   true,
+						Sensitive: false,
+						Source:    SourceDefault,
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with default", response, describeConfigsResponseWithDefaultv0)
+}
+
 func TestDescribeConfigsResponsev1(t *testing.T) {
 	var response *DescribeConfigsResponse
 
@@ -119,6 +185,7 @@ func TestDescribeConfigsResponsev1(t *testing.T) {
 						Value:     "1000",
 						ReadOnly:  false,
 						Source:    SourceStaticBroker,
+						Default:   false,
 						Sensitive: false,
 						Synonyms:  []*ConfigSynonym{},
 					},
@@ -154,6 +221,7 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
 						Value:     "1000",
 						ReadOnly:  false,
 						Source:    SourceStaticBroker,
+						Default:   false,
 						Sensitive: false,
 						Synonyms: []*ConfigSynonym{
 							{
@@ -169,3 +237,39 @@ func TestDescribeConfigsResponseWithSynonym(t *testing.T) {
 	}
 	testResponse(t, "response with error", response, describeConfigsResponseWithSynonymv1)
 }
+
+func TestDescribeConfigsResponseWithDefaultv1(t *testing.T) {
+	var response *DescribeConfigsResponse
+
+	response = &DescribeConfigsResponse{
+		Resources: []*ResourceResponse{},
+	}
+	testVersionDecodable(t, "empty", response, describeConfigsResponseEmpty, 0)
+	if len(response.Resources) != 0 {
+		t.Error("Expected no groups")
+	}
+
+	response = &DescribeConfigsResponse{
+		Version: 1,
+		Resources: []*ResourceResponse{
+			{
+				ErrorCode: 0,
+				ErrorMsg:  "",
+				Type:      TopicResource,
+				Name:      "foo",
+				Configs: []*ConfigEntry{
+					{
+						Name:      "segment.ms",
+						Value:     "1000",
+						ReadOnly:  false,
+						Source:    SourceDefault,
+						Default:   true,
+						Sensitive: false,
+						Synonyms:  []*ConfigSynonym{},
+					},
+				},
+			},
+		},
+	}
+	testResponse(t, "response with error", response, describeConfigsResponseWithDefaultv1)
+}

+ 5 - 1
mockresponses.go

@@ -736,6 +736,7 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 	}
 
 	includeSynonyms := (req.Version > 0)
+	includeSource := (req.Version > 0)
 
 	for _, r := range req.Resources {
 		var configEntries []*ConfigEntry
@@ -770,9 +771,12 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 			maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
 				Value:     "1000000",
 				ReadOnly:  false,
-				Default:   true,
+				Default:   !includeSource,
 				Sensitive: false,
 			}
+			if includeSource {
+				maxMessageBytes.Source = SourceDefault
+			}
 			if includeSynonyms {
 				maxMessageBytes.Synonyms = []*ConfigSynonym{
 					{