|
|
@@ -649,8 +649,9 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
|
|
|
|
|
|
switch err.(type) {
|
|
|
case nil:
|
|
|
+ allKnownMetaData := len(topics) == 0
|
|
|
// valid response, use it
|
|
|
- shouldRetry, err := client.updateMetadata(response)
|
|
|
+ shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
|
|
|
if shouldRetry {
|
|
|
Logger.Println("client/metadata found some partitions to be leaderless")
|
|
|
return retry(err) // note: err can be nil
|
|
|
@@ -674,7 +675,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int)
|
|
|
}
|
|
|
|
|
|
// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
|
|
|
-func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err error) {
|
|
|
+func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
|
|
|
@@ -685,7 +686,10 @@ func (client *client) updateMetadata(data *MetadataResponse) (retry bool, err er
|
|
|
for _, broker := range data.Brokers {
|
|
|
client.registerBroker(broker)
|
|
|
}
|
|
|
-
|
|
|
+ if allKnownMetaData {
|
|
|
+ client.metadata = make(map[string]map[int32]*PartitionMetadata)
|
|
|
+ client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
|
|
|
+ }
|
|
|
for _, topic := range data.Topics {
|
|
|
delete(client.metadata, topic.Name)
|
|
|
delete(client.cachedPartitionsResults, topic.Name)
|