|
@@ -231,16 +231,16 @@ func (c *Consumer) refWorker(broker *Broker) *consumerWorker {
|
|
|
worker := c.workers[broker]
|
|
worker := c.workers[broker]
|
|
|
if worker == nil {
|
|
if worker == nil {
|
|
|
worker = &consumerWorker{
|
|
worker = &consumerWorker{
|
|
|
- consumer: c,
|
|
|
|
|
- broker: broker,
|
|
|
|
|
- input: make(chan *PartitionConsumer),
|
|
|
|
|
- newWork: make(chan []*PartitionConsumer),
|
|
|
|
|
- wait: make(chan none),
|
|
|
|
|
- work: make(map[*PartitionConsumer]none),
|
|
|
|
|
- refs: 1,
|
|
|
|
|
|
|
+ consumer: c,
|
|
|
|
|
+ broker: broker,
|
|
|
|
|
+ input: make(chan *PartitionConsumer),
|
|
|
|
|
+ newSubscriptions: make(chan []*PartitionConsumer),
|
|
|
|
|
+ wait: make(chan none),
|
|
|
|
|
+ subscriptions: make(map[*PartitionConsumer]none),
|
|
|
|
|
+ refs: 1,
|
|
|
}
|
|
}
|
|
|
- go withRecover(worker.bridge)
|
|
|
|
|
- go withRecover(worker.doWork)
|
|
|
|
|
|
|
+ go withRecover(worker.subscriptionManager)
|
|
|
|
|
+ go withRecover(worker.subscriptionConsumer)
|
|
|
c.workers[broker] = worker
|
|
c.workers[broker] = worker
|
|
|
} else {
|
|
} else {
|
|
|
worker.refs++
|
|
worker.refs++
|
|
@@ -392,22 +392,22 @@ func (child *PartitionConsumer) Close() error {
|
|
|
// consumerWorker
|
|
// consumerWorker
|
|
|
|
|
|
|
|
type consumerWorker struct {
|
|
type consumerWorker struct {
|
|
|
- consumer *Consumer
|
|
|
|
|
- broker *Broker
|
|
|
|
|
- input chan *PartitionConsumer
|
|
|
|
|
- newWork chan []*PartitionConsumer
|
|
|
|
|
- wait chan none
|
|
|
|
|
- work map[*PartitionConsumer]none
|
|
|
|
|
- refs int
|
|
|
|
|
|
|
+ consumer *Consumer
|
|
|
|
|
+ broker *Broker
|
|
|
|
|
+ input chan *PartitionConsumer
|
|
|
|
|
+ newSubscriptions chan []*PartitionConsumer
|
|
|
|
|
+ wait chan none
|
|
|
|
|
+ subscriptions map[*PartitionConsumer]none
|
|
|
|
|
+ refs int
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (w *consumerWorker) bridge() {
|
|
|
|
|
|
|
+func (w *consumerWorker) subscriptionManager() {
|
|
|
var buffer []*PartitionConsumer
|
|
var buffer []*PartitionConsumer
|
|
|
|
|
|
|
|
- // The bridge constantly accepts new work on `input` (even when the main worker goroutine
|
|
|
|
|
- // is in the middle of a network request) and batches it up. The main worker goroutine picks
|
|
|
|
|
- // up a batch of new work between every network request by reading from `newWork`, so we give
|
|
|
|
|
- // it nil if no new work is available. We also write to `wait` only when new work is available,
|
|
|
|
|
|
|
+ // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
|
|
|
|
|
+ // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
|
|
|
|
|
+ // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
|
|
|
|
|
+ // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
|
|
|
// so the main goroutine can block waiting for work if it has none.
|
|
// so the main goroutine can block waiting for work if it has none.
|
|
|
for {
|
|
for {
|
|
|
if len(buffer) > 0 {
|
|
if len(buffer) > 0 {
|
|
@@ -417,7 +417,7 @@ func (w *consumerWorker) bridge() {
|
|
|
goto done
|
|
goto done
|
|
|
}
|
|
}
|
|
|
buffer = append(buffer, event)
|
|
buffer = append(buffer, event)
|
|
|
- case w.newWork <- buffer:
|
|
|
|
|
|
|
+ case w.newSubscriptions <- buffer:
|
|
|
buffer = nil
|
|
buffer = nil
|
|
|
case w.wait <- none{}:
|
|
case w.wait <- none{}:
|
|
|
}
|
|
}
|
|
@@ -428,7 +428,7 @@ func (w *consumerWorker) bridge() {
|
|
|
goto done
|
|
goto done
|
|
|
}
|
|
}
|
|
|
buffer = append(buffer, event)
|
|
buffer = append(buffer, event)
|
|
|
- case w.newWork <- nil:
|
|
|
|
|
|
|
+ case w.newSubscriptions <- nil:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -436,20 +436,20 @@ func (w *consumerWorker) bridge() {
|
|
|
done:
|
|
done:
|
|
|
close(w.wait)
|
|
close(w.wait)
|
|
|
if len(buffer) > 0 {
|
|
if len(buffer) > 0 {
|
|
|
- w.newWork <- buffer
|
|
|
|
|
|
|
+ w.newSubscriptions <- buffer
|
|
|
}
|
|
}
|
|
|
- close(w.newWork)
|
|
|
|
|
|
|
+ close(w.newSubscriptions)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (w *consumerWorker) doWork() {
|
|
|
|
|
|
|
+func (w *consumerWorker) subscriptionConsumer() {
|
|
|
<-w.wait // wait for our first piece of work
|
|
<-w.wait // wait for our first piece of work
|
|
|
|
|
|
|
|
- // the bridge ensures we will get nil right away if no new work is available
|
|
|
|
|
- for newWork := range w.newWork {
|
|
|
|
|
- w.updateWorkCache(newWork)
|
|
|
|
|
|
|
+ // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
|
|
|
|
|
+ for newSubscriptions := range w.newSubscriptions {
|
|
|
|
|
+ w.updateSubscriptionCache(newSubscriptions)
|
|
|
|
|
|
|
|
- if len(w.work) == 0 {
|
|
|
|
|
- // We're about to be shut down or we're about to receive more work.
|
|
|
|
|
|
|
+ if len(w.subscriptions) == 0 {
|
|
|
|
|
+ // We're about to be shut down or we're about to receive more subscriptions.
|
|
|
// Either way, the signal just hasn't propagated to our goroutine yet.
|
|
// Either way, the signal just hasn't propagated to our goroutine yet.
|
|
|
<-w.wait
|
|
<-w.wait
|
|
|
continue
|
|
continue
|
|
@@ -463,12 +463,12 @@ func (w *consumerWorker) doWork() {
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- for child, _ := range w.work {
|
|
|
|
|
|
|
+ for child, _ := range w.subscriptions {
|
|
|
block := response.GetBlock(child.topic, child.partition)
|
|
block := response.GetBlock(child.topic, child.partition)
|
|
|
if block == nil {
|
|
if block == nil {
|
|
|
child.sendError(IncompleteResponse)
|
|
child.sendError(IncompleteResponse)
|
|
|
child.trigger <- none{}
|
|
child.trigger <- none{}
|
|
|
- delete(w.work, child)
|
|
|
|
|
|
|
+ delete(w.subscriptions, child)
|
|
|
continue
|
|
continue
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -477,17 +477,17 @@ func (w *consumerWorker) doWork() {
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (w *consumerWorker) updateWorkCache(newWork []*PartitionConsumer) {
|
|
|
|
|
- // take new work, and abandon work that has been closed
|
|
|
|
|
- for _, child := range newWork {
|
|
|
|
|
- w.work[child] = none{}
|
|
|
|
|
|
|
+func (w *consumerWorker) updateSubscriptionCache(newSubscriptions []*PartitionConsumer) {
|
|
|
|
|
+ // take new subscriptions, and abandon subscriptions that have been closed
|
|
|
|
|
+ for _, child := range newSubscriptions {
|
|
|
|
|
+ w.subscriptions[child] = none{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- for child, _ := range w.work {
|
|
|
|
|
|
|
+ for child, _ := range w.subscriptions {
|
|
|
select {
|
|
select {
|
|
|
case <-child.dying:
|
|
case <-child.dying:
|
|
|
close(child.trigger)
|
|
close(child.trigger)
|
|
|
- delete(w.work, child)
|
|
|
|
|
|
|
+ delete(w.subscriptions, child)
|
|
|
default:
|
|
default:
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
@@ -497,13 +497,13 @@ func (w *consumerWorker) abort(err error) {
|
|
|
_ = w.broker.Close() // we don't care about the error this might return, we already have one
|
|
_ = w.broker.Close() // we don't care about the error this might return, we already have one
|
|
|
w.consumer.client.disconnectBroker(w.broker)
|
|
w.consumer.client.disconnectBroker(w.broker)
|
|
|
|
|
|
|
|
- for child, _ := range w.work {
|
|
|
|
|
|
|
+ for child, _ := range w.subscriptions {
|
|
|
child.sendError(err)
|
|
child.sendError(err)
|
|
|
child.trigger <- none{}
|
|
child.trigger <- none{}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- for newWork := range w.newWork {
|
|
|
|
|
- for _, child := range newWork {
|
|
|
|
|
|
|
+ for newSubscription := range w.newSubscriptions {
|
|
|
|
|
+ for _, child := range newSubscription {
|
|
|
child.sendError(err)
|
|
child.sendError(err)
|
|
|
child.trigger <- none{}
|
|
child.trigger <- none{}
|
|
|
}
|
|
}
|
|
@@ -516,7 +516,7 @@ func (w *consumerWorker) fetchNewMessages() (*FetchResponse, error) {
|
|
|
MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
|
|
MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- for child, _ := range w.work {
|
|
|
|
|
|
|
+ for child, _ := range w.subscriptions {
|
|
|
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
|
|
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -533,7 +533,7 @@ func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchRe
|
|
|
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
|
|
|
// doesn't belong to us, redispatch it
|
|
// doesn't belong to us, redispatch it
|
|
|
child.trigger <- none{}
|
|
child.trigger <- none{}
|
|
|
- delete(w.work, child)
|
|
|
|
|
|
|
+ delete(w.subscriptions, child)
|
|
|
return
|
|
return
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -590,6 +590,6 @@ func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchRe
|
|
|
if incomplete || !atLeastOne {
|
|
if incomplete || !atLeastOne {
|
|
|
child.sendError(IncompleteResponse)
|
|
child.sendError(IncompleteResponse)
|
|
|
child.trigger <- none{}
|
|
child.trigger <- none{}
|
|
|
- delete(w.work, child)
|
|
|
|
|
|
|
+ delete(w.subscriptions, child)
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|