|
@@ -2,10 +2,6 @@ package sarama
|
|
|
|
|
|
|
|
import (
|
|
import (
|
|
|
"errors"
|
|
"errors"
|
|
|
- "fmt"
|
|
|
|
|
- "io/ioutil"
|
|
|
|
|
- "log"
|
|
|
|
|
- "os"
|
|
|
|
|
"strings"
|
|
"strings"
|
|
|
"testing"
|
|
"testing"
|
|
|
)
|
|
)
|
|
@@ -492,35 +488,64 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
|
|
|
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
|
|
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
- config := NewConfig()
|
|
|
|
|
- config.Version = V1_0_0_0
|
|
|
|
|
- admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Fatal(err)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ var tests = []struct {
|
|
|
|
|
+ saramaVersion KafkaVersion
|
|
|
|
|
+ requestVersion int16
|
|
|
|
|
+ includeSynonyms bool
|
|
|
|
|
+ }{
|
|
|
|
|
+ {V1_0_0_0, 0, false},
|
|
|
|
|
+ {V1_1_0_0, 1, true},
|
|
|
|
|
+ {V1_1_1_0, 1, true},
|
|
|
|
|
+ {V2_0_0_0, 2, true},
|
|
|
|
|
+ }
|
|
|
|
|
+ for _, tt := range tests {
|
|
|
|
|
+ config := NewConfig()
|
|
|
|
|
+ config.Version = tt.saramaVersion
|
|
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
|
|
+ }
|
|
|
|
|
+ defer func() {
|
|
|
|
|
+ _ = admin.Close()
|
|
|
|
|
+ }()
|
|
|
|
|
+
|
|
|
|
|
+ resource := ConfigResource{
|
|
|
|
|
+ Name: "r1",
|
|
|
|
|
+ Type: TopicResource,
|
|
|
|
|
+ ConfigNames: []string{"my_topic"},
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
|
|
|
|
|
- entries, err := admin.DescribeConfig(resource)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Fatal(err)
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ entries, err := admin.DescribeConfig(resource)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- if len(entries) <= 0 {
|
|
|
|
|
- t.Fatal(errors.New("no resource present"))
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ history := seedBroker.History()
|
|
|
|
|
+ describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
|
|
|
|
|
+ if !ok {
|
|
|
|
|
+ t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
- err = admin.Close()
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Fatal(err)
|
|
|
|
|
|
|
+ if describeReq.Version != tt.requestVersion {
|
|
|
|
|
+ t.Fatalf(
|
|
|
|
|
+ "requestVersion %v did not match expected %v",
|
|
|
|
|
+ describeReq.Version, tt.requestVersion)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if len(entries) <= 0 {
|
|
|
|
|
+ t.Fatal(errors.New("no resource present"))
|
|
|
|
|
+ }
|
|
|
|
|
+ if tt.includeSynonyms {
|
|
|
|
|
+ if len(entries[0].Synonyms) == 0 {
|
|
|
|
|
+ t.Fatal("expected synonyms to have been included")
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
|
|
// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
|
|
|
// is sent to the broker in the resource struct, _not_ the controller
|
|
// is sent to the broker in the resource struct, _not_ the controller
|
|
|
func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
|
|
func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
|
|
|
- Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags)
|
|
|
|
|
- defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }()
|
|
|
|
|
-
|
|
|
|
|
controllerBroker := NewMockBroker(t, 1)
|
|
controllerBroker := NewMockBroker(t, 1)
|
|
|
defer controllerBroker.Close()
|
|
defer controllerBroker.Close()
|
|
|
configBroker := NewMockBroker(t, 2)
|
|
configBroker := NewMockBroker(t, 2)
|