|
|
@@ -209,6 +209,7 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
}
|
|
|
|
|
|
for broker := client.any(); broker != nil; broker = client.any() {
|
|
|
+ Logger.Printf("Fetching metadata from broker %s\n", broker.addr)
|
|
|
response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
|
|
|
|
|
|
switch err {
|
|
|
@@ -224,6 +225,7 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
if retries <= 0 {
|
|
|
return LeaderNotAvailable
|
|
|
}
|
|
|
+ Logger.Printf("Failed to fetch metadata from broker %s, waiting %dms... (%d retries remaining)\n", broker.addr, client.config.WaitForElection/time.Millisecond, retries)
|
|
|
time.Sleep(client.config.WaitForElection) // wait for leader election
|
|
|
return client.refreshMetadata(retry, retries-1)
|
|
|
}
|
|
|
@@ -238,16 +240,18 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
}
|
|
|
|
|
|
if retries > 0 {
|
|
|
+ Logger.Printf("Out of available brokers. Resurrecting dead brokers after %dms... (%d retries remaining)\n", client.config.WaitForElection/time.Millisecond, retries)
|
|
|
time.Sleep(client.config.WaitForElection)
|
|
|
client.resurrectDeadBrokers()
|
|
|
return client.refreshMetadata(topics, retries-1)
|
|
|
+ } else {
|
|
|
+ Logger.Printf("Out of available brokers.\n")
|
|
|
}
|
|
|
|
|
|
return OutOfBrokers
|
|
|
}
|
|
|
|
|
|
func (client *Client) resurrectDeadBrokers() {
|
|
|
- Logger.Println("Ran out of connectable brokers. Retrying with brokers marked dead.")
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
|