Pārlūkot izejas kodu

admin: Add some missing admin methods

DescribeCluster
DescribeTopic
DescribeConsumerGroups
ListConsumerGroups
ListConsumerGroupOffsets
Johannes Brüderl 7 gadi atpakaļ
vecāks
revīzija
3645a3453d
3 mainītis faili ar 449 papildinājumiem un 1 dzēšanām
  1. 146 1
      admin.go
  2. 240 0
      admin_test.go
  3. 63 0
      mockresponses.go

+ 146 - 1
admin.go

@@ -3,6 +3,7 @@ package sarama
 import (
 	"errors"
 	"math/rand"
+	"sync"
 )
 
 // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -19,6 +20,9 @@ type ClusterAdmin interface {
 	// List the topics available in the cluster with the default options.
 	ListTopics() (map[string]TopicDetail, error)
 
+	// Describe some topics in the cluster
+	DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
+
 	// Delete a topic. It may take several seconds after the DeleteTopic to returns success
 	// and for all the brokers to become aware that the topics are gone.
 	// During this time, listTopics  may continue to return information about the deleted topic.
@@ -71,6 +75,18 @@ type ClusterAdmin interface {
 	// This operation is supported by brokers with version 0.11.0.0 or higher.
 	DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
 
+	// List the consumer groups available in the cluster.
+	ListConsumerGroups() (map[string]string, error)
+
+	// Describe the given consumer group
+	DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
+
+	// List the consumer group offsets available in the cluster.
+	ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
+
+	// Get information about the nodes in the cluster
+	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
+
 	// Close shuts down the admin and closes underlying client.
 	Close() error
 }
@@ -156,6 +172,46 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
 	return nil
 }
 
+func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
+	controller, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	request := &MetadataRequest{
+		Topics:                 topics,
+		AllowAutoTopicCreation: false,
+	}
+
+	if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 4
+	}
+
+	response, err := controller.GetMetadata(request)
+	if err != nil {
+		return nil, err
+	}
+	return response.Topics, nil
+}
+
+func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
+	controller, err := ca.Controller()
+	if err != nil {
+		return nil, int32(0), err
+	}
+
+	request := &MetadataRequest{
+		Topics: []string{},
+	}
+
+	response, err := controller.GetMetadata(request)
+	if err != nil {
+		return nil, int32(0), err
+	}
+
+	return response.Brokers, response.ControllerID, nil
+}
+
 func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
 	brokers := ca.client.Brokers()
 	if len(brokers) > 0 {
@@ -176,7 +232,7 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
 	if err != nil {
 		return nil, err
 	}
-	b.Open(ca.client.Config())
+	_ = b.Open(ca.client.Config())
 
 	metadataReq := &MetadataRequest{}
 	metadataResp, err := b.GetMetadata(metadataReq)
@@ -463,3 +519,92 @@ func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]Matchi
 	}
 	return mAcls, nil
 }
+
+func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
+	groupsPerBroker := make(map[*Broker][]string)
+
+	for _, group := range groups {
+		controller, err := ca.client.Coordinator(group)
+		if err != nil {
+			return nil, err
+		}
+		groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
+
+	}
+
+	for broker, brokerGroups := range groupsPerBroker {
+		response, err := broker.DescribeGroups(&DescribeGroupsRequest{
+			Groups: brokerGroups,
+		})
+		if err != nil {
+			return nil, err
+		}
+
+		result = append(result, response.Groups...)
+	}
+	return result, nil
+}
+
+func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
+	allGroups = make(map[string]string)
+
+	// Query brokers in parallel, since we have to query *all* brokers
+	brokers := ca.client.Brokers()
+	groupMaps := make(chan map[string]string, len(brokers))
+	errors := make(chan error, len(brokers))
+	wg := sync.WaitGroup{}
+
+	for _, b := range brokers {
+		wg.Add(1)
+		go func(b *Broker, conf *Config) {
+			defer wg.Done()
+			_ = b.Open(conf) // Ensure that broker is opened
+
+			response, err := b.ListGroups(&ListGroupsRequest{})
+			if err != nil {
+				errors <- err
+				return
+			}
+
+			groups := make(map[string]string)
+			for group, typ := range response.Groups {
+				groups[group] = typ
+			}
+
+			groupMaps <- groups
+
+		}(b, ca.conf)
+	}
+
+	wg.Wait()
+	close(groupMaps)
+	close(errors)
+
+	for groupMap := range groupMaps {
+		for group, protocolType := range groupMap {
+			allGroups[group] = protocolType
+		}
+	}
+
+	// Intentionally return only the first error for simplicity
+	err = <-errors
+	return
+}
+
+func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
+	coordinator, err := ca.client.Coordinator(group)
+	if err != nil {
+		return nil, err
+	}
+
+	request := &OffsetFetchRequest{
+		ConsumerGroup: group,
+		partitions:    topicPartitions,
+	}
+
+	if ca.conf.Version.IsAtLeast(V0_8_2_2) {
+		request.Version = 1
+	}
+
+	return coordinator.FetchOffset(request)
+}

+ 240 - 0
admin_test.go

@@ -546,3 +546,243 @@ func TestClusterAdminDeleteAcl(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+func TestDescribeTopic(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetLeader("my_topic", 0, seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	topics, err := admin.DescribeTopics([]string{"my_topic"})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(topics) != 1 {
+		t.Fatalf("Expected 1 result, got %v", len(topics))
+	}
+
+	if topics[0].Name != "my_topic" {
+		t.Fatalf("Incorrect topic name: %v", topics[0].Name)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestDescribeConsumerGroup(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	expectedGroupID := "my-group"
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"DescribeGroupsRequest": NewMockDescribeGroupsResponse(t).AddGroupDescription(expectedGroupID, &GroupDescription{
+			GroupId: expectedGroupID,
+		}),
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, expectedGroupID, seedBroker),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	result, err := admin.DescribeConsumerGroups([]string{expectedGroupID})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(result) != 1 {
+		t.Fatalf("Expected 1 result, got %v", len(result))
+	}
+
+	if result[0].GroupId != expectedGroupID {
+		t.Fatalf("Expected groupID %v, got %v", expectedGroupID, result[0].GroupId)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func TestListConsumerGroups(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"ListGroupsRequest": NewMockListGroupsResponse(t).
+			AddGroup("my-group", "consumer"),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	groups, err := admin.ListConsumerGroups()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(groups) != 1 {
+		t.Fatalf("Expected %v results, got %v", 1, len(groups))
+	}
+
+	protocolType, ok := groups["my-group"]
+
+	if !ok {
+		t.Fatal("Expected group to be returned, but it did not")
+	}
+
+	if protocolType != "consumer" {
+		t.Fatalf("Expected protocolType %v, got %v", "consumer", protocolType)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+}
+
+func TestListConsumerGroupsMultiBroker(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	secondBroker := NewMockBroker(t, 2)
+	defer secondBroker.Close()
+
+	firstGroup := "first"
+	secondGroup := "second"
+	nonExistingGroup := "non-existing-group"
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
+		"ListGroupsRequest": NewMockListGroupsResponse(t).
+			AddGroup(firstGroup, "consumer"),
+	})
+
+	secondBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
+			SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
+		"ListGroupsRequest": NewMockListGroupsResponse(t).
+			AddGroup(secondGroup, "consumer"),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	groups, err := admin.ListConsumerGroups()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(groups) != 2 {
+		t.Fatalf("Expected %v results, got %v", 1, len(groups))
+	}
+
+	if _, found := groups[firstGroup]; !found {
+		t.Fatalf("Expected group %v to be present in result set, but it isn't", firstGroup)
+	}
+
+	if _, found := groups[secondGroup]; !found {
+		t.Fatalf("Expected group %v to be present in result set, but it isn't", secondGroup)
+	}
+
+	if _, found := groups[nonExistingGroup]; found {
+		t.Fatalf("Expected group %v to not exist, but it exists", nonExistingGroup)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+}
+
+func TestListConsumerGroupOffsets(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	group := "my-group"
+	topic := "my-topic"
+	partition := int32(0)
+	expectedOffset := int64(0)
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"OffsetFetchRequest": NewMockOffsetFetchResponse(t).SetOffset(group, "my-topic", partition, expectedOffset, "", ErrNoError),
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"FindCoordinatorRequest": NewMockFindCoordinatorResponse(t).SetCoordinator(CoordinatorGroup, group, seedBroker),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	response, err := admin.ListConsumerGroupOffsets(group, map[string][]int32{
+		topic: []int32{0},
+	})
+	if err != nil {
+		t.Fatalf("ListConsumerGroupOffsets failed with error %v", err)
+	}
+
+	block := response.GetBlock(topic, partition)
+	if block == nil {
+		t.Fatalf("Expected block for topic %v and partition %v to exist, but it doesn't", topic, partition)
+	}
+
+	if block.Offset != expectedOffset {
+		t.Fatalf("Expected offset %v, got %v", expectedOffset, block.Offset)
+	}
+
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+}

+ 63 - 0
mockresponses.go

@@ -66,6 +66,69 @@ func (mc *MockSequence) For(reqBody versionedDecoder) (res encoder) {
 	return res
 }
 
+type MockListGroupsResponse struct {
+	groups map[string]string
+	t      TestReporter
+}
+
+func NewMockListGroupsResponse(t TestReporter) *MockListGroupsResponse {
+	return &MockListGroupsResponse{
+		groups: make(map[string]string),
+		t:      t,
+	}
+}
+
+func (m *MockListGroupsResponse) For(reqBody versionedDecoder) encoder {
+	request := reqBody.(*ListGroupsRequest)
+	_ = request
+	response := &ListGroupsResponse{
+		Groups: m.groups,
+	}
+	return response
+}
+
+func (m *MockListGroupsResponse) AddGroup(groupID, protocolType string) *MockListGroupsResponse {
+	m.groups[groupID] = protocolType
+	return m
+}
+
+type MockDescribeGroupsResponse struct {
+	groups map[string]*GroupDescription
+	t      TestReporter
+}
+
+func NewMockDescribeGroupsResponse(t TestReporter) *MockDescribeGroupsResponse {
+	return &MockDescribeGroupsResponse{
+		t:      t,
+		groups: make(map[string]*GroupDescription),
+	}
+}
+
+func (m *MockDescribeGroupsResponse) AddGroupDescription(groupID string, description *GroupDescription) *MockDescribeGroupsResponse {
+	m.groups[groupID] = description
+	return m
+}
+
+func (m *MockDescribeGroupsResponse) For(reqBody versionedDecoder) encoder {
+	request := reqBody.(*DescribeGroupsRequest)
+
+	response := &DescribeGroupsResponse{}
+	for _, requestedGroup := range request.Groups {
+		if group, ok := m.groups[requestedGroup]; ok {
+			response.Groups = append(response.Groups, group)
+		} else {
+			// Mimic real kafka - if a group doesn't exist, return
+			// an entry with state "Dead"
+			response.Groups = append(response.Groups, &GroupDescription{
+				GroupId: requestedGroup,
+				State:   "Dead",
+			})
+		}
+	}
+
+	return response
+}
+
 // MockMetadataResponse is a `MetadataResponse` builder.
 type MockMetadataResponse struct {
 	controllerID int32