|
|
@@ -221,6 +221,7 @@ func (p *asyncProducer) topicDispatcher() {
|
|
|
|
|
|
if msg.flags&shutdown != 0 {
|
|
|
shuttingDown = true
|
|
|
+ p.inFlight.Done()
|
|
|
continue
|
|
|
} else if msg.retries == 0 {
|
|
|
p.inFlight.Add(1)
|
|
|
@@ -256,7 +257,7 @@ func (p *asyncProducer) topicDispatcher() {
|
|
|
|
|
|
// one per topic
|
|
|
// partitions messages, then dispatches them by partition
|
|
|
-func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) {
|
|
|
handlers := make(map[int32]chan *ProducerMessage)
|
|
|
partitioner := p.conf.Producer.Partitioner(topic)
|
|
|
breaker := breaker.New(3, 1, 10*time.Second)
|
|
|
@@ -293,7 +294,7 @@ func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMe
|
|
|
// one per partition per topic
|
|
|
// dispatches messages to the appropriate broker
|
|
|
// also responsible for maintaining message order during retries
|
|
|
-func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) {
|
|
|
var leader *Broker
|
|
|
var output chan *ProducerMessage
|
|
|
|
|
|
@@ -413,7 +414,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
|
|
|
// one per broker
|
|
|
// groups messages together into appropriately-sized batches for sending to the broker
|
|
|
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
|
|
|
-func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
|
|
|
var (
|
|
|
timer <-chan time.Time
|
|
|
buffer []*ProducerMessage
|
|
|
@@ -477,7 +478,7 @@ shutdown:
|
|
|
|
|
|
// one per broker
|
|
|
// takes a batch at a time from the messageAggregator and sends to the broker
|
|
|
-func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
+func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
|
|
|
var closing error
|
|
|
currentRetries := make(map[string]map[int32]error)
|
|
|
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
|
|
|
@@ -610,6 +611,7 @@ func (p *asyncProducer) retryHandler() {
|
|
|
|
|
|
func (p *asyncProducer) shutdown() {
|
|
|
Logger.Println("Producer shutting down.")
|
|
|
+ p.inFlight.Add(1)
|
|
|
p.input <- &ProducerMessage{flags: shutdown}
|
|
|
|
|
|
p.inFlight.Wait()
|