Browse Source

Merge pull request #529 from Shopify/fix-consumer-race

Fix consumer race panic on close
Evan Huus 9 years ago
parent
commit
9c6285c339
1 changed files with 45 additions and 37 deletions
  1. 45 37
      consumer.go

+ 45 - 37
consumer.go

@@ -572,10 +572,7 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 
 	// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
 	for newSubscriptions := range bc.newSubscriptions {
-		for _, child := range newSubscriptions {
-			bc.subscriptions[child] = none{}
-			Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
-		}
+		bc.updateSubscriptions(newSubscriptions)
 
 		if len(bc.subscriptions) == 0 {
 			// We're about to be shut down or we're about to receive more subscriptions.
@@ -601,8 +598,12 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 	}
 }
 
-func (bc *brokerConsumer) handleResponses() {
-	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
+func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
+	for _, child := range newSubscriptions {
+		bc.subscriptions[child] = none{}
+		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+	}
+
 	for child := range bc.subscriptions {
 		select {
 		case <-child.dying:
@@ -610,37 +611,44 @@ func (bc *brokerConsumer) handleResponses() {
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			result := child.responseResult
-			child.responseResult = nil
-
-			switch result {
-			case nil:
-				break
-			case errTimedOut:
-				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
-					bc.broker.ID(), child.topic, child.partition)
-				delete(bc.subscriptions, child)
-			case ErrOffsetOutOfRange:
-				// there's no point in retrying this it will just fail the same way again
-				// shut it down and force the user to choose what to do
-				child.sendError(result)
-				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
-				close(child.trigger)
-				delete(bc.subscriptions, child)
-			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
-				// not an error, but does need redispatching
-				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, result)
-				child.trigger <- none{}
-				delete(bc.subscriptions, child)
-			default:
-				// dunno, tell the user and try redispatching
-				child.sendError(result)
-				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, result)
-				child.trigger <- none{}
-				delete(bc.subscriptions, child)
-			}
+			break
+		}
+	}
+}
+
+func (bc *brokerConsumer) handleResponses() {
+	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
+	for child := range bc.subscriptions {
+		result := child.responseResult
+		child.responseResult = nil
+
+		switch result {
+		case nil:
+			break
+		case errTimedOut:
+			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
+				bc.broker.ID(), child.topic, child.partition)
+			delete(bc.subscriptions, child)
+		case ErrOffsetOutOfRange:
+			// there's no point in retrying this it will just fail the same way again
+			// shut it down and force the user to choose what to do
+			child.sendError(result)
+			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
+			close(child.trigger)
+			delete(bc.subscriptions, child)
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+			// not an error, but does need redispatching
+			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+				bc.broker.ID(), child.topic, child.partition, result)
+			child.trigger <- none{}
+			delete(bc.subscriptions, child)
+		default:
+			// dunno, tell the user and try redispatching
+			child.sendError(result)
+			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+				bc.broker.ID(), child.topic, child.partition, result)
+			child.trigger <- none{}
+			delete(bc.subscriptions, child)
 		}
 	}
 }