Quellcode durchsuchen

Merge pull request #360 from Shopify/nicer-producer-semantics

Adjust producer flushing semantics
Evan Huus vor 10 Jahren
Ursprung
Commit
2320623e85
1 geänderte Dateien mit 7 neuen und 1 gelöschten Zeilen
  1. 7 1
      async_producer.go

+ 7 - 1
async_producer.go

@@ -424,6 +424,11 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 	var buffer []*ProducerMessage
 	var doFlush chan []*ProducerMessage
 	var bytesAccumulated int
+	var defaultFlush bool
+
+	if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
+		defaultFlush = true
+	}
 
 	flusher := make(chan []*ProducerMessage)
 	go withRecover(func() { p.flusher(broker, flusher) })
@@ -448,7 +453,8 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 			buffer = append(buffer, msg)
 			bytesAccumulated += msg.byteSize()
 
-			if len(buffer) >= p.conf.Producer.Flush.Messages ||
+			if defaultFlush ||
+				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
 			}