|
|
@@ -180,25 +180,15 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
|
- brokerWorker := c.brokerConsumers[broker]
|
|
|
- if brokerWorker == nil {
|
|
|
- brokerWorker = &brokerConsumer{
|
|
|
- consumer: c,
|
|
|
- broker: broker,
|
|
|
- input: make(chan *partitionConsumer),
|
|
|
- newSubscriptions: make(chan []*partitionConsumer),
|
|
|
- wait: make(chan none),
|
|
|
- subscriptions: make(map[*partitionConsumer]none),
|
|
|
- refs: 0,
|
|
|
- }
|
|
|
- go withRecover(brokerWorker.subscriptionManager)
|
|
|
- go withRecover(brokerWorker.subscriptionConsumer)
|
|
|
- c.brokerConsumers[broker] = brokerWorker
|
|
|
+ bc := c.brokerConsumers[broker]
|
|
|
+ if bc == nil {
|
|
|
+ bc = c.newBrokerConsumer(broker)
|
|
|
+ c.brokerConsumers[broker] = bc
|
|
|
}
|
|
|
|
|
|
- brokerWorker.refs++
|
|
|
+ bc.refs++
|
|
|
|
|
|
- return brokerWorker
|
|
|
+ return bc
|
|
|
}
|
|
|
|
|
|
func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
|
|
|
@@ -481,7 +471,24 @@ type brokerConsumer struct {
|
|
|
refs int
|
|
|
}
|
|
|
|
|
|
-func (w *brokerConsumer) subscriptionManager() {
|
|
|
+func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
|
|
|
+ bc := &brokerConsumer{
|
|
|
+ consumer: c,
|
|
|
+ broker: broker,
|
|
|
+ input: make(chan *partitionConsumer),
|
|
|
+ newSubscriptions: make(chan []*partitionConsumer),
|
|
|
+ wait: make(chan none),
|
|
|
+ subscriptions: make(map[*partitionConsumer]none),
|
|
|
+ refs: 0,
|
|
|
+ }
|
|
|
+
|
|
|
+ go withRecover(bc.subscriptionManager)
|
|
|
+ go withRecover(bc.subscriptionConsumer)
|
|
|
+
|
|
|
+ return bc
|
|
|
+}
|
|
|
+
|
|
|
+func (bc *brokerConsumer) subscriptionManager() {
|
|
|
var buffer []*partitionConsumer
|
|
|
|
|
|
// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
|
|
|
@@ -492,58 +499,58 @@ func (w *brokerConsumer) subscriptionManager() {
|
|
|
for {
|
|
|
if len(buffer) > 0 {
|
|
|
select {
|
|
|
- case event, ok := <-w.input:
|
|
|
+ case event, ok := <-bc.input:
|
|
|
if !ok {
|
|
|
goto done
|
|
|
}
|
|
|
buffer = append(buffer, event)
|
|
|
- case w.newSubscriptions <- buffer:
|
|
|
+ case bc.newSubscriptions <- buffer:
|
|
|
buffer = nil
|
|
|
- case w.wait <- none{}:
|
|
|
+ case bc.wait <- none{}:
|
|
|
}
|
|
|
} else {
|
|
|
select {
|
|
|
- case event, ok := <-w.input:
|
|
|
+ case event, ok := <-bc.input:
|
|
|
if !ok {
|
|
|
goto done
|
|
|
}
|
|
|
buffer = append(buffer, event)
|
|
|
- case w.newSubscriptions <- nil:
|
|
|
+ case bc.newSubscriptions <- nil:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
done:
|
|
|
- close(w.wait)
|
|
|
+ close(bc.wait)
|
|
|
if len(buffer) > 0 {
|
|
|
- w.newSubscriptions <- buffer
|
|
|
+ bc.newSubscriptions <- buffer
|
|
|
}
|
|
|
- close(w.newSubscriptions)
|
|
|
+ close(bc.newSubscriptions)
|
|
|
}
|
|
|
|
|
|
-func (w *brokerConsumer) subscriptionConsumer() {
|
|
|
- <-w.wait // wait for our first piece of work
|
|
|
+func (bc *brokerConsumer) subscriptionConsumer() {
|
|
|
+ <-bc.wait // wait for our first piece of work
|
|
|
|
|
|
// the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
|
|
|
- for newSubscriptions := range w.newSubscriptions {
|
|
|
- w.updateSubscriptionCache(newSubscriptions)
|
|
|
+ for newSubscriptions := range bc.newSubscriptions {
|
|
|
+ bc.updateSubscriptionCache(newSubscriptions)
|
|
|
|
|
|
- if len(w.subscriptions) == 0 {
|
|
|
+ if len(bc.subscriptions) == 0 {
|
|
|
// We're about to be shut down or we're about to receive more subscriptions.
|
|
|
// Either way, the signal just hasn't propagated to our goroutine yet.
|
|
|
- <-w.wait
|
|
|
+ <-bc.wait
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- response, err := w.fetchNewMessages()
|
|
|
+ response, err := bc.fetchNewMessages()
|
|
|
|
|
|
if err != nil {
|
|
|
- Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", w.broker.ID(), err)
|
|
|
- w.abort(err)
|
|
|
+ Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
|
|
|
+ bc.abort(err)
|
|
|
return
|
|
|
}
|
|
|
|
|
|
- for child := range w.subscriptions {
|
|
|
+ for child := range bc.subscriptions {
|
|
|
if err := child.handleResponse(response); err != nil {
|
|
|
switch err {
|
|
|
case ErrOffsetOutOfRange:
|
|
|
@@ -557,42 +564,42 @@ func (w *brokerConsumer) subscriptionConsumer() {
|
|
|
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
|
// these three are not fatal errors, but do require redispatching
|
|
|
child.trigger <- none{}
|
|
|
- delete(w.subscriptions, child)
|
|
|
- Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", w.broker.ID(), child.topic, child.partition, err)
|
|
|
+ delete(bc.subscriptions, child)
|
|
|
+ Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
|
|
|
+func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionConsumer) {
|
|
|
// take new subscriptions, and abandon subscriptions that have been closed
|
|
|
for _, child := range newSubscriptions {
|
|
|
- w.subscriptions[child] = none{}
|
|
|
- Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
|
|
|
+ bc.subscriptions[child] = none{}
|
|
|
+ Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
|
|
|
}
|
|
|
|
|
|
- for child := range w.subscriptions {
|
|
|
+ for child := range bc.subscriptions {
|
|
|
select {
|
|
|
case <-child.dying:
|
|
|
close(child.trigger)
|
|
|
- delete(w.subscriptions, child)
|
|
|
- Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", w.broker.ID(), child.topic, child.partition)
|
|
|
+ delete(bc.subscriptions, child)
|
|
|
+ Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
|
|
|
default:
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (w *brokerConsumer) abort(err error) {
|
|
|
- w.consumer.abandonBrokerConsumer(w)
|
|
|
- _ = w.broker.Close() // we don't care about the error this might return, we already have one
|
|
|
+func (bc *brokerConsumer) abort(err error) {
|
|
|
+ bc.consumer.abandonBrokerConsumer(bc)
|
|
|
+ _ = bc.broker.Close() // we don't care about the error this might return, we already have one
|
|
|
|
|
|
- for child := range w.subscriptions {
|
|
|
+ for child := range bc.subscriptions {
|
|
|
child.sendError(err)
|
|
|
child.trigger <- none{}
|
|
|
}
|
|
|
|
|
|
- for newSubscription := range w.newSubscriptions {
|
|
|
+ for newSubscription := range bc.newSubscriptions {
|
|
|
for _, child := range newSubscription {
|
|
|
child.sendError(err)
|
|
|
child.trigger <- none{}
|
|
|
@@ -600,15 +607,15 @@ func (w *brokerConsumer) abort(err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
|
|
|
+func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
|
|
|
request := &FetchRequest{
|
|
|
- MinBytes: w.consumer.conf.Consumer.Fetch.Min,
|
|
|
- MaxWaitTime: int32(w.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
|
|
|
+ MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
|
|
|
+ MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
|
|
|
}
|
|
|
|
|
|
- for child := range w.subscriptions {
|
|
|
+ for child := range bc.subscriptions {
|
|
|
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
|
|
|
}
|
|
|
|
|
|
- return w.broker.Fetch(request)
|
|
|
+ return bc.broker.Fetch(request)
|
|
|
}
|