فهرست منبع

Merge pull request #450 from Shopify/producer_cleanup

Ensure we always have called Add() on the inflight counter before we Wait() for it
Evan Huus 10 سال پیش
والد
کامیت
6468e16972
1فایلهای تغییر یافته به همراه6 افزوده شده و 4 حذف شده
  1. 6 4
      async_producer.go

+ 6 - 4
async_producer.go

@@ -221,6 +221,7 @@ func (p *asyncProducer) topicDispatcher() {
 
 
 		if msg.flags&shutdown != 0 {
 		if msg.flags&shutdown != 0 {
 			shuttingDown = true
 			shuttingDown = true
+			p.inFlight.Done()
 			continue
 			continue
 		} else if msg.retries == 0 {
 		} else if msg.retries == 0 {
 			p.inFlight.Add(1)
 			p.inFlight.Add(1)
@@ -256,7 +257,7 @@ func (p *asyncProducer) topicDispatcher() {
 
 
 // one per topic
 // one per topic
 // partitions messages, then dispatches them by partition
 // partitions messages, then dispatches them by partition
-func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
+func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) {
 	handlers := make(map[int32]chan *ProducerMessage)
 	handlers := make(map[int32]chan *ProducerMessage)
 	partitioner := p.conf.Producer.Partitioner(topic)
 	partitioner := p.conf.Producer.Partitioner(topic)
 	breaker := breaker.New(3, 1, 10*time.Second)
 	breaker := breaker.New(3, 1, 10*time.Second)
@@ -293,7 +294,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
 // one per partition per topic
 // one per partition per topic
 // dispatches messages to the appropriate broker
 // dispatches messages to the appropriate broker
 // also responsible for maintaining message order during retries
 // also responsible for maintaining message order during retries
-func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
+func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) {
 	var leader *Broker
 	var leader *Broker
 	var output chan *ProducerMessage
 	var output chan *ProducerMessage
 
 
@@ -413,7 +414,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 // one per broker
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
+func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
 	var (
 	var (
 		timer            <-chan time.Time
 		timer            <-chan time.Time
 		buffer           []*ProducerMessage
 		buffer           []*ProducerMessage
@@ -477,7 +478,7 @@ shutdown:
 
 
 // one per broker
 // one per broker
 // takes a batch at a time from the messageAggregator and sends to the broker
 // takes a batch at a time from the messageAggregator and sends to the broker
-func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
+func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
 	var closing error
 	var closing error
 	currentRetries := make(map[string]map[int32]error)
 	currentRetries := make(map[string]map[int32]error)
 	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
 	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
@@ -610,6 +611,7 @@ func (p *asyncProducer) retryHandler() {
 
 
 func (p *asyncProducer) shutdown() {
 func (p *asyncProducer) shutdown() {
 	Logger.Println("Producer shutting down.")
 	Logger.Println("Producer shutting down.")
+	p.inFlight.Add(1)
 	p.input <- &ProducerMessage{flags: shutdown}
 	p.input <- &ProducerMessage{flags: shutdown}
 
 
 	p.inFlight.Wait()
 	p.inFlight.Wait()