Browse Source

Consumer: fix deadlock - if the timer had already expired, don't try to read the channel again.

Nimi Wariboko Jr 9 years ago
parent
commit
f0f5b0ae1a
1 changed files with 4 additions and 1 deletions
  1. 4 1
      consumer.go

+ 4 - 1
consumer.go

@@ -414,21 +414,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
 	expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
+	expireTimedOut := false
 
 feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
 		for i, msg := range msgs {
-			if !expiryTimer.Stop() {
+			if !expiryTimer.Stop() && !expireTimedOut {
 				// expiryTimer was expired; clear out the waiting msg
 				<-expiryTimer.C
 			}
 			expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
+			expireTimedOut = false
 
 			select {
 			case child.messages <- msg:
 			case <-expiryTimer.C:
+				expireTimedOut = true
 				child.responseResult = errTimedOut
 				child.broker.acks.Done()
 				for _, msg = range msgs[i:] {