Browse Source

producer: make Flush.Frequency behaviour better

Previously the timer would tick every Flush.Frequency regardless of if a message
was actually queued. This meant that if no message arrived for Flush.Frequency
then the next message would be sent immediately, which is unexpected if not
*exactly* wrong.

With this change the timer is only started when the first message arrives, and
is cleared when a flush occurs. This should result in slightly better batching
for low-volume topics at the result of slightly higher latency (although the
delay will still never be more than Flush.Frequency).
Evan Huus 10 years ago
parent
commit
f86d40d963
1 changed files with 11 additions and 14 deletions
  1. 11 14
      async_producer.go

+ 11 - 14
async_producer.go

@@ -421,17 +421,13 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 // groups messages together into appropriately-sized batches for sending to the broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
 func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
-	var ticker *time.Ticker
-	var timer <-chan time.Time
-	if p.conf.Producer.Flush.Frequency > 0 {
-		ticker = time.NewTicker(p.conf.Producer.Flush.Frequency)
-		timer = ticker.C
-	}
-
-	var buffer []*ProducerMessage
-	var doFlush chan []*ProducerMessage
-	var bytesAccumulated int
-	var defaultFlush bool
+	var (
+		timer            <-chan time.Time
+		buffer           []*ProducerMessage
+		doFlush          chan []*ProducerMessage
+		bytesAccumulated int
+		defaultFlush     bool
+	)
 
 
 	if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
 	if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
 		defaultFlush = true
 		defaultFlush = true
@@ -454,6 +450,7 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 				flusher <- buffer
 				flusher <- buffer
 				buffer = nil
 				buffer = nil
 				doFlush = nil
 				doFlush = nil
+				timer = nil
 				bytesAccumulated = 0
 				bytesAccumulated = 0
 			}
 			}
 
 
@@ -465,20 +462,20 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
 				doFlush = flusher
+			} else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
+				timer = time.After(p.conf.Producer.Flush.Frequency)
 			}
 			}
 		case <-timer:
 		case <-timer:
 			doFlush = flusher
 			doFlush = flusher
 		case doFlush <- buffer:
 		case doFlush <- buffer:
 			buffer = nil
 			buffer = nil
 			doFlush = nil
 			doFlush = nil
+			timer = nil
 			bytesAccumulated = 0
 			bytesAccumulated = 0
 		}
 		}
 	}
 	}
 
 
 shutdown:
 shutdown:
-	if ticker != nil {
-		ticker.Stop()
-	}
 	if len(buffer) > 0 {
 	if len(buffer) > 0 {
 		flusher <- buffer
 		flusher <- buffer
 	}
 	}