|
|
@@ -585,6 +585,8 @@ func (bp *brokerProducer) run() {
|
|
|
}
|
|
|
|
|
|
if msg.flags&syn == syn {
|
|
|
+ Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
|
|
|
+ bp.broker.ID(), msg.Topic, msg.Partition)
|
|
|
if bp.currentRetries[msg.Topic] == nil {
|
|
|
bp.currentRetries[msg.Topic] = make(map[int32]error)
|
|
|
}
|
|
|
@@ -599,7 +601,7 @@ func (bp *brokerProducer) run() {
|
|
|
if bp.closing == nil && msg.flags&fin == fin {
|
|
|
// we were retrying this partition but we can start processing again
|
|
|
delete(bp.currentRetries[msg.Topic], msg.Partition)
|
|
|
- Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
|
|
|
+ Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
|
|
|
bp.broker.ID(), msg.Topic, msg.Partition)
|
|
|
}
|
|
|
|