Browse Source

dont't drain PartitionConsumer's messages channel when closing

justfly 7 years ago
parent
commit
58a9d32ae0
2 changed files with 20 additions and 6 deletions
  1. 0 6
      consumer.go
  2. 20 0
      consumer_test.go

+ 0 - 6
consumer.go

@@ -421,12 +421,6 @@ func (child *partitionConsumer) AsyncClose() {
 func (child *partitionConsumer) Close() error {
 	child.AsyncClose()
 
-	go withRecover(func() {
-		for range child.messages {
-			// drain
-		}
-	})
-
 	var errors ConsumerErrors
 	for err := range child.errors {
 		errors = append(errors, err)

+ 20 - 0
consumer_test.go

@@ -807,8 +807,28 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	assertMessageOffset(t, <-c0.Messages(), 1001)
 	assertMessageOffset(t, <-c0.Messages(), 1002)
 
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	go func() {
+		defer wg.Done()
+
+		for range c0.Messages() {
+			// drain
+		}
+	}()
+	go func() {
+		defer wg.Done()
+
+		for range c1.Messages() {
+			// drain
+		}
+	}()
+
 	safeClose(t, c1)
 	safeClose(t, c0)
+	wg.Wait()
+
 	safeClose(t, master)
 	broker0.Close()
 }