|
|
@@ -510,8 +510,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
|
|
|
if msg.flags&chaser == chaser {
|
|
|
// we can start processing this topic/partition again
|
|
|
- Logger.Printf("producer/flusher state change to [normal] on %s/%d\n",
|
|
|
- msg.Topic, msg.partition)
|
|
|
+ Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
|
|
|
+ broker.ID(), msg.Topic, msg.partition)
|
|
|
currentRetries[msg.Topic][msg.partition] = nil
|
|
|
}
|
|
|
p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
|
|
|
@@ -543,7 +543,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
continue
|
|
|
default:
|
|
|
p.client.disconnectBroker(broker)
|
|
|
- Logger.Println("producer/flusher state change to [closing] because", err)
|
|
|
+ Logger.Println("producer/flusher/%d state change to [closing] because", broker.ID(), err)
|
|
|
closing = err
|
|
|
p.retryMessages(batch, err)
|
|
|
continue
|
|
|
@@ -578,8 +578,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
p.returnSuccesses(msgs)
|
|
|
}
|
|
|
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
|
- Logger.Printf("producer/flusher state change to [retrying] on %s/%d because %v\n",
|
|
|
- topic, partition, block.Err)
|
|
|
+ Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
|
|
|
+ broker.ID(), topic, partition, block.Err)
|
|
|
if currentRetries[topic] == nil {
|
|
|
currentRetries[topic] = make(map[int32]error)
|
|
|
}
|
|
|
@@ -591,6 +591,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ Logger.Println("producer/flusher/%d shut down", broker.ID())
|
|
|
p.retries <- &MessageToSend{flags: unref}
|
|
|
}
|
|
|
|