|
@@ -242,7 +242,6 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
|
|
|
if shutdownRequired = bp.flush(p); shutdownRequired {
|
|
|
goto shutdown
|
|
|
}
|
|
|
- bp.flush(p)
|
|
|
case <-timer.C:
|
|
|
if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
|
|
|
goto shutdown
|
|
@@ -314,9 +313,9 @@ func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
|
|
|
bp.mapM.Lock()
|
|
|
for tp, messages := range bp.messages {
|
|
|
if len(messages) > 0 && p.tryAcquireDeliveryLock(tp) {
|
|
|
- defer p.releaseDeliveryLock(tp)
|
|
|
prb = append(prb, messages...)
|
|
|
delete(bp.messages, tp)
|
|
|
+ p.releaseDeliveryLock(tp)
|
|
|
}
|
|
|
}
|
|
|
bp.mapM.Unlock()
|