瀏覽代碼

Code review fixes.

Willem van Bergen 10 年之前
父節點
當前提交
ed24dc91a8
共有 2 個文件被更改,包括 29 次插入11 次删除
  1. 26 10
      consumer.go
  2. 3 1
      consumer_test.go

+ 26 - 10
consumer.go

@@ -278,6 +278,7 @@ func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
 // PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
 // on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
 // scope (this is in addition to calling Close on the underlying consumer's client, which is still necessary).
+// You have to read from both the Messages and Errors channels to prevent the consumer from locking eventually.
 type PartitionConsumer struct {
 	consumer  *Consumer
 	config    PartitionConsumerConfig
@@ -380,23 +381,38 @@ func (child *PartitionConsumer) Messages() <-chan *ConsumerMessage {
 }
 
 // Errors returns the read channel for any errors that occurred while consuming the partition.
+// You have to read this channel to prevent the consumer from deadlock. Under no circumstances,
+// the partition consumer will shut down by itself. It will just wait until it is able to continue
+// consuming messages. If you want to shut down your consumer, you will have trigger it yourself
+// by consuming this channel and calling Close or AsyncClose when appropriate.
 func (child *PartitionConsumer) Errors() <-chan *ConsumerError {
 	return child.errors
 }
 
-// Close stops the PartitionConsumer from fetching messages. It is required to call this function before a
-// consumer object passes out of scope, as it will otherwise leak memory. You must call this before
-// calling Close on the underlying client.
-func (child *PartitionConsumer) Close() error {
+// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately,
+// after which you should wait until the 'messages' and 'errors' channel are drained.
+// It is required to call this function, or Close before a consumer object passes out of scope,
+// as it will otherwise leak memory.  You must call this before calling Close on the underlying
+// client.
+func (child *PartitionConsumer) AsyncClose() {
 	// this triggers whatever worker owns this child to abandon it and close its trigger channel, which causes
-	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'events' channel
-	// (alternatively, if the child is already at the dispatcher for some reason, that will also just
-	// close itself)
+	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
+	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
+	// also just close itself)
 	close(child.dying)
+}
 
-	for _ = range child.messages {
-		// drain
-	}
+// Close stops the PartitionConsumer from fetching messages. It is required to call this function,
+// or AsyncCose before a consumer object passes out of scope, as it will otherwise leak memory. You must
+// call this before calling Close on the underlying client.
+func (child *PartitionConsumer) Close() error {
+	child.AsyncClose()
+
+	go withRecover(func() {
+		for _ = range child.messages {
+			// drain
+		}
+	})
 
 	var errors ConsumerErrors
 	for err := range child.errors {

+ 3 - 1
consumer_test.go

@@ -366,6 +366,7 @@ func ExampleConsumerWithGoroutines() {
 
 	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for message := range consumer.Messages() {
 			fmt.Printf("Consumed message with offset %d", message.Offset)
 			msgCount++
@@ -374,8 +375,9 @@ func ExampleConsumerWithGoroutines() {
 
 	wg.Add(1)
 	go func() {
+		defer wg.Done()
 		for err := range consumer.Errors() {
-			panic(err)
+			fmt.Println(err)
 		}
 	}()