@@ -342,10 +342,10 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			// new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
 			// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 			highWatermark = msg.retries
-			Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
+			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
 			retryState[msg.retries].expectChaser = true
 			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
-			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
+			Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
 			p.unrefBrokerProducer(leader, output)
 			output = nil
 			time.Sleep(p.conf.Producer.Retry.Backoff)
@@ -363,17 +363,16 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
 			// meaning this retry level is done and we can go down (at least) one level and flush that
 			retryState[highWatermark].expectChaser = false
-			Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
+			Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", topic, partition, highWatermark)
 			for {
 				highWatermark--
-				Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition)
 
 				if output == nil {
 					if err := breaker.Run(doUpdate); err != nil {
 						p.returnErrors(retryState[highWatermark].buf, err)
 						goto flushDone
 					}
-					Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
+					Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
 				}
 
 				for _, msg := range retryState[highWatermark].buf {
@@ -383,11 +382,11 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 			flushDone:
 				retryState[highWatermark].buf = nil
 				if retryState[highWatermark].expectChaser {
-					Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
+					Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
 					break
 				} else {
-					Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
 					if highWatermark == 0 {
+						Logger.Printf("producer/leader/%s/%d state change to [normal]\n", topic, partition)
 						break
 					}
 				}
@@ -406,7 +405,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 				time.Sleep(p.conf.Producer.Retry.Backoff)
 				continue
 			}
-			Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
 		}
 
 		output <- msg
@@ -447,7 +446,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 		if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
 			(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
 			(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
-			Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
+			Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
 			output <- buffer
 			timer = nil
 			buffer = nil
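
Taken together, these hunks do two things: they move the topic/partition coordinates (and the broker ID) into a fixed `producer/leader/<topic>/<partition>` log prefix so every line about one partition shares a single greppable prefix, and they adjust when the `[flushing-%d]`/`[normal]` state changes are logged. For readers unfamiliar with the surrounding code, the comments refer to a per-partition retry state machine: raising the watermark sends a "chaser" marker through the broker, and only when that chaser comes back is it safe to flush the parked backlog without re-ordering. Below is a minimal, runnable sketch of that bookkeeping under simplified assumptions; `message`, `levelState`, `dispatch`, and `send` are invented names for illustration, not Sarama's actual types or API.

```go
// Simplified sketch of the retry-level bookkeeping described in the diff's
// comments. Not Sarama's implementation; all names here are invented.
package main

import "fmt"

type message struct {
	value   string
	retries int  // retry level the message is at
	chaser  bool // marker: everything below this level has drained
}

type levelState struct {
	buf          []message // backlog parked while a higher level retries
	expectChaser bool      // still waiting for this level's chaser to return
}

var (
	highWatermark = 0 // current retry level of the dispatcher
	retryState    = make([]levelState, 4)
)

// send stands in for handing the message to the broker producer.
func send(m message) { fmt.Printf("sent %q (retries=%d)\n", m.value, m.retries) }

func dispatch(m message) {
	switch {
	case m.retries > highWatermark:
		// New, higher retry level: note that we owe ourselves a chaser, so we
		// know when everything "in between" has drained and the backlog can
		// be flushed without re-ordering messages.
		highWatermark = m.retries
		retryState[highWatermark].expectChaser = true
		fmt.Printf("state change to [retrying-%d]\n", highWatermark)
		send(m)
	case m.retries < highWatermark:
		// A message from an older level while a newer level is retrying:
		// park it; the flush cascade below replays it in order.
		retryState[m.retries].buf = append(retryState[m.retries].buf, m)
	case m.chaser:
		// The chaser for the current level came back: flush downward one
		// level at a time until we hit a level still awaiting its chaser.
		retryState[highWatermark].expectChaser = false
		fmt.Printf("state change to [flushing-%d]\n", highWatermark)
		for {
			highWatermark--
			for _, parked := range retryState[highWatermark].buf {
				send(parked)
			}
			retryState[highWatermark].buf = nil
			if retryState[highWatermark].expectChaser {
				fmt.Printf("state change to [retrying-%d]\n", highWatermark)
				break
			}
			if highWatermark == 0 {
				fmt.Println("state change to [normal]")
				break
			}
		}
	default:
		send(m) // normal path: current level, no chaser
	}
}

func main() {
	dispatch(message{value: "a", retries: 1})   // a failure raised the level
	dispatch(message{value: "b", retries: 0})   // parked behind level 1
	dispatch(message{retries: 1, chaser: true}) // chaser returned: flush "b"
}
```

Note how the sketch mirrors the post-change logging: `[flushing-%d]` is emitted once when the cascade starts rather than on every decrement, and `[normal]` is only reached at level 0, which is what the moved `Logger.Printf` in the third hunk accomplishes.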