|
@@ -406,6 +406,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
|
if output == nil {
|
|
if output == nil {
|
|
|
if err := breaker.Run(doUpdate); err != nil {
|
|
if err := breaker.Run(doUpdate); err != nil {
|
|
|
|
|
+ Logger.Printf("producer/leader state change to [normal] after \"%s\" on %s/%d\n", err, topic, partition)
|
|
|
p.returnErrors(backlog, err)
|
|
p.returnErrors(backlog, err)
|
|
|
backlog = nil
|
|
backlog = nil
|
|
|
continue
|
|
continue
|