|
|
@@ -134,6 +134,53 @@ func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+func TestClusterAdminListTopics(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ defer seedBroker.Close()
|
|
|
+
|
|
|
+ seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
|
|
|
+ SetLeader("my_topic", 0, seedBroker.BrokerID()),
|
|
|
+ "DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
|
|
|
+ })
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Version = V1_0_0_0
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ entries, err := admin.ListTopics()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(entries) <= 0 {
|
|
|
+ t.Fatal(errors.New("no resource present"))
|
|
|
+ }
|
|
|
+
|
|
|
+ topic, found := entries["my_topic"]
|
|
|
+ if !found {
|
|
|
+ t.Fatal(errors.New("topic not found in response"))
|
|
|
+ }
|
|
|
+ _, found = topic.ConfigEntries["max.message.bytes"]
|
|
|
+ if found {
|
|
|
+ t.Fatal(errors.New("default topic config entry incorrectly found in response"))
|
|
|
+ }
|
|
|
+ value, _ := topic.ConfigEntries["retention.ms"]
|
|
|
+ if value == nil || *value != "5000" {
|
|
|
+ t.Fatal(errors.New("non-default topic config entry not found in response"))
|
|
|
+ }
|
|
|
+
|
|
|
+ err = admin.Close()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestClusterAdminDeleteTopic(t *testing.T) {
|
|
|
seedBroker := NewMockBroker(t, 1)
|
|
|
defer seedBroker.Close()
|
|
|
@@ -499,3 +546,243 @@ func TestClusterAdminDeleteAcl(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func TestDescribeTopic(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ defer seedBroker.Close()
|
|
|
+
|
|
|
+ seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetLeader("my_topic", 0, seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
|
|
|
+ })
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Version = V1_0_0_0
|
|
|
+
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ topics, err := admin.DescribeTopics([]string{"my_topic"})
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(topics) != 1 {
|
|
|
+ t.Fatalf("Expected 1 result, got %v", len(topics))
|
|
|
+ }
|
|
|
+
|
|
|
+ if topics[0].Name != "my_topic" {
|
|
|
+ t.Fatalf("Incorrect topic name: %v", topics[0].Name)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = admin.Close()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func TestDescribeConsumerGroup(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ defer seedBroker.Close()
|
|
|
+
|
|
|
+ expectedGroupID := "my-group"
|
|
|
+
|
|
|
+ seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
|
|
|
+ GroupId: expectedGroupID,
|
|
|
+ }),
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
|
|
|
+ "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
|
|
|
+ })
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Version = V1_0_0_0
|
|
|
+
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(result) != 1 {
|
|
|
+ t.Fatalf("Expected 1 result, got %v", len(result))
|
|
|
+ }
|
|
|
+
|
|
|
+ if result[0].GroupId != expectedGroupID {
|
|
|
+ t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = admin.Close()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+func TestListConsumerGroups(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ defer seedBroker.Close()
|
|
|
+
|
|
|
+ seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
|
|
|
+ "ListGroupsRequest": NewMockListGroupsResponse(t).
|
|
|
+ AddGroup("my-group", "consumer"),
|
|
|
+ })
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Version = V1_0_0_0
|
|
|
+
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ groups, err := admin.ListConsumerGroups()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(groups) != 1 {
|
|
|
+ t.Fatalf("Expected %v results, got %v", 1, len(groups))
|
|
|
+ }
|
|
|
+
|
|
|
+ protocolType, ok := groups["my-group"]
|
|
|
+
|
|
|
+ if !ok {
|
|
|
+ t.Fatal("Expected group to be returned, but it did not")
|
|
|
+ }
|
|
|
+
|
|
|
+ if protocolType != "consumer" {
|
|
|
+ t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = admin.Close()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func TestListConsumerGroupsMultiBroker(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ defer seedBroker.Close()
|
|
|
+
|
|
|
+ secondBroker := NewMockBroker(t, 2)
|
|
|
+ defer secondBroker.Close()
|
|
|
+
|
|
|
+ firstGroup := "first"
|
|
|
+ secondGroup := "second"
|
|
|
+ nonExistingGroup := "non-existing-group"
|
|
|
+
|
|
|
+ seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
|
|
|
+ SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
|
|
|
+ "ListGroupsRequest": NewMockListGroupsResponse(t).
|
|
|
+ AddGroup(firstGroup, "consumer"),
|
|
|
+ })
|
|
|
+
|
|
|
+ secondBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
|
|
|
+ SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
|
|
|
+ "ListGroupsRequest": NewMockListGroupsResponse(t).
|
|
|
+ AddGroup(secondGroup, "consumer"),
|
|
|
+ })
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Version = V1_0_0_0
|
|
|
+
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ groups, err := admin.ListConsumerGroups()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(groups) != 2 {
|
|
|
+ t.Fatalf("Expected %v results, got %v", 1, len(groups))
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, found := groups[firstGroup]; !found {
|
|
|
+ t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, found := groups[secondGroup]; !found {
|
|
|
+ t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
|
|
|
+ }
|
|
|
+
|
|
|
+ if _, found := groups[nonExistingGroup]; found {
|
|
|
+ t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = admin.Close()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+func TestListConsumerGroupOffsets(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ defer seedBroker.Close()
|
|
|
+
|
|
|
+ group := "my-group"
|
|
|
+ topic := "my-topic"
|
|
|
+ partition := int32(0)
|
|
|
+ expectedOffset := int64(0)
|
|
|
+
|
|
|
+ seedBroker.SetHandlerByMap(map[string]MockResponse{
|
|
|
+ "OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
|
|
|
+ "MetadataRequest": NewMockMetadataResponse(t).
|
|
|
+ SetController(seedBroker.BrokerID()).
|
|
|
+ SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
|
|
|
+ "FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
|
|
|
+ })
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Version = V1_0_0_0
|
|
|
+
|
|
|
+ admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
|
|
|
+ topic: []int32{0},
|
|
|
+ })
|
|
|
+ if err != nil {
|
|
|
+ t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ block := response.GetBlock(topic, partition)
|
|
|
+ if block == nil {
|
|
|
+ t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
|
|
|
+ }
|
|
|
+
|
|
|
+ if block.Offset != expectedOffset {
|
|
|
+ t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
|
|
|
+ }
|
|
|
+
|
|
|
+ err = admin.Close()
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|