Browse Source

Fix shutdown even harder

Worked through the happens-before graph a little more formally and found some
(admittedly unlikely) cases where shutdown would fail/panic/hang/whatever. This
version *should* be correct.
Evan Huus 11 years ago
parent
commit
a72b884892
1 changed files with 7 additions and 2 deletions
  1. 7 2
      producer.go

+ 7 - 2
producer.go

@@ -202,7 +202,6 @@ func (p *Producer) Input() chan<- *MessageToSend {
 func (p *Producer) Close() error {
 	go func() {
 		p.input <- &MessageToSend{flags: shutdown}
-		p.retries <- &MessageToSend{flags: shutdown}
 	}()
 
 	var errors ProduceErrors
@@ -242,6 +241,7 @@ func (p *Producer) topicDispatcher() {
 
 		handler := handlers[msg.Topic]
 		if handler == nil {
+			p.retries <- &MessageToSend{flags: ref}
 			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
 			go withRecover(func() { p.partitionDispatcher(msg.Topic, newHandler) })
 			handler = newHandler
@@ -257,6 +257,8 @@ func (p *Producer) topicDispatcher() {
 		close(handler)
 	}
 
+	p.retries <- &MessageToSend{flags: shutdown}
+
 	for msg := range p.input {
 		p.errors <- &ProduceError{Msg: msg, Err: ShuttingDown}
 	}
@@ -280,6 +282,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
 
 		handler := handlers[msg.partition]
 		if handler == nil {
+			p.retries <- &MessageToSend{flags: ref}
 			newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
 			go withRecover(func() { p.leaderDispatcher(msg.Topic, msg.partition, newHandler) })
 			handler = newHandler
@@ -292,6 +295,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
 	for _, handler := range handlers {
 		close(handler)
 	}
+	p.retries <- &MessageToSend{flags: unref}
 }
 
 // one per partition per topic
@@ -371,6 +375,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 	}
 
 	p.unrefBrokerWorker(leader)
+	p.retries <- &MessageToSend{flags: unref}
 }
 
 // one per broker
@@ -436,7 +441,6 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 	var closing error
 	currentRetries := make(map[string]map[int32]error)
 
-	p.retries <- &MessageToSend{flags: ref}
 	for batch := range input {
 		if closing != nil {
 			p.retryMessages(batch, closing)
@@ -698,6 +702,7 @@ func (p *Producer) getBrokerWorker(broker *Broker) chan *MessageToSend {
 	worker := p.brokers[broker]
 
 	if worker == nil {
+		p.retries <- &MessageToSend{flags: ref}
 		worker = &brokerWorker{
 			refs:  1,
 			input: make(chan *MessageToSend),