|
|
@@ -441,6 +441,14 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
toRetry[topic.Name] = true
|
|
|
delete(client.leaders[topic.Name], partition.ID)
|
|
|
case NoError:
|
|
|
+ broker := client.brokers[partition.Leader]
|
|
|
+ if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
|
|
|
+ if connected, _ := broker.Connected(); !connected {
|
|
|
+ toRetry[topic.Name] = true
|
|
|
+ delete(client.leaders[topic.Name], partition.ID)
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ }
|
|
|
client.leaders[topic.Name][partition.ID] = partition.Leader
|
|
|
default:
|
|
|
return nil, partition.Err
|