
consumer: fix another race pointed out by Maxim

Take the previous refactor to its logical conclusion by handling *all* the
error logic in the brokerConsumer, not the responseFeeder. This fixes the race
to close the dying channel (since the brokerConsumer can just close the trigger
instead as it has ownership).

At the same time, refactor `updateSubscriptionCache` into `handleResponses`, and
inline the "new subscriptions" bit into the main loop; otherwise we end up
processing the previous iteration's results at the very beginning of the next
iteration, rather than at the very end of the current one.
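
Not part of the commit, but a rough sketch of the channel-ownership rule the message alludes to: calling close() on an already-closed channel panics, so two goroutines racing to close the same channel is a crash. The fix is to funnel every shutdown request to the single goroutine that owns the channel, which is then the only place close() is called. The names below (shutdown, requestStop) are hypothetical, not Sarama code:

package main

import "fmt"

func main() {
	shutdown := make(chan struct{})       // owned (and closed) only by the owner goroutine
	requestStop := make(chan struct{}, 1) // anyone may ask for a stop via this channel

	go func() { // the owner
		<-requestStop
		close(shutdown) // the only close() call in the program
	}()

	// Several goroutines can request a stop concurrently without racing on close.
	for i := 0; i < 3; i++ {
		go func() {
			select {
			case requestStop <- struct{}{}:
			default: // someone already asked; don't block
			}
		}()
	}

	<-shutdown
	fmt.Println("shut down exactly once")
}

In the diff below, the brokerConsumer plays the owner role: it reacts to `<-child.dying` (or to an error it recorded itself) and is the only side that calls `close(child.trigger)`.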
Evan Huus, 10 years ago
Parent
Commit: 8a91a50ecd
1 changed file with 31 additions and 30 deletions
consumer.go  +31 -30

@@ -271,7 +271,7 @@ type partitionConsumer struct {
 	feeder   chan *FetchResponse
 
 	trigger, dying chan none
-	dispatchReason error
+	responseResult error
 
 	fetchSize           int32
 	offset              int64
@@ -402,24 +402,7 @@ func (child *partitionConsumer) HighWaterMarkOffset() int64 {
 
 func (child *partitionConsumer) responseFeeder() {
 	for response := range child.feeder {
-		switch err := child.handleResponse(response); err {
-		case nil:
-			break
-		case ErrOffsetOutOfRange:
-			// there's no point in retrying this it will just fail the same way again
-			// so shut it down and force the user to choose what to do
-			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, err)
-			child.sendError(err)
-			child.AsyncClose()
-		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
-			// these three are not fatal errors, but do require redispatching
-			child.dispatchReason = err
-		default:
-			// dunno, tell the user and try redispatching
-			child.sendError(err)
-			child.dispatchReason = err
-		}
-
+		child.responseResult = child.handleResponse(response)
 		child.broker.acks.Done()
 	}
 
@@ -569,7 +552,10 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 
 	// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
 	for newSubscriptions := range bc.newSubscriptions {
-		bc.updateSubscriptionCache(newSubscriptions)
+		for _, child := range newSubscriptions {
+			bc.subscriptions[child] = none{}
+			Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+		}
 
 		if len(bc.subscriptions) == 0 {
 			// We're about to be shut down or we're about to receive more subscriptions.
@@ -591,16 +577,12 @@ func (bc *brokerConsumer) subscriptionConsumer() {
 			child.feeder <- response
 		}
 		bc.acks.Wait()
+		bc.handleResponses()
 	}
 }
 
-func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
-	// take new subscriptions, and abandon subscriptions that have been closed
-	for _, child := range newSubscriptions {
-		bc.subscriptions[child] = none{}
-		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
-	}
-
+func (bc *brokerConsumer) handleResponses() {
+	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
 	for child := range bc.subscriptions {
 		select {
 		case <-child.dying:
@@ -608,13 +590,32 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
 			close(child.trigger)
 			delete(bc.subscriptions, child)
 		default:
-			if child.dispatchReason != nil {
+			switch child.responseResult {
+			case nil:
+				break
+			case ErrOffsetOutOfRange:
+				// there's no point in retrying this it will just fail the same way again
+				// shut it down and force the user to choose what to do
+				child.sendError(child.responseResult)
+				Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, child.responseResult)
+				close(child.trigger)
+				delete(bc.subscriptions, child)
+			case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
+				// not an error, but does need redispatching
+				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+					bc.broker.ID(), child.topic, child.partition, child.responseResult)
+				child.trigger <- none{}
+				delete(bc.subscriptions, child)
+			default:
+				// dunno, tell the user and try redispatching
+				child.sendError(child.responseResult)
 				Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
-					bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
-				child.dispatchReason = nil
+					bc.broker.ID(), child.topic, child.partition, child.responseResult)
 				child.trigger <- none{}
 				delete(bc.subscriptions, child)
 			}
+
+			child.responseResult = nil
 		}
 	}
 }
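
A side note on why handleResponses can read child.responseResult without extra locking: the feeder goroutine writes the field before calling acks.Done(), and the broker loop calls acks.Wait() before reading it, so the WaitGroup supplies the necessary happens-before edge. A minimal, self-contained sketch of that handshake (the worker type and names are hypothetical, not Sarama code):

package main

import (
	"fmt"
	"sync"
)

// worker stands in for a partitionConsumer; result stands in for responseResult.
type worker struct{ result error }

func main() {
	var acks sync.WaitGroup
	workers := []*worker{{}, {}, {}}

	for i, w := range workers {
		acks.Add(1)
		go func(i int, w *worker) {
			defer acks.Done()
			// write the per-worker result *before* Done(), like responseFeeder does
			w.result = fmt.Errorf("status from worker %d", i)
		}(i, w)
	}

	acks.Wait() // every Done() happens-before Wait() returns...
	for _, w := range workers {
		fmt.Println(w.result) // ...so reading result here needs no mutex
		w.result = nil        // reset for the next round, like responseResult
	}
}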