|
|
@@ -332,7 +332,7 @@ func (client *Client) Closed() bool {
|
|
|
return client.brokers == nil
|
|
|
}
|
|
|
|
|
|
-func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
+func (client *Client) refreshMetadata(topics []string, retriesRemaining int) error {
|
|
|
// This function is a sort of central point for most functions that create new
|
|
|
// resources. Check to see if we're dealing with a closed Client and error
|
|
|
// out immediately if so.
|
|
|
@@ -357,36 +357,34 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
case nil:
|
|
|
// valid response, use it
|
|
|
retry, err := client.update(response)
|
|
|
- switch {
|
|
|
- case err != nil:
|
|
|
- return err
|
|
|
- case len(retry) == 0:
|
|
|
- return nil
|
|
|
- default:
|
|
|
- if retries <= 0 {
|
|
|
- return LeaderNotAvailable
|
|
|
+
|
|
|
+ if len(retry) > 0 {
|
|
|
+ if retriesRemaining <= 0 {
|
|
|
+ return nil
|
|
|
}
|
|
|
- Logger.Printf("Failed to fetch metadata from broker %s, waiting %dms... (%d retries remaining)\n", broker.addr, client.config.WaitForElection/time.Millisecond, retries)
|
|
|
+ Logger.Printf("Some partitions are leaderless, waiting %dms for election... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retriesRemaining)
|
|
|
time.Sleep(client.config.WaitForElection) // wait for leader election
|
|
|
- return client.refreshMetadata(retry, retries-1)
|
|
|
+ return client.refreshMetadata(retry, retriesRemaining-1)
|
|
|
}
|
|
|
+
|
|
|
+ return err
|
|
|
case EncodingError:
|
|
|
// didn't even send, return the error
|
|
|
return err
|
|
|
+ default:
|
|
|
+ // some other error, remove that broker and try again
|
|
|
+ Logger.Println("Error from broker while fetching metadata:", err)
|
|
|
+ client.disconnectBroker(broker)
|
|
|
}
|
|
|
-
|
|
|
- // some other error, remove that broker and try again
|
|
|
- Logger.Println("Unexpected error from GetMetadata, closing broker:", err)
|
|
|
- client.disconnectBroker(broker)
|
|
|
}
|
|
|
|
|
|
- if retries > 0 {
|
|
|
- Logger.Printf("Out of available brokers. Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retries)
|
|
|
+ Logger.Println("Out of available brokers.")
|
|
|
+
|
|
|
+ if retriesRemaining > 0 {
|
|
|
+ Logger.Printf("Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retriesRemaining)
|
|
|
time.Sleep(client.config.WaitForElection)
|
|
|
client.resurrectDeadBrokers()
|
|
|
- return client.refreshMetadata(topics, retries-1)
|
|
|
- } else {
|
|
|
- Logger.Printf("Out of available brokers.\n")
|
|
|
+ return client.refreshMetadata(topics, retriesRemaining-1)
|
|
|
}
|
|
|
|
|
|
return OutOfBrokers
|