Pārlūkot izejas kodu

Adjust producer flushing semantics

If none of the three are set, flush as fast as possible, per normal. If *any* of
the three are set, *only* respect the ones that are set.
Evan Huus 10 gadi atpakaļ
vecāks
revīzija
3cd8fffe8a
1 mainītis faili ar 7 papildinājumiem un 1 dzēšanām
  1. 7 1
      async_producer.go

+ 7 - 1
async_producer.go

@@ -424,6 +424,11 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 	var buffer []*ProducerMessage
 	var doFlush chan []*ProducerMessage
 	var bytesAccumulated int
+	var defaultFlush bool
+
+	if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
+		defaultFlush = true
+	}
 
 	flusher := make(chan []*ProducerMessage)
 	go withRecover(func() { p.flusher(broker, flusher) })
@@ -448,7 +453,8 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 			buffer = append(buffer, msg)
 			bytesAccumulated += msg.byteSize()
 
-			if len(buffer) >= p.conf.Producer.Flush.Messages ||
+			if defaultFlush ||
+				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
 				doFlush = flusher
 			}