Browse Source

fix erronous errTimedOut ("abandoned subscription...because consuming was taking too long")

The expiryTimer continues to run after msg is delivered to
child.messages. If <-child.feeder takes > MaxProcessingTime (which
depends on how fast the broker runs), the expiryTimer can expire
before we Reset() it. If this happens there is a message waiting
in expiryTimer.C which, if we don't clear it out, makes us think
the consumer stalled.

This is reproduced with a high traffic partition on a busy
broker, and measuring the time each line of the loop takes. The
>100ms stall is the <-child.feeder inside 'range child.feeder'.
Nicolas S. Dade 9 years ago
parent
commit
906ed729ae
1 changed files with 4 additions and 1 deletions
  1. 4 1
      consumer.go

+ 4 - 1
consumer.go

@@ -420,7 +420,10 @@ feederLoop:
 		msgs, child.responseResult = child.parseResponse(response)
 		msgs, child.responseResult = child.parseResponse(response)
 
 
 		for i, msg := range msgs {
 		for i, msg := range msgs {
-			expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
+			if !expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime) {
+				// expiryTimer was expired; clear out the waiting msg
+				<-expiryTimer.C
+			}
 
 
 			select {
 			select {
 			case child.messages <- msg:
 			case child.messages <- msg: