浏览代码

Consumer: fix possible tight loop

In certain cases the `subscriptionManager` and drain loop in `abort` could start
spinning against each other for a non-trivial amount of time, if e.g. a
partition was suspended waiting for the user to consume its buffer. The tight
loop has always been there, it has just tended to be trivially short in normal
operation so nobody noticed.

This is the same issue that the `wait` channel was introduced to solve for the
`subscriptionConsumer` so solve it the same way.
Evan Huus 9 年之前
父节点
当前提交
d41f123b4a
共有 1 个文件被更改,包括 7 次插入3 次删除
  1. 7 3
      consumer.go

+ 7 - 3
consumer.go

@@ -540,7 +540,7 @@ func (bc *brokerConsumer) subscriptionManager() {
 	var buffer []*partitionConsumer
 	var buffer []*partitionConsumer
 
 
 	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
 	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
-	//  goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
+	// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
 	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
 	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
 	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
 	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
 	// so the main goroutine can block waiting for work if it has none.
 	// so the main goroutine can block waiting for work if it has none.
@@ -671,8 +671,12 @@ func (bc *brokerConsumer) abort(err error) {
 		child.trigger <- none{}
 		child.trigger <- none{}
 	}
 	}
 
 
-	for newSubscription := range bc.newSubscriptions {
-		for _, child := range newSubscription {
+	for newSubscriptions := range bc.newSubscriptions {
+		if len(newSubscriptions) == 0 {
+			<-bc.wait
+			continue
+		}
+		for _, child := range newSubscriptions {
 			child.sendError(err)
 			child.sendError(err)
 			child.trigger <- none{}
 			child.trigger <- none{}
 		}
 		}