|
|
@@ -540,7 +540,7 @@ func (bc *brokerConsumer) subscriptionManager() {
|
|
|
var buffer []*partitionConsumer
|
|
|
|
|
|
// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
|
|
|
- // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
|
|
|
+ // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
|
|
|
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
|
|
|
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
|
|
|
// so the main goroutine can block waiting for work if it has none.
|
|
|
@@ -671,8 +671,12 @@ func (bc *brokerConsumer) abort(err error) {
|
|
|
child.trigger <- none{}
|
|
|
}
|
|
|
|
|
|
- for newSubscription := range bc.newSubscriptions {
|
|
|
- for _, child := range newSubscription {
|
|
|
+ for newSubscriptions := range bc.newSubscriptions {
|
|
|
+ if len(newSubscriptions) == 0 {
|
|
|
+ <-bc.wait
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ for _, child := range newSubscriptions {
|
|
|
child.sendError(err)
|
|
|
child.trigger <- none{}
|
|
|
}
|