浏览代码

If we can't connect, the leader is not available

If kafka gave us a broker we can't talk to (which we suspected because it was
already dead), then mark the partition as LeaderNotAvailable so that subsequent
calls to Leader don't return it unnecessarily
Evan Huus 10 年之前
父节点
当前提交
21fb992498
共有 1 个文件被更改,包括 1 次插入0 次删除
  1. 1 0
      client.go

+ 1 - 0
client.go

@@ -549,6 +549,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 				broker := client.brokers[partition.Leader]
 				broker := client.brokers[partition.Leader]
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 					if connected, _ := broker.Connected(); !connected {
 					if connected, _ := broker.Connected(); !connected {
+						partition.Err = LeaderNotAvailable
 						toRetry[topic.Name] = true
 						toRetry[topic.Name] = true
 					}
 					}
 				}
 				}