Don't retry messages until the broker is closed
@@ -536,9 +536,9 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
default:
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
p.abandonBrokerConnection(broker)
- p.retryMessages(batch, err)
_ = broker.Close()
closing = err
+ p.retryMessages(batch, err)
continue
}