|
|
@@ -130,7 +130,7 @@ func (c *consumer) ConsumePartition(topic string, partition int32, offset int64)
|
|
|
errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
|
|
|
feeder: make(chan *FetchResponse, 1),
|
|
|
trigger: make(chan none, 1),
|
|
|
- dying: make(chan error, 1),
|
|
|
+ dying: make(chan none),
|
|
|
fetchSize: c.conf.Consumer.Fetch.Default,
|
|
|
}
|
|
|
|
|
|
@@ -269,8 +269,9 @@ type partitionConsumer struct {
|
|
|
messages chan *ConsumerMessage
|
|
|
errors chan *ConsumerError
|
|
|
feeder chan *FetchResponse
|
|
|
- trigger chan none
|
|
|
- dying chan error
|
|
|
+
|
|
|
+ trigger, dying chan none
|
|
|
+ dispatchReason error
|
|
|
|
|
|
fetchSize int32
|
|
|
offset int64
|
|
|
@@ -372,7 +373,7 @@ func (child *partitionConsumer) AsyncClose() {
|
|
|
// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
|
|
|
// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
|
|
|
// also just close itself)
|
|
|
- child.dying <- nil
|
|
|
+ close(child.dying)
|
|
|
}
|
|
|
|
|
|
func (child *partitionConsumer) Close() error {
|
|
|
@@ -412,11 +413,11 @@ func (child *partitionConsumer) responseFeeder() {
|
|
|
child.AsyncClose()
|
|
|
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
|
// these three are not fatal errors, but do require redispatching
|
|
|
- child.dying <- err
|
|
|
+ child.dispatchReason = err
|
|
|
default:
|
|
|
// dunno, tell the user and try redispatching
|
|
|
child.sendError(err)
|
|
|
- child.dying <- err
|
|
|
+ child.dispatchReason = err
|
|
|
}
|
|
|
|
|
|
child.broker.acks.Done()
|
|
|
@@ -602,16 +603,18 @@ func (bc *brokerConsumer) updateSubscriptionCache(newSubscriptions []*partitionC
|
|
|
|
|
|
for child := range bc.subscriptions {
|
|
|
select {
|
|
|
- case err := <-child.dying:
|
|
|
- if err == nil {
|
|
|
- Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
|
|
|
- close(child.trigger)
|
|
|
- } else {
|
|
|
- Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n", bc.broker.ID(), child.topic, child.partition, err)
|
|
|
- child.trigger <- none{}
|
|
|
- }
|
|
|
+ case <-child.dying:
|
|
|
+ Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
|
|
|
+ close(child.trigger)
|
|
|
delete(bc.subscriptions, child)
|
|
|
default:
|
|
|
+ if child.dispatchReason != nil {
|
|
|
+ Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
|
|
|
+ bc.broker.ID(), child.topic, child.partition, child.dispatchReason)
|
|
|
+ child.dispatchReason = nil
|
|
|
+ child.trigger <- none{}
|
|
|
+ delete(bc.subscriptions, child)
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|