Browse Source

Merge pull request #258 from Shopify/async-close

Producer: implement AsyncClose
Evan Huus 10 years ago
parent
commit
c32c9c5cbb
1 changed files with 13 additions and 6 deletions
  1. 13 6
      producer.go

+ 13 - 6
producer.go

@@ -95,7 +95,7 @@ func (config *ProducerConfig) Validate() error {
 // Producer publishes Kafka messages. It routes messages to the correct broker
 // for the provided topic-partition, refreshing metadata as appropriate, and
 // parses responses for errors. You must read from the Errors() channel or the
-// producer will deadlock. You must call Close() on a producer to avoid
+// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
 // leaks: it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
@@ -228,9 +228,7 @@ func (p *Producer) Input() chan<- *MessageToSend {
 // it may otherwise leak memory. You must call this before calling Close on the
 // underlying client.
 func (p *Producer) Close() error {
-	go withRecover(func() {
-		p.input <- &MessageToSend{flags: shutdown}
-	})
+	p.AsyncClose()
 
 	if p.config.AckSuccesses {
 		go withRecover(func() {
@@ -244,14 +242,22 @@ func (p *Producer) Close() error {
 		errors = append(errors, event)
 	}
 
-	close(p.successes)
-
 	if len(errors) > 0 {
 		return errors
 	}
 	return nil
 }
 
+// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
+// buffered. The shutdown has completed when both the Errors and Successes channels
+// have been closed. When calling AsyncClose, you *must* continue to read from those
+// channels in order to drain the results of any messages in flight.
+func (p *Producer) AsyncClose() {
+	go withRecover(func() {
+		p.input <- &MessageToSend{flags: shutdown}
+	})
+}
+
 ///////////////////////////////////////////
 // In normal processing, a message flows through the following functions from top to bottom,
 // starting at topicDispatcher (which reads from Producer.input) and ending in flusher
@@ -306,6 +312,7 @@ func (p *Producer) topicDispatcher() {
 	}
 
 	close(p.errors)
+	close(p.successes)
 }
 
 // one per topic