|
@@ -115,10 +115,10 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if leader, err := c.client.Leader(child.topic, child.partition); err != nil {
|
|
|
|
|
|
|
+ var leader *Broker
|
|
|
|
|
+ var err error
|
|
|
|
|
+ if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
- } else {
|
|
|
|
|
- child.broker = leader
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if err := c.addChild(child); err != nil {
|
|
if err := c.addChild(child); err != nil {
|
|
@@ -127,8 +127,8 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
|
|
|
|
|
|
|
|
go withRecover(child.dispatcher)
|
|
go withRecover(child.dispatcher)
|
|
|
|
|
|
|
|
- brokerWorker := c.refBrokerConsumer(child.broker)
|
|
|
|
|
- brokerWorker.input <- child
|
|
|
|
|
|
|
+ child.broker = c.refBrokerConsumer(leader)
|
|
|
|
|
+ child.broker.input <- child
|
|
|
|
|
|
|
|
return child, nil
|
|
return child, nil
|
|
|
}
|
|
}
|
|
@@ -171,31 +171,39 @@ func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
|
|
|
newSubscriptions: make(chan []*partitionConsumer),
|
|
newSubscriptions: make(chan []*partitionConsumer),
|
|
|
wait: make(chan none),
|
|
wait: make(chan none),
|
|
|
subscriptions: make(map[*partitionConsumer]none),
|
|
subscriptions: make(map[*partitionConsumer]none),
|
|
|
- refs: 1,
|
|
|
|
|
|
|
+ refs: 0,
|
|
|
}
|
|
}
|
|
|
go withRecover(brokerWorker.subscriptionManager)
|
|
go withRecover(brokerWorker.subscriptionManager)
|
|
|
go withRecover(brokerWorker.subscriptionConsumer)
|
|
go withRecover(brokerWorker.subscriptionConsumer)
|
|
|
c.brokerConsumers[broker] = brokerWorker
|
|
c.brokerConsumers[broker] = brokerWorker
|
|
|
- } else {
|
|
|
|
|
- brokerWorker.refs++
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ brokerWorker.refs++
|
|
|
|
|
+
|
|
|
return brokerWorker
|
|
return brokerWorker
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (c *consumer) unrefBrokerConsumer(broker *Broker) {
|
|
|
|
|
|
|
+func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
|
|
|
c.lock.Lock()
|
|
c.lock.Lock()
|
|
|
defer c.lock.Unlock()
|
|
defer c.lock.Unlock()
|
|
|
|
|
|
|
|
- brokerWorker := c.brokerConsumers[broker]
|
|
|
|
|
brokerWorker.refs--
|
|
brokerWorker.refs--
|
|
|
|
|
|
|
|
if brokerWorker.refs == 0 {
|
|
if brokerWorker.refs == 0 {
|
|
|
close(brokerWorker.input)
|
|
close(brokerWorker.input)
|
|
|
- delete(c.brokerConsumers, broker)
|
|
|
|
|
|
|
+ if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
|
|
|
|
|
+ delete(c.brokerConsumers, brokerWorker.broker)
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
|
|
|
|
|
+ c.lock.Lock()
|
|
|
|
|
+ defer c.lock.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ delete(c.brokerConsumers, brokerWorker.broker)
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
// PartitionConsumer
|
|
// PartitionConsumer
|
|
|
|
|
|
|
|
// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
|
|
// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call Close()
|
|
@@ -237,7 +245,7 @@ type partitionConsumer struct {
|
|
|
topic string
|
|
topic string
|
|
|
partition int32
|
|
partition int32
|
|
|
|
|
|
|
|
- broker *Broker
|
|
|
|
|
|
|
+ broker *brokerConsumer
|
|
|
messages chan *ConsumerMessage
|
|
messages chan *ConsumerMessage
|
|
|
errors chan *ConsumerError
|
|
errors chan *ConsumerError
|
|
|
trigger, dying chan none
|
|
trigger, dying chan none
|
|
@@ -291,15 +299,15 @@ func (child *partitionConsumer) dispatch() error {
|
|
|
return err
|
|
return err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- if leader, err := child.consumer.client.Leader(child.topic, child.partition); err != nil {
|
|
|
|
|
|
|
+ var leader *Broker
|
|
|
|
|
+ var err error
|
|
|
|
|
+ if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
|
|
|
return err
|
|
return err
|
|
|
- } else {
|
|
|
|
|
- child.broker = leader
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- brokerWorker := child.consumer.refBrokerConsumer(child.broker)
|
|
|
|
|
|
|
+ child.broker = child.consumer.refBrokerConsumer(leader)
|
|
|
|
|
|
|
|
- brokerWorker.input <- child
|
|
|
|
|
|
|
+ child.broker.input <- child
|
|
|
|
|
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
@@ -463,6 +471,7 @@ func (w *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionCo
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (w *brokerConsumer) abort(err error) {
|
|
func (w *brokerConsumer) abort(err error) {
|
|
|
|
|
+ w.consumer.abandonBrokerConsumer(w)
|
|
|
_ = w.broker.Close() // we don't care about the error this might return, we already have one
|
|
_ = w.broker.Close() // we don't care about the error this might return, we already have one
|
|
|
|
|
|
|
|
for child := range w.subscriptions {
|
|
for child := range w.subscriptions {
|