Disconnect the broker when it returns us certain errors
@@ -339,6 +339,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
errorCb(err)
return false
default:
+ p.client.disconnectBroker(bp.broker)
overlimit := 0
prb.reverseEach(func(msg *produceMessage) {
if err := msg.reenqueue(p); err != nil {