Browse Source

fix: set DescribeConfigRequest Version field

Although fbd8338 had added protocol support for DescribeConfigsRequest
v1 and v2, nothing in the admin client was actually setting the Version
field to utilise these. Set the Version based on the selected
sarama.Config version.

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans 6 năm trước cách đây
mục cha
commit
5de8dba37c
3 tập tin đã thay đổi với 98 bổ sung36 xóa
  1. 8 1
      admin.go
  2. 49 17
      admin_test.go
  3. 41 18
      mockresponses.go

+ 8 - 1
admin.go

@@ -452,7 +452,6 @@ func dependsOnSpecificNode(resource ConfigResource) bool {
 }
 
 func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
-
 	var entries []ConfigEntry
 	var resources []*ConfigResource
 	resources = append(resources, &resource)
@@ -461,6 +460,14 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
 		Resources: resources,
 	}
 
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		request.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 2
+	}
+
 	var (
 		b   *Broker
 		err error

+ 49 - 17
admin_test.go

@@ -492,26 +492,58 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
 		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
 	})
 
-	config := NewConfig()
-	config.Version = V1_0_0_0
-	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
-	if err != nil {
-		t.Fatal(err)
-	}
+	var tests = []struct {
+		saramaVersion   KafkaVersion
+		requestVersion  int16
+		includeSynonyms bool
+	}{
+		{V1_0_0_0, 0, false},
+		{V1_1_0_0, 1, true},
+		{V1_1_1_0, 1, true},
+		{V2_0_0_0, 2, true},
+	}
+	for _, tt := range tests {
+		config := NewConfig()
+		config.Version = tt.saramaVersion
+		admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer func() {
+			_ = admin.Close()
+		}()
+
+		resource := ConfigResource{
+			Name:        "r1",
+			Type:        TopicResource,
+			ConfigNames: []string{"my_topic"},
+		}
 
-	resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
-	entries, err := admin.DescribeConfig(resource)
-	if err != nil {
-		t.Fatal(err)
-	}
+		entries, err := admin.DescribeConfig(resource)
+		if err != nil {
+			t.Fatal(err)
+		}
 
-	if len(entries) <= 0 {
-		t.Fatal(errors.New("no resource present"))
-	}
+		history := seedBroker.History()
+		describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
+		if !ok {
+			t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
+		}
 
-	err = admin.Close()
-	if err != nil {
-		t.Fatal(err)
+		if describeReq.Version != tt.requestVersion {
+			t.Fatalf(
+				"requestVersion %v did not match expected %v",
+				describeReq.Version, tt.requestVersion)
+		}
+
+		if len(entries) <= 0 {
+			t.Fatal(errors.New("no resource present"))
+		}
+		if tt.includeSynonyms {
+			if len(entries[0].Synonyms) == 0 {
+				t.Fatal("expected synonyms to have been included")
+			}
+		}
 	}
 }
 

+ 41 - 18
mockresponses.go

@@ -731,7 +731,11 @@ func NewMockDescribeConfigsResponse(t TestReporter) *MockDescribeConfigsResponse
 
 func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*DescribeConfigsRequest)
-	res := &DescribeConfigsResponse{}
+	res := &DescribeConfigsResponse{
+		Version: req.Version,
+	}
+
+	includeSynonyms := (req.Version > 0)
 
 	for _, r := range req.Resources {
 		var configEntries []*ConfigEntry
@@ -763,23 +767,42 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 				Configs: configEntries,
 			})
 		case TopicResource:
-			configEntries = append(configEntries,
-				&ConfigEntry{Name: "max.message.bytes",
-					Value:     "1000000",
-					ReadOnly:  false,
-					Default:   true,
-					Sensitive: false,
-				}, &ConfigEntry{Name: "retention.ms",
-					Value:     "5000",
-					ReadOnly:  false,
-					Default:   false,
-					Sensitive: false,
-				}, &ConfigEntry{Name: "password",
-					Value:     "12345",
-					ReadOnly:  false,
-					Default:   false,
-					Sensitive: true,
-				})
+			maxMessageBytes := &ConfigEntry{Name: "max.message.bytes",
+				Value:     "1000000",
+				ReadOnly:  false,
+				Default:   true,
+				Sensitive: false,
+			}
+			if includeSynonyms {
+				maxMessageBytes.Synonyms = []*ConfigSynonym{
+					{
+						ConfigName:  "max.message.bytes",
+						ConfigValue: "500000",
+					},
+				}
+			}
+			retentionMs := &ConfigEntry{Name: "retention.ms",
+				Value:     "5000",
+				ReadOnly:  false,
+				Default:   false,
+				Sensitive: false,
+			}
+			if includeSynonyms {
+				retentionMs.Synonyms = []*ConfigSynonym{
+					{
+						ConfigName:  "log.retention.ms",
+						ConfigValue: "2500",
+					},
+				}
+			}
+			password := &ConfigEntry{Name: "password",
+				Value:     "12345",
+				ReadOnly:  false,
+				Default:   false,
+				Sensitive: true,
+			}
+			configEntries = append(
+				configEntries, maxMessageBytes, retentionMs, password)
 			res.Resources = append(res.Resources, &ResourceResponse{
 				Name:    r.Name,
 				Configs: configEntries,