|
|
@@ -421,24 +421,20 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
|
|
|
// groups messages together into appropriately-sized batches for sending to the broker
|
|
|
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
|
|
|
func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
|
|
|
- var ticker *time.Ticker
|
|
|
- var timer <-chan time.Time
|
|
|
- if p.conf.Producer.Flush.Frequency > 0 {
|
|
|
- ticker = time.NewTicker(p.conf.Producer.Flush.Frequency)
|
|
|
- timer = ticker.C
|
|
|
- }
|
|
|
-
|
|
|
- var buffer []*ProducerMessage
|
|
|
- var doFlush chan []*ProducerMessage
|
|
|
- var bytesAccumulated int
|
|
|
- var defaultFlush bool
|
|
|
+ var (
|
|
|
+ timer <-chan time.Time
|
|
|
+ buffer []*ProducerMessage
|
|
|
+ flushTriggered chan []*ProducerMessage
|
|
|
+ bytesAccumulated int
|
|
|
+ defaultFlush bool
|
|
|
+ )
|
|
|
|
|
|
if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
|
|
|
defaultFlush = true
|
|
|
}
|
|
|
|
|
|
- flusher := make(chan []*ProducerMessage)
|
|
|
- go withRecover(func() { p.flusher(broker, flusher) })
|
|
|
+ output := make(chan []*ProducerMessage)
|
|
|
+ go withRecover(func() { p.flusher(broker, output) })
|
|
|
|
|
|
for {
|
|
|
select {
|
|
|
@@ -451,9 +447,10 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
|
|
|
(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
|
|
|
(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
|
|
|
Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
|
|
|
- flusher <- buffer
|
|
|
+ output <- buffer
|
|
|
+ timer = nil
|
|
|
buffer = nil
|
|
|
- doFlush = nil
|
|
|
+ flushTriggered = nil
|
|
|
bytesAccumulated = 0
|
|
|
}
|
|
|
|
|
|
@@ -464,25 +461,25 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
|
|
|
msg.flags&chaser == chaser ||
|
|
|
(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
|
|
|
(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
|
|
|
- doFlush = flusher
|
|
|
+ flushTriggered = output
|
|
|
+ } else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
|
|
|
+ timer = time.After(p.conf.Producer.Flush.Frequency)
|
|
|
}
|
|
|
case <-timer:
|
|
|
- doFlush = flusher
|
|
|
- case doFlush <- buffer:
|
|
|
+ flushTriggered = output
|
|
|
+ case flushTriggered <- buffer:
|
|
|
+ timer = nil
|
|
|
buffer = nil
|
|
|
- doFlush = nil
|
|
|
+ flushTriggered = nil
|
|
|
bytesAccumulated = 0
|
|
|
}
|
|
|
}
|
|
|
|
|
|
shutdown:
|
|
|
- if ticker != nil {
|
|
|
- ticker.Stop()
|
|
|
- }
|
|
|
if len(buffer) > 0 {
|
|
|
- flusher <- buffer
|
|
|
+ output <- buffer
|
|
|
}
|
|
|
- close(flusher)
|
|
|
+ close(output)
|
|
|
}
|
|
|
|
|
|
// one per broker
|