Browse Source

optimized compatibility

justfly 7 years ago
parent
commit
dcc4684462
2 changed files with 9 additions and 21 deletions
  1. 9 1
      consumer.go
  2. 0 20
      consumer_test.go

+ 9 - 1
consumer.go

@@ -448,14 +448,22 @@ feederLoop:
 		for i, msg := range msgs {
 		messageSelect:
 			select {
+			case <-child.dying:
+				child.broker.acks.Done()
+				continue feederLoop
 			case child.messages <- msg:
 				firstAttempt = true
 			case <-expiryTicker.C:
 				if !firstAttempt {
 					child.responseResult = errTimedOut
 					child.broker.acks.Done()
+				remainingLoop:
 					for _, msg = range msgs[i:] {
-						child.messages <- msg
+						select {
+						case child.messages <- msg:
+						case <-child.dying:
+							break remainingLoop
+						}
 					}
 					child.broker.input <- child
 					continue feederLoop

+ 0 - 20
consumer_test.go

@@ -807,28 +807,8 @@ func TestConsumerInterleavedClose(t *testing.T) {
 	assertMessageOffset(t, <-c0.Messages(), 1001)
 	assertMessageOffset(t, <-c0.Messages(), 1002)
 
-	var wg sync.WaitGroup
-	wg.Add(2)
-
-	go func() {
-		defer wg.Done()
-
-		for range c0.Messages() {
-			// drain
-		}
-	}()
-	go func() {
-		defer wg.Done()
-
-		for range c1.Messages() {
-			// drain
-		}
-	}()
-
 	safeClose(t, c1)
 	safeClose(t, c0)
-	wg.Wait()
-
 	safeClose(t, master)
 	broker0.Close()
 }