Otherwise we pull it out from underneath other producers/consumers who might be talking to it.
@@ -238,7 +238,9 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
shutdown:
delete(p.brokerProducers, bp.broker)
bp.flushIfAnyMessages(p)
- p.client.disconnectBroker(bp.broker)
+ if shutdownRequired {
+ p.client.disconnectBroker(bp.broker)
+ }
close(bp.flushNow)
close(bp.hasMessages)
close(bp.done)