|
|
@@ -322,14 +322,9 @@ func (client *client) GetOffset(topic string, partitionID int32, time int64) (in
|
|
|
|
|
|
// private broker management helpers
|
|
|
|
|
|
-// XXX: see https://github.com/Shopify/sarama/issues/15
|
|
|
-// and https://github.com/Shopify/sarama/issues/23
|
|
|
-// disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with
|
|
|
-// something sane and the replacement should be made part of the public Client API
|
|
|
func (client *client) disconnectBroker(broker *Broker) {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
- Logger.Printf("Disconnecting Broker %d\n", broker.ID())
|
|
|
|
|
|
client.deadBrokerAddrs[broker.addr] = none{}
|
|
|
|
|
|
@@ -342,12 +337,12 @@ func (client *client) disconnectBroker(broker *Broker) {
|
|
|
client.seedBroker = nil
|
|
|
}
|
|
|
} else {
|
|
|
- // we don't need to update the leaders hash, it will automatically get refreshed next time because
|
|
|
- // the broker lookup will return nil
|
|
|
+ // we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
|
|
|
+ // but we really shouldn't have to; once that loop is made better this case can be
|
|
|
+ // removed, and the function generally can be renamed from `disconnectBroker` to
|
|
|
+ // `nextSeedBroker` or something
|
|
|
delete(client.brokers, broker.ID())
|
|
|
}
|
|
|
-
|
|
|
- safeAsyncClose(broker)
|
|
|
}
|
|
|
|
|
|
func (client *client) resurrectDeadBrokers() {
|
|
|
@@ -517,6 +512,7 @@ func (client *client) tryRefreshMetadata(topics []string, retriesRemaining int)
|
|
|
default:
|
|
|
// some other error, remove that broker and try again
|
|
|
Logger.Println("Error from broker while fetching metadata:", err)
|
|
|
+ _ = broker.Close()
|
|
|
client.disconnectBroker(broker)
|
|
|
}
|
|
|
}
|