Browse Source

Refactor the producer, part 2

Second stand-alone chunk extracted from #544, (first chunk: #549). This uses the
`produceSet` struct in the aggregator as well, and moves the `wouldOverflow` and
`readyToFlush` methods to methods on the `produceSet`.

Knock-on effects:
 - now that we do per-partition size tracking in the aggregator we can do much
   more precise overflow checking (see the compressed-message-batch-size-limit
   case in `wouldOverflow` which has changed) which will be more efficient in
   high-volume scenarios
 - since the produceSet encodes immediately, messages which fail to encode are
   now rejected from the aggregator and don't count towards batch size
 - we still have to iterate the messages in the flusher in order to reject those
   which need retrying due to the state machine; for simplicity I add them to a
   second produceSet still, which means all messages get encoded twice; this is
   a definite major performance regression which will go away again in part 3
   of this refactor
Evan Huus 9 years ago
parent
commit
733c74f30d
2 changed files with 80 additions and 70 deletions
  1. 78 68
      async_producer.go
  2. 2 2
      async_producer_test.go

+ 78 - 68
async_producer.go

@@ -508,13 +508,14 @@ func (pp *partitionProducer) updateLeader() error {
 // one per broker, constructs both an aggregator and a flusher
 func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
 	input := make(chan *ProducerMessage)
-	bridge := make(chan []*ProducerMessage)
+	bridge := make(chan *produceSet)
 
 	a := &aggregator{
 		parent: p,
 		broker: broker,
 		input:  input,
 		output: bridge,
+		buffer: newProduceSet(p),
 	}
 	go withRecover(a.run)
 
@@ -535,15 +536,14 @@ type aggregator struct {
 	parent *asyncProducer
 	broker *Broker
 	input  <-chan *ProducerMessage
-	output chan<- []*ProducerMessage
+	output chan<- *produceSet
 
-	buffer      []*ProducerMessage
-	bufferBytes int
-	timer       <-chan time.Time
+	buffer *produceSet
+	timer  <-chan time.Time
 }
 
 func (a *aggregator) run() {
-	var output chan<- []*ProducerMessage
+	var output chan<- *produceSet
 
 	for {
 		select {
@@ -552,17 +552,19 @@ func (a *aggregator) run() {
 				goto shutdown
 			}
 
-			if a.wouldOverflow(msg) {
+			if a.buffer.wouldOverflow(msg) {
 				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
 				a.output <- a.buffer
 				a.reset()
 				output = nil
 			}
 
-			a.buffer = append(a.buffer, msg)
-			a.bufferBytes += msg.byteSize()
+			if err := a.buffer.add(msg); err != nil {
+				a.parent.returnError(msg, err)
+				continue
+			}
 
-			if a.readyToFlush(msg) {
+			if a.buffer.readyToFlush(msg) {
 				output = a.output
 			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
 				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
@@ -576,58 +578,22 @@ func (a *aggregator) run() {
 	}
 
 shutdown:
-	if len(a.buffer) > 0 {
+	if !a.buffer.empty() {
 		a.output <- a.buffer
 	}
 	close(a.output)
 }
 
-func (a *aggregator) wouldOverflow(msg *ProducerMessage) bool {
-	switch {
-	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
-	case a.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
-		return true
-	// Would we overflow the size-limit of a compressed message-batch?
-	case a.parent.conf.Producer.Compression != CompressionNone && a.bufferBytes+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes:
-		return true
-	// Would we overflow simply in number of messages?
-	case a.parent.conf.Producer.Flush.MaxMessages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.MaxMessages:
-		return true
-	default:
-		return false
-	}
-}
-
-func (a *aggregator) readyToFlush(msg *ProducerMessage) bool {
-	switch {
-	// If all three config values are 0, we always flush as-fast-as-possible
-	case a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0:
-		return true
-	// If the messages is a chaser we must flush to maintain the state-machine
-	case msg.flags&chaser == chaser:
-		return true
-	// If we've  passed the message trigger-point
-	case a.parent.conf.Producer.Flush.Messages > 0 && len(a.buffer) >= a.parent.conf.Producer.Flush.Messages:
-		return true
-	// If we've  passed the byte trigger-point
-	case a.parent.conf.Producer.Flush.Bytes > 0 && a.bufferBytes >= a.parent.conf.Producer.Flush.Bytes:
-		return true
-	default:
-		return false
-	}
-}
-
 func (a *aggregator) reset() {
 	a.timer = nil
-	a.buffer = nil
-	a.bufferBytes = 0
+	a.buffer = newProduceSet(a.parent)
 }
 
 // takes a batch at a time from the aggregator and sends to the broker
 type flusher struct {
 	parent *asyncProducer
 	broker *Broker
-	input  <-chan []*ProducerMessage
+	input  <-chan *produceSet
 
 	currentRetries map[string]map[int32]error
 }
@@ -639,11 +605,13 @@ func (f *flusher) run() {
 
 	for batch := range f.input {
 		if closing != nil {
-			f.parent.retryMessages(batch, closing)
+			batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+				f.parent.retryMessages(msgs, closing)
+			})
 			continue
 		}
 
-		set := f.groupAndFilter(batch)
+		set := f.filter(batch)
 		if set.empty() {
 			continue
 		}
@@ -683,30 +651,32 @@ func (f *flusher) run() {
 	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
-func (f *flusher) groupAndFilter(batch []*ProducerMessage) *produceSet {
+func (f *flusher) filter(batch *produceSet) *produceSet {
 	set := newProduceSet(f.parent)
 
-	for _, msg := range batch {
+	batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+		for _, msg := range msgs {
 
-		if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
-			// we're currently retrying this partition so we need to filter out this message
-			f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
+			if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
+				// we're currently retrying this partition so we need to filter out this message
+				f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
 
-			if msg.flags&chaser == chaser {
-				// ...but now we can start processing future messages again
-				Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-					f.broker.ID(), msg.Topic, msg.Partition)
-				delete(f.currentRetries[msg.Topic], msg.Partition)
-			}
+				if msg.flags&chaser == chaser {
+					// ...but now we can start processing future messages again
+					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
+						f.broker.ID(), msg.Topic, msg.Partition)
+					delete(f.currentRetries[msg.Topic], msg.Partition)
+				}
 
-			continue
-		}
+				continue
+			}
 
-		if err := set.add(msg); err != nil {
-			f.parent.returnError(msg, err)
-			continue
+			if err := set.add(msg); err != nil {
+				f.parent.returnError(msg, err)
+				continue
+			}
 		}
-	}
+	})
 
 	return set
 }
@@ -874,6 +844,46 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs
 	}
 }
 
+func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
+	switch {
+	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
+	case ps.bufferBytes+msg.byteSize() >= int(MaxRequestSize-(10*1024)):
+		return true
+	// Would we overflow the size-limit of a compressed message-batch for this partition?
+	case ps.parent.conf.Producer.Compression != CompressionNone &&
+		ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
+		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize() >= ps.parent.conf.Producer.MaxMessageBytes:
+		return true
+	// Would we overflow simply in number of messages?
+	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool {
+	switch {
+	// If we don't have any messages, nothing else matters
+	case ps.empty():
+		return false
+	// If all three config values are 0, we always flush as-fast-as-possible
+	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
+		return true
+	// If the messages is ps chaser we must flush to maintain the state-machine
+	case msg.flags&chaser == chaser:
+		return true
+	// If we've passed the message trigger-point
+	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
+		return true
+	// If we've passed the byte trigger-point
+	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
+		return true
+	default:
+		return false
+	}
+}
+
 func (ps *produceSet) empty() bool {
 	return ps.bufferCount == 0
 }

+ 2 - 2
async_producer_test.go

@@ -320,7 +320,7 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
 	leader.Returns(prodSuccess)
 
 	config := NewConfig()
-	config.Producer.Flush.Messages = 3
+	config.Producer.Flush.Messages = 1
 	config.Producer.Return.Successes = true
 	config.Producer.Partitioner = NewManualPartitioner
 	producer, err := NewAsyncProducer([]string{seedBroker.Addr()}, config)
@@ -330,8 +330,8 @@ func TestAsyncProducerEncoderFailures(t *testing.T) {
 
 	for flush := 0; flush < 3; flush++ {
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(false)}
-		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
 		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(false), Value: flakyEncoder(true)}
+		producer.Input() <- &ProducerMessage{Topic: "my_topic", Key: flakyEncoder(true), Value: flakyEncoder(true)}
 		expectResults(t, producer, 1, 2)
 	}