Quellcode durchsuchen

Add DescribeLogDirs to admin client

Mickael Maison vor 4 Jahren
Ursprung
Commit
d89fa9dc14
4 geänderte Dateien mit 144 neuen und 0 gelöschten Zeilen
  1. 48 0
      admin.go
  2. 48 0
      admin_test.go
  3. 6 0
      describe_log_dirs_response.go
  4. 42 0
      mockresponses.go

+ 48 - 0
admin.go

@@ -101,6 +101,9 @@ type ClusterAdmin interface {
 	// Get information about the nodes in the cluster
 	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
 
+	// Get information about all log directories on the given set of brokers
+	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
+
 	// Close shuts down the admin and closes underlying client.
 	Close() error
 }
@@ -878,3 +881,48 @@ func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
 
 	return nil
 }
+
+func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
+	allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
+
+	// Query brokers in parallel, since we may have to query multiple brokers
+	logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
+	errors := make(chan error, len(brokerIds))
+	wg := sync.WaitGroup{}
+
+	for _, b := range brokerIds {
+		wg.Add(1)
+		broker, err := ca.findBroker(b)
+		if err != nil {
+			Logger.Printf("Unable to find broker with ID = %v\n", b)
+			continue
+		}
+		go func(b *Broker, conf *Config) {
+			defer wg.Done()
+			_ = b.Open(conf) // Ensure that broker is opened
+
+			response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
+			if err != nil {
+				errors <- err
+				return
+			}
+			logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
+			logDirs[b.ID()] = response.LogDirs
+			logDirsMaps <- logDirs
+		}(broker, ca.conf)
+	}
+
+	wg.Wait()
+	close(logDirsMaps)
+	close(errors)
+
+	for logDirsMap := range logDirsMaps {
+		for id, logDirs := range logDirsMap {
+			allLogDirs[id] = logDirs
+		}
+	}
+
+	// Intentionally return only the first error for simplicity
+	err = <-errors
+	return
+}

+ 48 - 0
admin_test.go

@@ -1309,3 +1309,51 @@ func TestRefreshMetaDataWithDifferentController(t *testing.T) {
 			seedBroker2.BrokerID(), b.ID())
 	}
 }
+
+func TestDescribeLogDirs(t *testing.T) {
+	seedBroker := NewMockBroker(t, 1)
+	defer seedBroker.Close()
+
+	seedBroker.SetHandlerByMap(map[string]MockResponse{
+		"MetadataRequest": NewMockMetadataResponse(t).
+			SetController(seedBroker.BrokerID()).
+			SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
+		"DescribeLogDirsRequest": NewMockDescribeLogDirsResponse(t).
+			SetLogDirs("/tmp/logs", map[string]int{"topic1": 2, "topic2": 2}),
+	})
+
+	config := NewConfig()
+	config.Version = V1_0_0_0
+
+	admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	logDirsPerBroker, err := admin.DescribeLogDirs([]int32{seedBroker.BrokerID()})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(logDirsPerBroker) != 1 {
+		t.Fatalf("Expected %v results, got %v", 1, len(logDirsPerBroker))
+	}
+	logDirs := logDirsPerBroker[seedBroker.BrokerID()]
+	if len(logDirs) != 1 {
+		t.Fatalf("Expected log dirs for broker %v to be returned, but it did not, got %v", seedBroker.BrokerID(), len(logDirs))
+	}
+	logDirsBroker := logDirs[0]
+	if logDirsBroker.ErrorCode != ErrNoError {
+		t.Fatalf("Expected no error for broker %v, but it was %v", seedBroker.BrokerID(), logDirsBroker.ErrorCode)
+	}
+	if logDirsBroker.Path != "/tmp/logs" {
+		t.Fatalf("Expected log dirs for broker %v to be '/tmp/logs', but it was %v", seedBroker.BrokerID(), logDirsBroker.Path)
+	}
+	if len(logDirsBroker.Topics) != 2 {
+		t.Fatalf("Expected log dirs for broker %v to have 2 topics, but it had %v", seedBroker.BrokerID(), len(logDirsBroker.Topics))
+	}
+	err = admin.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+}

+ 6 - 0
describe_log_dirs_response.go

@@ -84,6 +84,9 @@ func (r *DescribeLogDirsResponseDirMetadata) encode(pe packetEncoder) error {
 		return err
 	}
 
+	if err := pe.putArrayLength(len(r.Topics)); err != nil {
+		return err
+	}
 	for _, topic := range r.Topics {
 		if err := topic.encode(pe); err != nil {
 			return err
@@ -137,6 +140,9 @@ func (r *DescribeLogDirsResponseTopic) encode(pe packetEncoder) error {
 		return err
 	}
 
+	if err := pe.putArrayLength(len(r.Partitions)); err != nil {
+		return err
+	}
 	for _, partition := range r.Partitions {
 		if err := partition.encode(pe); err != nil {
 			return err

+ 42 - 0
mockresponses.go

@@ -1029,3 +1029,45 @@ func (m *MockDeleteGroupsResponse) For(reqBody versionedDecoder) encoderWithHead
 	}
 	return resp
 }
+
+type MockDescribeLogDirsResponse struct {
+	t       TestReporter
+	logDirs []DescribeLogDirsResponseDirMetadata
+}
+
+func NewMockDescribeLogDirsResponse(t TestReporter) *MockDescribeLogDirsResponse {
+	return &MockDescribeLogDirsResponse{t: t}
+}
+
+func (m *MockDescribeLogDirsResponse) SetLogDirs(logDirPath string, topicPartitions map[string]int) *MockDescribeLogDirsResponse {
+	topics := []DescribeLogDirsResponseTopic{}
+	for topic := range topicPartitions {
+		partitions := []DescribeLogDirsResponsePartition{}
+		for i := 0; i < topicPartitions[topic]; i++ {
+			partitions = append(partitions, DescribeLogDirsResponsePartition{
+				PartitionID: int32(i),
+				IsTemporary: false,
+				OffsetLag:   int64(0),
+				Size:        int64(1234),
+			})
+		}
+		topics = append(topics, DescribeLogDirsResponseTopic{
+			Topic:      topic,
+			Partitions: partitions,
+		})
+	}
+	logDir := DescribeLogDirsResponseDirMetadata{
+		ErrorCode: ErrNoError,
+		Path:      logDirPath,
+		Topics:    topics,
+	}
+	m.logDirs = []DescribeLogDirsResponseDirMetadata{logDir}
+	return m
+}
+
+func (m *MockDescribeLogDirsResponse) For(reqBody versionedDecoder) encoderWithHeader {
+	resp := &DescribeLogDirsResponse{
+		LogDirs: m.logDirs,
+	}
+	return resp
+}