Browse Source

Add ListTopics func to ClusterAdmin interface

Signed-off-by: Dominic Evans <dominic.evans@uk.ibm.com>
Dominic Evans 7 years ago
parent
commit
dae6d49d70
3 changed files with 152 additions and 10 deletions
  1. 80 1
      admin.go
  2. 47 0
      admin_test.go
  3. 25 9
      mockresponses.go

+ 80 - 1
admin.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "errors"
+import (
+	"errors"
+	"math/rand"
+)
 
 // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
 // brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
@@ -13,6 +16,9 @@ type ClusterAdmin interface {
 	// may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
 	CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
 
+	// List the topics available in the cluster with the default options.
+	ListTopics() (map[string]TopicDetail, error)
+
 	// Delete a topic. It may take several seconds after the DeleteTopic to returns success
 	// and for all the brokers to become aware that the topics are gone.
 	// During this time, listTopics  may continue to return information about the deleted topic.
@@ -150,6 +156,79 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 	return nil
 }
 
+func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
+	brokers := ca.client.Brokers()
+	if len(brokers) > 0 {
+		index := rand.Intn(len(brokers))
+		return brokers[index], nil
+	}
+	return nil, errors.New("no available broker")
+}
+
+func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
+	// In order to build TopicDetails we need to first get the list of all
+	// topics using a MetadataRequest and then get their configs using a
+	// DescribeConfigsRequest request. To avoid sending many requests to the
+	// broker, we use a single DescribeConfigsRequest.
+
+	// Send the all-topic MetadataRequest
+	b, err := ca.findAnyBroker()
+	if err != nil {
+		return nil, err
+	}
+
+	metadataReq := &MetadataRequest{}
+	metadataResp, err := b.GetMetadata(metadataReq)
+	if err != nil {
+		return nil, err
+	}
+
+	topicsDetailsMap := make(map[string]TopicDetail)
+
+	var describeConfigsResources []*ConfigResource
+
+	for _, topic := range metadataResp.Topics {
+		topicDetails := TopicDetail{
+			NumPartitions: int32(len(topic.Partitions)),
+		}
+		topicsDetailsMap[topic.Name] = topicDetails
+
+		// we populate the resources we want to describe from the MetadataResponse
+		topicResource := ConfigResource{
+			Type: TopicResource,
+			Name: topic.Name,
+		}
+		describeConfigsResources = append(describeConfigsResources, &topicResource)
+	}
+
+	// Send the DescribeConfigsRequest
+	describeConfigsReq := &DescribeConfigsRequest{
+		Resources: describeConfigsResources,
+	}
+	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
+	if err != nil {
+		return nil, err
+	}
+
+	for _, resource := range describeConfigsResp.Resources {
+		topicDetails := topicsDetailsMap[resource.Name]
+		topicDetails.ConfigEntries = make(map[string]*string)
+
+		for _, entry := range resource.Configs {
+			// only include non-default non-sensitive config
+			// (don't actually think topic config will ever be sensitive)
+			if entry.Default || entry.Sensitive {
+				continue
+			}
+			topicDetails.ConfigEntries[entry.Name] = &entry.Value
+		}
+
+		topicsDetailsMap[resource.Name] = topicDetails
+	}
+
+	return topicsDetailsMap, nil
+}
+
 func (ca *clusterAdmin) DeleteTopic(topic string) error {
 
 	if topic == "" {

+ 47 - 0
admin_test.go

@@ -134,6 +134,53 @@ func TestClusterAdminCreateTopicWithDiffVersion(t *testing.T) {
 	}
 }
 
+func TestClusterAdminListTopics(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetLeader("my_topic", 0, seedBroker.BrokerID()),
+		"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	entries, err := admin.ListTopics()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(entries) <= 0 {
+		t.Fatal(errors.New("no resource present"))
+	}
+
+	topic, found := entries["my_topic"]
+	if !found {
+		t.Fatal(errors.New("topic not found in response"))
+	}
+	_, found = topic.ConfigEntries["max.message.bytes"]
+	if found {
+		t.Fatal(errors.New("default topic config entry incorrectly found in response"))
+	}
+	value, _ := topic.ConfigEntries["retention.ms"]
+	if value == nil || *value != "5000" {
+		t.Fatal(errors.New("non-default topic config entry not found in response"))
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
 func TestClusterAdminDeleteTopic(t *testing.T) {
 	seedBroker := NewMockBroker(t, 1)
 	defer seedBroker.Close()

+ 25 - 9
mockresponses.go

@@ -631,16 +631,32 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
 	req := reqBody.(*DescribeConfigsRequest)
 	res := &DescribeConfigsResponse{}
 
-	var configEntries []*ConfigEntry
-	configEntries = append(configEntries, &ConfigEntry{Name: "my_topic",
-		Value:     "my_topic",
-		ReadOnly:  true,
-		Default:   true,
-		Sensitive: false,
-	})
-
 	for _, r := range req.Resources {
-		res.Resources = append(res.Resources, &ResourceResponse{Name: r.Name, Configs: configEntries})
+		var configEntries []*ConfigEntry
+		switch r.Type {
+		case TopicResource:
+			configEntries = append(configEntries,
+				&ConfigEntry{Name: "max.message.bytes",
+					Value:     "1000000",
+					ReadOnly:  false,
+					Default:   true,
+					Sensitive: false,
+				}, &ConfigEntry{Name: "retention.ms",
+					Value:     "5000",
+					ReadOnly:  false,
+					Default:   false,
+					Sensitive: false,
+				}, &ConfigEntry{Name: "password",
+					Value:     "12345",
+					ReadOnly:  false,
+					Default:   false,
+					Sensitive: true,
+				})
+			res.Resources = append(res.Resources, &ResourceResponse{
+				Name:    r.Name,
+				Configs: configEntries,
+			})
+		}
 	}
 	return res
 }