|
|
@@ -395,6 +395,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
|
|
|
+ Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
|
|
|
backlog = make([]*MessageToSend, 0)
|
|
|
p.unrefBrokerWorker(leader)
|
|
|
@@ -411,6 +412,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
backlog = nil
|
|
|
continue
|
|
|
}
|
|
|
+ Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
}
|
|
|
|
|
|
for _, msg := range backlog {
|
|
|
@@ -427,6 +429,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
p.returnError(msg, err)
|
|
|
continue
|
|
|
}
|
|
|
+ Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
}
|
|
|
|
|
|
output <- msg
|
|
|
@@ -502,6 +505,7 @@ shutdown:
|
|
|
func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
var closing error
|
|
|
currentRetries := make(map[string]map[int32]error)
|
|
|
+ Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
|
|
|
|
|
|
for batch := range input {
|
|
|
if closing != nil {
|