Browse Source

Merge pull request #1156 from imjustfly/master

try fixing a PartitionConsumer's race condition
Vlad Gorodetsky 7 năm trước cách đây
mục cha
commit
da30382100
1 tập tin đã thay đổi với 9 bổ sung7 xóa
  1. 9 7
      consumer.go

+ 9 - 7
consumer.go

@@ -421,12 +421,6 @@ func (child *partitionConsumer) AsyncClose() {
 func (child *partitionConsumer) Close() error {
 	child.AsyncClose()
 
-	go withRecover(func() {
-		for range child.messages {
-			// drain
-		}
-	})
-
 	var errors ConsumerErrors
 	for err := range child.errors {
 		errors = append(errors, err)
@@ -458,14 +452,22 @@ feederLoop:
 		for i, msg := range msgs {
 		messageSelect:
 			select {
+			case <-child.dying:
+				child.broker.acks.Done()
+				continue feederLoop
 			case child.messages <- msg:
 				firstAttempt = true
 			case <-expiryTicker.C:
 				if !firstAttempt {
 					child.responseResult = errTimedOut
 					child.broker.acks.Done()
+				remainingLoop:
 					for _, msg = range msgs[i:] {
-						child.messages <- msg
+						select {
+						case child.messages <- msg:
+						case <-child.dying:
+							break remainingLoop
+						}
 					}
 					child.broker.input <- child
 					continue feederLoop