|
|
@@ -477,6 +477,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
|
|
|
toRetry := make(map[string]bool)
|
|
|
|
|
|
+ var err error
|
|
|
for _, topic := range data.Topics {
|
|
|
switch topic.Err {
|
|
|
case NoError:
|
|
|
@@ -484,14 +485,11 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
case LeaderNotAvailable:
|
|
|
toRetry[topic.Name] = true
|
|
|
default:
|
|
|
- return nil, topic.Err
|
|
|
+ err = topic.Err
|
|
|
}
|
|
|
client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
|
|
|
for _, partition := range topic.Partitions {
|
|
|
switch partition.Err {
|
|
|
- case LeaderNotAvailable:
|
|
|
- toRetry[topic.Name] = true
|
|
|
- delete(client.metadata[topic.Name], partition.ID)
|
|
|
case NoError:
|
|
|
broker := client.brokers[partition.Leader]
|
|
|
if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
|
|
|
@@ -502,12 +500,19 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
}
|
|
|
}
|
|
|
client.metadata[topic.Name][partition.ID] = partition
|
|
|
+ case LeaderNotAvailable:
|
|
|
+ toRetry[topic.Name] = true
|
|
|
+ delete(client.metadata[topic.Name], partition.ID)
|
|
|
default:
|
|
|
- return nil, partition.Err
|
|
|
+ err = partition.Err
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+
|
|
|
ret := make([]string, 0, len(toRetry))
|
|
|
for topic := range toRetry {
|
|
|
ret = append(ret, topic)
|