|
@@ -125,9 +125,9 @@ type Consumer struct {
|
|
|
client *Client
|
|
|
config ConsumerConfig
|
|
|
|
|
|
- lock sync.Mutex
|
|
|
- children map[string]map[int32]*PartitionConsumer
|
|
|
- workers map[*Broker]*consumerWorker
|
|
|
+ lock sync.Mutex
|
|
|
+ children map[string]map[int32]*PartitionConsumer
|
|
|
+ brokerConsumers map[*Broker]*brokerConsumer
|
|
|
}
|
|
|
|
|
|
|
|
@@ -146,10 +146,10 @@ func NewConsumer(client *Client, config *ConsumerConfig) (*Consumer, error) {
|
|
|
}
|
|
|
|
|
|
c := &Consumer{
|
|
|
- client: client,
|
|
|
- config: *config,
|
|
|
- children: make(map[string]map[int32]*PartitionConsumer),
|
|
|
- workers: make(map[*Broker]*consumerWorker),
|
|
|
+ client: client,
|
|
|
+ config: *config,
|
|
|
+ children: make(map[string]map[int32]*PartitionConsumer),
|
|
|
+ brokerConsumers: make(map[*Broker]*brokerConsumer),
|
|
|
}
|
|
|
|
|
|
return c, nil
|
|
@@ -193,8 +193,8 @@ func (c *Consumer) ConsumePartition(topic string, partition int32, config *Parti
|
|
|
|
|
|
go withRecover(child.dispatcher)
|
|
|
|
|
|
- worker := c.refWorker(child.broker)
|
|
|
- worker.input <- child
|
|
|
+ brokerWorker := c.refBrokerConsumer(child.broker)
|
|
|
+ brokerWorker.input <- child
|
|
|
|
|
|
return child, nil
|
|
|
}
|
|
@@ -224,13 +224,13 @@ func (c *Consumer) removeChild(child *PartitionConsumer) {
|
|
|
delete(c.children[child.topic], child.partition)
|
|
|
}
|
|
|
|
|
|
-func (c *Consumer) refWorker(broker *Broker) *consumerWorker {
|
|
|
+func (c *Consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
|
- worker := c.workers[broker]
|
|
|
- if worker == nil {
|
|
|
- worker = &consumerWorker{
|
|
|
+ brokerWorker := c.brokerConsumers[broker]
|
|
|
+ if brokerWorker == nil {
|
|
|
+ brokerWorker = &brokerConsumer{
|
|
|
consumer: c,
|
|
|
broker: broker,
|
|
|
input: make(chan *PartitionConsumer),
|
|
@@ -239,26 +239,26 @@ func (c *Consumer) refWorker(broker *Broker) *consumerWorker {
|
|
|
subscriptions: make(map[*PartitionConsumer]none),
|
|
|
refs: 1,
|
|
|
}
|
|
|
- go withRecover(worker.subscriptionManager)
|
|
|
- go withRecover(worker.subscriptionConsumer)
|
|
|
- c.workers[broker] = worker
|
|
|
+ go withRecover(brokerWorker.subscriptionManager)
|
|
|
+ go withRecover(brokerWorker.subscriptionConsumer)
|
|
|
+ c.brokerConsumers[broker] = brokerWorker
|
|
|
} else {
|
|
|
- worker.refs++
|
|
|
+ brokerWorker.refs++
|
|
|
}
|
|
|
|
|
|
- return worker
|
|
|
+ return brokerWorker
|
|
|
}
|
|
|
|
|
|
-func (c *Consumer) unrefWorker(broker *Broker) {
|
|
|
+func (c *Consumer) unrefBrokerConsumer(broker *Broker) {
|
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
|
|
|
|
- worker := c.workers[broker]
|
|
|
- worker.refs--
|
|
|
+ brokerWorker := c.brokerConsumers[broker]
|
|
|
+ brokerWorker.refs--
|
|
|
|
|
|
- if worker.refs == 0 {
|
|
|
- close(worker.input)
|
|
|
- delete(c.workers, broker)
|
|
|
+ if brokerWorker.refs == 0 {
|
|
|
+ close(brokerWorker.input)
|
|
|
+ delete(c.brokerConsumers, broker)
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -296,7 +296,7 @@ func (child *PartitionConsumer) dispatcher() {
|
|
|
close(child.trigger)
|
|
|
default:
|
|
|
if child.broker != nil {
|
|
|
- child.consumer.unrefWorker(child.broker)
|
|
|
+ child.consumer.unrefBrokerConsumer(child.broker)
|
|
|
child.broker = nil
|
|
|
}
|
|
|
|
|
@@ -315,7 +315,7 @@ func (child *PartitionConsumer) dispatcher() {
|
|
|
}
|
|
|
|
|
|
if child.broker != nil {
|
|
|
- child.consumer.unrefWorker(child.broker)
|
|
|
+ child.consumer.unrefBrokerConsumer(child.broker)
|
|
|
}
|
|
|
child.consumer.removeChild(child)
|
|
|
close(child.events)
|
|
@@ -332,9 +332,9 @@ func (child *PartitionConsumer) dispatch() error {
|
|
|
child.broker = leader
|
|
|
}
|
|
|
|
|
|
- worker := child.consumer.refWorker(child.broker)
|
|
|
+ brokerWorker := child.consumer.refBrokerConsumer(child.broker)
|
|
|
|
|
|
- worker.input <- child
|
|
|
+ brokerWorker.input <- child
|
|
|
|
|
|
return nil
|
|
|
}
|
|
@@ -389,9 +389,9 @@ func (child *PartitionConsumer) Close() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-
|
|
|
+
|
|
|
|
|
|
-type consumerWorker struct {
|
|
|
+type brokerConsumer struct {
|
|
|
consumer *Consumer
|
|
|
broker *Broker
|
|
|
input chan *PartitionConsumer
|
|
@@ -401,7 +401,7 @@ type consumerWorker struct {
|
|
|
refs int
|
|
|
}
|
|
|
|
|
|
-func (w *consumerWorker) subscriptionManager() {
|
|
|
+func (w *brokerConsumer) subscriptionManager() {
|
|
|
var buffer []*PartitionConsumer
|
|
|
|
|
|
|
|
@@ -441,7 +441,7 @@ done:
|
|
|
close(w.newSubscriptions)
|
|
|
}
|
|
|
|
|
|
-func (w *consumerWorker) subscriptionConsumer() {
|
|
|
+func (w *brokerConsumer) subscriptionConsumer() {
|
|
|
<-w.wait
|
|
|
|
|
|
|
|
@@ -477,7 +477,7 @@ func (w *consumerWorker) subscriptionConsumer() {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (w *consumerWorker) updateSubscriptionCache(newSubscriptions []*PartitionConsumer) {
|
|
|
+func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*PartitionConsumer) {
|
|
|
|
|
|
for _, child := range newSubscriptions {
|
|
|
w.subscriptions[child] = none{}
|
|
@@ -493,7 +493,7 @@ func (w *consumerWorker) updateSubscriptionCache(newSubscriptions []*PartitionCo
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (w *consumerWorker) abort(err error) {
|
|
|
+func (w *brokerConsumer) abort(err error) {
|
|
|
_ = w.broker.Close()
|
|
|
w.consumer.client.disconnectBroker(w.broker)
|
|
|
|
|
@@ -510,7 +510,7 @@ func (w *consumerWorker) abort(err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (w *consumerWorker) fetchNewMessages() (*FetchResponse, error) {
|
|
|
+func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
|
|
|
request := &FetchRequest{
|
|
|
MinBytes: w.consumer.config.MinFetchSize,
|
|
|
MaxWaitTime: int32(w.consumer.config.MaxWaitTime / time.Millisecond),
|
|
@@ -523,7 +523,7 @@ func (w *consumerWorker) fetchNewMessages() (*FetchResponse, error) {
|
|
|
return w.broker.Fetch(w.consumer.client.id, request)
|
|
|
}
|
|
|
|
|
|
-func (w *consumerWorker) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
|
|
|
+func (w *brokerConsumer) handleResponse(child *PartitionConsumer, block *FetchResponseBlock) {
|
|
|
switch block.Err {
|
|
|
case NoError:
|
|
|
break
|