|
|
@@ -226,7 +226,7 @@ func (p *Producer) topicDispatcher() {
|
|
|
|
|
|
for msg := range p.input {
|
|
|
if msg == nil {
|
|
|
- Logger.Printf("somebody sent a nil message to the producer, it was ignored")
|
|
|
+ Logger.Println("Something tried to send a nil message, it was ignored.")
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -252,6 +252,8 @@ func (p *Producer) topicDispatcher() {
|
|
|
handler <- msg
|
|
|
}
|
|
|
|
|
|
+ Logger.Println("Producer shutting down.")
|
|
|
+
|
|
|
for _, handler := range handlers {
|
|
|
close(handler)
|
|
|
}
|
|
|
@@ -311,12 +313,14 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
output <- &MessageToSend{Topic: topic, partition: partition, broker: leader, flags: chaser}
|
|
|
+ Logger.Println("Producer dispatching retried messages to new leader.")
|
|
|
backlog = make([]*MessageToSend, 0)
|
|
|
p.unrefBrokerWorker(leader)
|
|
|
output = nil
|
|
|
}
|
|
|
} else {
|
|
|
// retry *and* chaser flag set, flush the backlog and return to normal processing
|
|
|
+ Logger.Println("Producer finished dispatching retried messages, processing backlog.")
|
|
|
if output == nil {
|
|
|
err := p.client.RefreshTopicMetadata(topic)
|
|
|
if err != nil {
|
|
|
@@ -339,6 +343,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
msg.broker = leader
|
|
|
output <- msg
|
|
|
}
|
|
|
+ Logger.Println("Producer backlog processsed.")
|
|
|
|
|
|
backlog = nil
|
|
|
continue
|
|
|
@@ -395,6 +400,7 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
|
|
|
|
|
|
if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
|
|
|
(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
|
|
|
+ Logger.Println("Producer accumulated maximum request size, forcing blocking flush.")
|
|
|
flusher <- buffer
|
|
|
buffer = nil
|
|
|
doFlush = nil
|