Przeglądaj źródła

Merge pull request #734 from ChannelMeter/fix/timer-reset-logic

Deadlock: Don't read from the timer channel if it already expired
Evan Huus 9 lat temu
rodzic
commit
94895118af
1 zmienionych plików z 4 dodań i 1 usunięć
  1. 4 1
      consumer.go

+ 4 - 1
consumer.go

@@ -414,21 +414,24 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 func (child *partitionConsumer) responseFeeder() {
 	var msgs []*ConsumerMessage
 	expiryTimer := time.NewTimer(child.conf.Consumer.MaxProcessingTime)
+	expireTimedOut := false
 
 feederLoop:
 	for response := range child.feeder {
 		msgs, child.responseResult = child.parseResponse(response)
 
 		for i, msg := range msgs {
-			if !expiryTimer.Stop() {
+			if !expiryTimer.Stop() && !expireTimedOut {
 				// expiryTimer was expired; clear out the waiting msg
 				<-expiryTimer.C
 			}
 			expiryTimer.Reset(child.conf.Consumer.MaxProcessingTime)
+			expireTimedOut = false
 
 			select {
 			case child.messages <- msg:
 			case <-expiryTimer.C:
+				expireTimedOut = true
 				child.responseResult = errTimedOut
 				child.broker.acks.Done()
 				for _, msg = range msgs[i:] {