|
|
@@ -243,6 +243,7 @@ func (p *Producer) topicDispatcher() {
|
|
|
}
|
|
|
|
|
|
if msg.flags&shutdown != 0 {
|
|
|
+ Logger.Println("Producer shutting down.")
|
|
|
break
|
|
|
}
|
|
|
|
|
|
@@ -265,8 +266,6 @@ func (p *Producer) topicDispatcher() {
|
|
|
handler <- msg
|
|
|
}
|
|
|
|
|
|
- Logger.Println("Producer shutting down.")
|
|
|
-
|
|
|
for _, handler := range handlers {
|
|
|
close(handler)
|
|
|
}
|
|
|
@@ -330,15 +329,15 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
if backlog == nil {
|
|
|
// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
+ Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
|
|
|
output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
|
|
|
- Logger.Println("Producer dispatching retried messages to new leader.")
|
|
|
backlog = make([]*MessageToSend, 0)
|
|
|
p.unrefBrokerWorker(leader)
|
|
|
output = nil
|
|
|
}
|
|
|
} else {
|
|
|
// retry *and* chaser flag set, flush the backlog and return to normal processing
|
|
|
- Logger.Println("Producer finished dispatching retried messages, processing backlog.")
|
|
|
+ Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
|
if output == nil {
|
|
|
err := p.client.RefreshTopicMetadata(topic)
|
|
|
if err != nil {
|
|
|
@@ -360,7 +359,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
for _, msg := range backlog {
|
|
|
output <- msg
|
|
|
}
|
|
|
- Logger.Println("Producer backlog processsed.")
|
|
|
+ Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)
|
|
|
|
|
|
backlog = nil
|
|
|
continue
|
|
|
@@ -417,7 +416,7 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
|
|
|
|
|
|
if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
|
|
|
(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
|
|
|
- Logger.Println("Producer accumulated maximum request size, forcing blocking flush.")
|
|
|
+ Logger.Println("producer/aggregator hit maximum request size, forcing blocking flush")
|
|
|
flusher <- buffer
|
|
|
buffer = nil
|
|
|
doFlush = nil
|
|
|
@@ -467,6 +466,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
|
|
|
if msg.flags&chaser == chaser {
|
|
|
// we can start processing this topic/partition again
|
|
|
+ Logger.Printf("producer/flusher state change to [normal] on %s/%d\n",
|
|
|
+ msg.Topic, msg.partition)
|
|
|
currentRetries[msg.Topic][msg.partition] = nil
|
|
|
}
|
|
|
p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
|
|
|
@@ -498,6 +499,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
continue
|
|
|
default:
|
|
|
p.client.disconnectBroker(broker)
|
|
|
+ Logger.Println("producer/flusher state change to [closing] because", err)
|
|
|
closing = err
|
|
|
p.retryMessages(batch, err)
|
|
|
continue
|
|
|
@@ -532,6 +534,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
p.returnSuccesses(msgs)
|
|
|
}
|
|
|
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
|
+ Logger.Printf("producer/flusher state change to [retrying] on %s/%d because %v\n",
|
|
|
+ topic, partition, block.Err)
|
|
|
if currentRetries[topic] == nil {
|
|
|
currentRetries[topic] = make(map[int32]error)
|
|
|
}
|
|
|
@@ -696,7 +700,6 @@ func (p *Producer) returnSuccesses(batch []*MessageToSend) {
|
|
|
}
|
|
|
|
|
|
func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
|
|
|
- Logger.Println("Producer requeueing batch of", len(batch), "messages due to error:", err)
|
|
|
for _, msg := range batch {
|
|
|
if msg == nil {
|
|
|
continue
|
|
|
@@ -708,7 +711,6 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
|
|
|
p.retries <- msg
|
|
|
}
|
|
|
}
|
|
|
- Logger.Println("Messages requeued")
|
|
|
}
|
|
|
|
|
|
type brokerWorker struct {
|