Browse Source

Merge pull request #1274 from dnwe/include-replica-assignment-in-topicdetail

admin: include ReplicaAssignment in ListTopics()
Varun 6 years ago
parent
commit
ba151e16de
3 changed files with 18 additions and 2 deletions
  1. 4 0
      admin.go
  2. 4 0
      admin_test.go
  3. 10 2
      mockresponses.go

+ 4 - 0
admin.go

@@ -249,6 +249,10 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
 			NumPartitions: int32(len(topic.Partitions)),
 		}
 		if len(topic.Partitions) > 0 {
+			topicDetails.ReplicaAssignment = map[int32][]int32{}
+			for _, partition := range topic.Partitions {
+				topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
+			}
 			topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
 		}
 		topicsDetailsMap[topic.Name] = topicDetails

+ 4 - 0
admin_test.go

@@ -179,6 +179,10 @@ func TestClusterAdminListTopics(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+
+	if topic.ReplicaAssignment == nil || topic.ReplicaAssignment[0][0] != 1 {
+		t.Fatal(errors.New("replica assignment not found in response"))
+	}
 }
 
 func TestClusterAdminDeleteTopic(t *testing.T) {

+ 10 - 2
mockresponses.go

@@ -174,17 +174,25 @@ func (mmr *MockMetadataResponse) For(reqBody versionedDecoder) encoder {
 	for addr, brokerID := range mmr.brokers {
 		metadataResponse.AddBroker(addr, brokerID)
 	}
+
+	// Generate set of replicas
+	replicas := []int32{}
+
+	for _, brokerID := range mmr.brokers {
+		replicas = append(replicas, brokerID)
+	}
+
 	if len(metadataRequest.Topics) == 0 {
 		for topic, partitions := range mmr.leaders {
 			for partition, brokerID := range partitions {
-				metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
+				metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
 			}
 		}
 		return metadataResponse
 	}
 	for _, topic := range metadataRequest.Topics {
 		for partition, brokerID := range mmr.leaders[topic] {
-			metadataResponse.AddTopicPartition(topic, partition, brokerID, nil, nil, ErrNoError)
+			metadataResponse.AddTopicPartition(topic, partition, brokerID, replicas, replicas, ErrNoError)
 		}
 	}
 	return metadataResponse