|
@@ -590,7 +590,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
}
|
|
}
|
|
|
p.returnSuccesses(msgs)
|
|
p.returnSuccesses(msgs)
|
|
|
}
|
|
}
|
|
|
- case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
|
|
|
|
|
+ case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable, RequestTimedOut:
|
|
|
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
|
|
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
|
|
|
broker.ID(), topic, partition, block.Err)
|
|
broker.ID(), topic, partition, block.Err)
|
|
|
if currentRetries[topic] == nil {
|
|
if currentRetries[topic] == nil {
|