|
|
@@ -424,7 +424,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
|
|
|
var (
|
|
|
timer <-chan time.Time
|
|
|
buffer []*ProducerMessage
|
|
|
- doFlush chan []*ProducerMessage
|
|
|
+ flushTriggered chan []*ProducerMessage
|
|
|
bytesAccumulated int
|
|
|
defaultFlush bool
|
|
|
)
|
|
|
@@ -433,8 +433,8 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
|
|
|
defaultFlush = true
|
|
|
}
|
|
|
|
|
|
- flusher := make(chan []*ProducerMessage)
|
|
|
- go withRecover(func() { p.flusher(broker, flusher) })
|
|
|
+ output := make(chan []*ProducerMessage)
|
|
|
+ go withRecover(func() { p.flusher(broker, output) })
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
@@ -447,10 +447,10 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
|
|
|
(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
|
|
|
(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
|
|
|
Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
|
|
|
- flusher <- buffer
|
|
|
- buffer = nil
|
|
|
- doFlush = nil
|
|
|
+ output <- buffer
|
|
|
timer = nil
|
|
|
+ buffer = nil
|
|
|
+ flushTriggered = nil
|
|
|
bytesAccumulated = 0
|
|
|
}
|
|
|
|
|
|
@@ -461,25 +461,25 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
|
|
|
msg.flags&chaser == chaser ||
|
|
|
(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
|
|
|
(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
|
|
|
- doFlush = flusher
|
|
|
+ flushTriggered = output
|
|
|
} else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
|
|
|
timer = time.After(p.conf.Producer.Flush.Frequency)
|
|
|
}
|
|
|
case <-timer:
|
|
|
- doFlush = flusher
|
|
|
- case doFlush <- buffer:
|
|
|
- buffer = nil
|
|
|
- doFlush = nil
|
|
|
+ flushTriggered = output
|
|
|
+ case flushTriggered <- buffer:
|
|
|
timer = nil
|
|
|
+ buffer = nil
|
|
|
+ flushTriggered = nil
|
|
|
bytesAccumulated = 0
|
|
|
}
|
|
|
}
|
|
|
|
|
|
shutdown:
|
|
|
if len(buffer) > 0 {
|
|
|
- flusher <- buffer
|
|
|
+ output <- buffer
|
|
|
}
|
|
|
- close(flusher)
|
|
|
+ close(output)
|
|
|
}
|
|
|
|
|
|
// one per broker
|