فهرست منبع

Clean up aggregator via helper methods

Also get rid of forceFlushThreshold it does not need its own function
Evan Huus 10 سال پیش
والد
کامیت
765b3b4f7d
2فایلهای تغییر یافته به همراه68 افزوده شده و 46 حذف شده
  1. 64 42
      async_producer.go
  2. 4 4
      config.go

+ 64 - 42
async_producer.go

@@ -9,10 +9,6 @@ import (
 	"github.com/eapache/queue"
 	"github.com/eapache/queue"
 )
 )
 
 
-func forceFlushThreshold() int {
-	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
-}
-
 // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
 // AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
 // to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
 // to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
 // and parses responses for errors. You must read from the Errors() channel or the
 // and parses responses for errors. You must read from the Errors() channel or the
@@ -491,6 +487,7 @@ func (pp *partitionProducer) updateLeader() error {
 	})
 	})
 }
 }
 
 
+// one per broker, constructs both an aggregator and a flusher
 func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
 func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
 	input := make(chan *ProducerMessage)
 	input := make(chan *ProducerMessage)
 	bridge := make(chan []*ProducerMessage)
 	bridge := make(chan []*ProducerMessage)
@@ -514,7 +511,6 @@ func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessag
 	return input
 	return input
 }
 }
 
 
-// one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 type aggregator struct {
 type aggregator struct {
@@ -522,20 +518,14 @@ type aggregator struct {
 	broker *Broker
 	broker *Broker
 	input  <-chan *ProducerMessage
 	input  <-chan *ProducerMessage
 	output chan<- []*ProducerMessage
 	output chan<- []*ProducerMessage
+
+	buffer      []*ProducerMessage
+	bufferBytes int
+	timer       <-chan time.Time
 }
 }
 
 
 func (a *aggregator) run() {
 func (a *aggregator) run() {
-	var (
-		timer            <-chan time.Time
-		buffer           []*ProducerMessage
-		flushTriggered   chan<- []*ProducerMessage
-		bytesAccumulated int
-		defaultFlush     bool
-	)
-
-	if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
-		defaultFlush = true
-	}
+	var output chan<- []*ProducerMessage
 
 
 	for {
 	for {
 		select {
 		select {
@@ -544,45 +534,77 @@ func (a *aggregator) run() {
 				goto shutdown
 				goto shutdown
 			}
 			}
 
 
-			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-				(a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
-				(a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
+			if a.wouldOverflow(msg) {
 				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
 				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
-				a.output <- buffer
-				timer = nil
-				buffer = nil
-				flushTriggered = nil
-				bytesAccumulated = 0
+				a.output <- a.buffer
+				a.reset()
+				output = nil
 			}
 			}
 
 
-			buffer = append(buffer, msg)
-			bytesAccumulated += msg.byteSize()
+			a.buffer = append(a.buffer, msg)
+			a.bufferBytes += msg.byteSize()
 
 
-			if defaultFlush ||
-				msg.flags&chaser == chaser ||
-				(a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
-				(a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
-				flushTriggered = a.output
-			} else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
-				timer = time.After(a.parent.conf.Producer.Flush.Frequency)
+			if a.readyToFlush(msg) {
+				output = a.output
+			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
+				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
 			}
 			}
-		case <-timer:
-			flushTriggered = a.output
-		case flushTriggered <- buffer:
-			timer = nil
-			buffer = nil
-			flushTriggered = nil
-			bytesAccumulated = 0
+		case <-a.timer:
+			output = a.output
+		case output <- a.buffer:
+			a.reset()
+			output = nil
 		}
 		}
 	}
 	}
 
 
 shutdown:
 shutdown:
-	if len(buffer) > 0 {
-		a.output <- buffer
+	if len(a.buffer) > 0 {
+		a.output <- a.buffer
 	}
 	}
 	close(a.output)
 	close(a.output)
 }
 }
 
 
+func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
+	switch {
+	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
+	case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
+		return true
+	// Would we overflow the size-limit of a compressed message-batch?
+	case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
+		return true
+	// Would we overflow simply in number of messages?
+	case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
+		return true
+	default:
+		return false
+	}
+}
+
+func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
+	switch {
+	// If all three config values are 0, we always flush as-fast-as-possible
+	case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
+		return true
+	// If the messages is a chaser we must flush to maintain the state-machine
+	case msg.flags&chaser == chaser:
+		return true
+	// If we've  passed the message trigger-point
+	case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
+		return true
+	// If we've  passed the byte trigger-point
+	case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
+		return true
+	default:
+		return false
+	}
+}
+
+func (a *aggregator) reset() {
+	a.timer = nil
+	a.buffer = nil
+	a.bufferBytes = 0
+}
+
 // takes a batch at a time from the aggregator and sends to the broker
 // takes a batch at a time from the aggregator and sends to the broker
 type flusher struct {
 type flusher struct {
 	parent *asyncProducer
 	parent *asyncProducer

+ 4 - 4
config.go

@@ -180,11 +180,11 @@ func (c *Config) Validate() error {
 	if c.Producer.RequiredAcks > 1 {
 	if c.Producer.RequiredAcks > 1 {
 		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
 		Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
 	}
 	}
-	if c.Producer.MaxMessageBytes >= forceFlushThreshold() {
-		Logger.Println("Producer.MaxMessageBytes is too close to MaxRequestSize; it will be ignored.")
+	if c.Producer.MaxMessageBytes >= int(MaxRequestSize) {
+		Logger.Println("Producer.MaxMessageBytes is larger than MaxRequestSize; it will be ignored.")
 	}
 	}
-	if c.Producer.Flush.Bytes >= forceFlushThreshold() {
-		Logger.Println("Producer.Flush.Bytes is too close to MaxRequestSize; it will be ignored.")
+	if c.Producer.Flush.Bytes >= int(MaxRequestSize) {
+		Logger.Println("Producer.Flush.Bytes is larger than MaxRequestSize; it will be ignored.")
 	}
 	}
 	if c.Producer.Timeout%time.Millisecond != 0 {
 	if c.Producer.Timeout%time.Millisecond != 0 {
 		Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
 		Logger.Println("Producer.Timeout only supports millisecond resolution; nanoseconds will be truncated.")