|
|
@@ -95,7 +95,7 @@ func (config *ProducerConfig) Validate() error {
|
|
|
// Producer publishes Kafka messages. It routes messages to the correct broker
|
|
|
// for the provided topic-partition, refreshing metadata as appropriate, and
|
|
|
// parses responses for errors. You must read from the Errors() channel or the
|
|
|
-// producer will deadlock. You must call Close() on a producer to avoid
|
|
|
+// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
|
|
|
// leaks: it will not be garbage-collected automatically when it passes out of
|
|
|
// scope (this is in addition to calling Close on the underlying client, which
|
|
|
// is still necessary).
|
|
|
@@ -228,9 +228,7 @@ func (p *Producer) Input() chan<- *MessageToSend {
|
|
|
// it may otherwise leak memory. You must call this before calling Close on the
|
|
|
// underlying client.
|
|
|
func (p *Producer) Close() error {
|
|
|
- go withRecover(func() {
|
|
|
- p.input <- &MessageToSend{flags: shutdown}
|
|
|
- })
|
|
|
+ p.AsyncClose()
|
|
|
|
|
|
if p.config.AckSuccesses {
|
|
|
go withRecover(func() {
|
|
|
@@ -244,14 +242,22 @@ func (p *Producer) Close() error {
|
|
|
errors = append(errors, event)
|
|
|
}
|
|
|
|
|
|
- close(p.successes)
|
|
|
-
|
|
|
if len(errors) > 0 {
|
|
|
return errors
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
|
|
|
+// buffered. The shutdown has completed when both the Errors and Successes channels
|
|
|
+// have been closed. When calling AsyncClose, you *must* continue to read from those
|
|
|
+// channels in order to drain the results of any messages in flight.
|
|
|
+func (p *Producer) AsyncClose() {
|
|
|
+ go withRecover(func() {
|
|
|
+ p.input <- &MessageToSend{flags: shutdown}
|
|
|
+ })
|
|
|
+}
|
|
|
+
|
|
|
///////////////////////////////////////////
|
|
|
// In normal processing, a message flows through the following functions from top to bottom,
|
|
|
// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
|
|
|
@@ -306,6 +312,7 @@ func (p *Producer) topicDispatcher() {
|
|
|
}
|
|
|
|
|
|
close(p.errors)
|
|
|
+ close(p.successes)
|
|
|
}
|
|
|
|
|
|
// one per topic
|