浏览代码

Structure the aggregator and flusher goroutines

Another chunk of #300. Once this lands the final step will be to break out the
existing massive `run()` methods into useful helper functions now that
parameter-passing isn't such a pain.

In theory no functional changes; just slightly nicer code and stricter typing
around channel direction.
Evan Huus 10 年之前
父节点
当前提交
9a9cdfe8b6
共有 1 个文件被更改,包括 79 次插入48 次删除
  1. 79 48
      async_producer.go

+ 79 - 48
async_producer.go

@@ -56,8 +56,8 @@ type asyncProducer struct {
 	input, successes, retries chan *ProducerMessage
 	inFlight                  sync.WaitGroup
 
-	brokers    map[*Broker]chan *ProducerMessage
-	brokerRefs map[chan *ProducerMessage]int
+	brokers    map[*Broker]chan<- *ProducerMessage
+	brokerRefs map[chan<- *ProducerMessage]int
 	brokerLock sync.Mutex
 }
 
@@ -91,8 +91,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 		input:      make(chan *ProducerMessage),
 		successes:  make(chan *ProducerMessage),
 		retries:    make(chan *ProducerMessage),
-		brokers:    make(map[*Broker]chan *ProducerMessage),
-		brokerRefs: make(map[chan *ProducerMessage]int),
+		brokers:    make(map[*Broker]chan<- *ProducerMessage),
+		brokerRefs: make(map[chan<- *ProducerMessage]int),
 	}
 
 	// launch our singleton dispatchers
@@ -347,7 +347,7 @@ type partitionProducer struct {
 
 	leader  *Broker
 	breaker *breaker.Breaker
-	output  chan *ProducerMessage
+	output  chan<- *ProducerMessage
 
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
@@ -491,37 +491,63 @@ func (pp *partitionProducer) updateLeader() error {
 	})
 }
 
+func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
+	input := make(chan *ProducerMessage)
+	bridge := make(chan []*ProducerMessage)
+
+	a := &aggregator{
+		parent: p,
+		broker: broker,
+		input:  input,
+		output: bridge,
+	}
+	go withRecover(a.run)
+
+	f := &flusher{
+		parent: p,
+		broker: broker,
+		input:  bridge,
+	}
+	go withRecover(f.run)
+
+	return input
+}
+
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
+type aggregator struct {
+	parent *asyncProducer
+	broker *Broker
+	input  <-chan *ProducerMessage
+	output chan<- []*ProducerMessage
+}
+
+func (a *aggregator) run() {
 	var (
 		timer            <-chan time.Time
 		buffer           []*ProducerMessage
-		flushTriggered   chan []*ProducerMessage
+		flushTriggered   chan<- []*ProducerMessage
 		bytesAccumulated int
 		defaultFlush     bool
 	)
 
-	if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
+	if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
 		defaultFlush = true
 	}
 
-	output := make(chan []*ProducerMessage)
-	go withRecover(func() { p.flusher(broker, output) })
-
 	for {
 		select {
-		case msg := <-input:
+		case msg := <-a.input:
 			if msg == nil {
 				goto shutdown
 			}
 
 			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-				(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
-				(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
-				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
-				output <- buffer
+				(a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
+				(a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
+				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
+				a.output <- buffer
 				timer = nil
 				buffer = nil
 				flushTriggered = nil
@@ -533,14 +559,14 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer
 
 			if defaultFlush ||
 				msg.flags&chaser == chaser ||
-				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
-				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
-				flushTriggered = output
-			} else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
-				timer = time.After(p.conf.Producer.Flush.Frequency)
+				(a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
+				(a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
+				flushTriggered = a.output
+			} else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
+				timer = time.After(a.parent.conf.Producer.Flush.Frequency)
 			}
 		case <-timer:
-			flushTriggered = output
+			flushTriggered = a.output
 		case flushTriggered <- buffer:
 			timer = nil
 			buffer = nil
@@ -551,21 +577,27 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer
 
 shutdown:
 	if len(buffer) > 0 {
-		output <- buffer
+		a.output <- buffer
 	}
-	close(output)
+	close(a.output)
 }
 
 // one per broker
 // takes a batch at a time from the messageAggregator and sends to the broker
-func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
+type flusher struct {
+	parent *asyncProducer
+	broker *Broker
+	input  <-chan []*ProducerMessage
+}
+
+func (f *flusher) run() {
 	var closing error
 	currentRetries := make(map[string]map[int32]error)
-	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
+	Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
 
-	for batch := range input {
+	for batch := range f.input {
 		if closing != nil {
-			p.retryMessages(batch, closing)
+			f.parent.retryMessages(batch, closing)
 			continue
 		}
 
@@ -576,10 +608,10 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 				if msg.flags&chaser == chaser {
 					// we can start processing this topic/partition again
 					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-						broker.ID(), msg.Topic, msg.Partition)
+						f.broker.ID(), msg.Topic, msg.Partition)
 					currentRetries[msg.Topic][msg.Partition] = nil
 				}
-				p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
+				f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
 				batch[i] = nil // to prevent it being returned/retried twice
 				continue
 			}
@@ -593,31 +625,31 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 			partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
 		}
 
-		request := p.buildRequest(msgSets)
+		request := f.parent.buildRequest(msgSets)
 		if request == nil {
 			continue
 		}
 
-		response, err := broker.Produce(request)
+		response, err := f.broker.Produce(request)
 
 		switch err.(type) {
 		case nil:
 			break
 		case PacketEncodingError:
-			p.returnErrors(batch, err)
+			f.parent.returnErrors(batch, err)
 			continue
 		default:
-			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
-			p.abandonBrokerConnection(broker)
-			_ = broker.Close()
+			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
+			f.parent.abandonBrokerConnection(f.broker)
+			_ = f.broker.Close()
 			closing = err
-			p.retryMessages(batch, err)
+			f.parent.retryMessages(batch, err)
 			continue
 		}
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			p.returnSuccesses(batch)
+			f.parent.returnSuccesses(batch)
 			continue
 		}
 
@@ -628,7 +660,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 
 				block := response.GetBlock(topic, partition)
 				if block == nil {
-					p.returnErrors(msgs, ErrIncompleteResponse)
+					f.parent.returnErrors(msgs, ErrIncompleteResponse)
 					continue
 				}
 
@@ -638,23 +670,23 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 					for i := range msgs {
 						msgs[i].Offset = block.Offset + int64(i)
 					}
-					p.returnSuccesses(msgs)
+					f.parent.returnSuccesses(msgs)
 				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 					ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-						broker.ID(), topic, partition, block.Err)
+						f.broker.ID(), topic, partition, block.Err)
 					if currentRetries[topic] == nil {
 						currentRetries[topic] = make(map[int32]error)
 					}
 					currentRetries[topic][partition] = block.Err
-					p.retryMessages(msgs, block.Err)
+					f.parent.retryMessages(msgs, block.Err)
 				default:
-					p.returnErrors(msgs, block.Err)
+					f.parent.returnErrors(msgs, block.Err)
 				}
 			}
 		}
 	}
-	Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
+	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
 // singleton
@@ -814,17 +846,16 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
 	}
 }
 
-func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
+func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()
 
 	bp := p.brokers[broker]
 
 	if bp == nil {
-		bp = make(chan *ProducerMessage)
+		bp = p.newBrokerProducer(broker)
 		p.brokers[broker] = bp
 		p.brokerRefs[bp] = 0
-		go withRecover(func() { p.messageAggregator(broker, bp) })
 	}
 
 	p.brokerRefs[bp]++
@@ -832,7 +863,7 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
 	return bp
 }
 
-func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
 	p.brokerLock.Lock()
 	defer p.brokerLock.Unlock()