
fix producer topic metadata on-demand fetch when a topic error occurs

Andy Xie, 7 years ago
commit ce245e83e7
1 changed file with 31 additions and 5 deletions

client.go (+31, -5)

@@ -100,10 +100,11 @@ type client struct {
 	seedBrokers []*Broker
 	deadSeeds   []*Broker
 
-	controllerID int32                                   // cluster controller broker id
-	brokers      map[int32]*Broker                       // maps broker ids to brokers
-	metadata     map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
-	coordinators map[string]int32                        // Maps consumer group names to coordinating broker IDs
+	controllerID   int32                                   // cluster controller broker id
+	brokers        map[int32]*Broker                       // maps broker ids to brokers
+	metadata       map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
+	metadataTopics map[string]none                         // topics that need to collect metadata
+	coordinators   map[string]int32                        // Maps consumer group names to coordinating broker IDs
 
 	// If the number of partitions is large, we can get some churn calling cachedPartitions,
 	// so the result is cached.  It is important to update this value whenever metadata is changed
@@ -136,6 +137,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 		closed:                  make(chan none),
 		brokers:                 make(map[int32]*Broker),
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
+		metadataTopics:          make(map[string]none),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 		coordinators:            make(map[string]int32),
 	}
@@ -207,6 +209,7 @@ func (client *client) Close() error {
 
 	client.brokers = nil
 	client.metadata = nil
+	client.metadataTopics = nil
 
 	return nil
 }
@@ -231,6 +234,22 @@ func (client *client) Topics() ([]string, error) {
 	return ret, nil
 }
 
+func (client *client) MetadataTopics() ([]string, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
+	ret := make([]string, 0, len(client.metadataTopics))
+	for topic := range client.metadataTopics {
+		ret = append(ret, topic)
+	}
+
+	return ret, nil
+}
+
 func (client *client) Partitions(topic string) ([]int32, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -645,7 +664,7 @@ func (client *client) refreshMetadata() error {
 	topics := []string{}
 
 	if !client.conf.Metadata.Full {
-		if specificTopics, err := client.Topics(); err != nil {
+		if specificTopics, err := client.MetadataTopics(); err != nil {
 			return err
 		} else if len(specificTopics) == 0 {
 			return ErrNoTopicsToUpdateMetadata
@@ -728,9 +747,16 @@ func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bo
 
 	if allKnownMetaData {
 		client.metadata = make(map[string]map[int32]*PartitionMetadata)
+		client.metadataTopics = make(map[string]none)
 		client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
 	}
 	for _, topic := range data.Topics {
+		// Topics must be added to `metadataTopics` first, so that every requested
+		// topic is recorded and stays trackable for the periodic metadata refresh,
+		// even when the topic itself came back with an error.
+		if _, exists := client.metadataTopics[topic.Name]; !exists {
+			client.metadataTopics[topic.Name] = none{}
+		}
 		delete(client.metadata, topic.Name)
 		delete(client.cachedPartitionsResults, topic.Name)
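For context, the hunks above appear to come from the Sarama Kafka client's client.go. The sketch below shows the code path this change affects, assuming Sarama's usual NewConfig/NewClient/Partitions API; the import path, broker address, and topic name are placeholders, not part of this commit.

package main

import (
	"log"

	"github.com/Shopify/sarama" // import path is an assumption; use whichever module hosts this client.go
)

func main() {
	cfg := sarama.NewConfig()
	// With Metadata.Full disabled, refreshMetadata only asks the brokers about the
	// topics recorded in metadataTopics rather than every topic in the cluster.
	cfg.Metadata.Full = false

	// Broker address is a placeholder.
	client, err := sarama.NewClient([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// The first lookup of a topic triggers an on-demand metadata fetch. With this
	// change the topic is added to metadataTopics before its error is inspected,
	// so it stays in the periodic refresh even if the initial fetch reported a
	// topic-level error.
	if _, err := client.Partitions("my-topic"); err != nil {
		log.Printf("partition lookup failed: %v", err)
	}
}

The MetadataTopics helper added above is what refreshMetadata now iterates when Metadata.Full is false; whether it is also exposed on the public Client interface is not visible in this excerpt.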