|
|
@@ -441,15 +441,17 @@ func (w *brokerConsumer) subscriptionConsumer() {
|
|
|
}
|
|
|
|
|
|
for child := range w.subscriptions {
|
|
|
- block := response.GetBlock(child.topic, child.partition)
|
|
|
- if block == nil {
|
|
|
- child.sendError(ErrIncompleteResponse)
|
|
|
- child.trigger <- none{}
|
|
|
- delete(w.subscriptions, child)
|
|
|
- continue
|
|
|
+ if err := w.handleResponse(child, response); err != nil {
|
|
|
+ switch err {
|
|
|
+ default:
|
|
|
+ child.sendError(err)
|
|
|
+ fallthrough
|
|
|
+ case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
|
+ // these three are not fatal errors, but do require redispatching
|
|
|
+ child.trigger <- none{}
|
|
|
+ delete(w.subscriptions, child)
|
|
|
+ }
|
|
|
}
|
|
|
-
|
|
|
- w.handleResponse(child, block)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
@@ -500,18 +502,14 @@ func (w *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
|
|
|
return w.broker.Fetch(request)
|
|
|
}
|
|
|
|
|
|
-func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchResponseBlock) {
|
|
|
- switch block.Err {
|
|
|
- case ErrNoError:
|
|
|
- break
|
|
|
- default:
|
|
|
- child.sendError(block.Err)
|
|
|
- fallthrough
|
|
|
- case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
|
- // doesn't belong to us, redispatch it
|
|
|
- child.trigger <- none{}
|
|
|
- delete(w.subscriptions, child)
|
|
|
- return
|
|
|
+func (w *brokerConsumer) handleResponse(child *partitionConsumer, response *FetchResponse) error {
|
|
|
+ block := response.GetBlock(child.topic, child.partition)
|
|
|
+ if block == nil {
|
|
|
+ return ErrIncompleteResponse
|
|
|
+ }
|
|
|
+
|
|
|
+ if block.Err != ErrNoError {
|
|
|
+ return block.Err
|
|
|
}
|
|
|
|
|
|
if len(block.MsgSet.Messages) == 0 {
|
|
|
@@ -530,7 +528,7 @@ func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchRe
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
// we got messages, reset our fetch size in case it was increased for a previous request
|
|
|
@@ -565,8 +563,7 @@ func (w *brokerConsumer) handleResponse(child *partitionConsumer, block *FetchRe
|
|
|
}
|
|
|
|
|
|
if incomplete || !atLeastOne {
|
|
|
- child.sendError(ErrIncompleteResponse)
|
|
|
- child.trigger <- none{}
|
|
|
- delete(w.subscriptions, child)
|
|
|
+ return ErrIncompleteResponse
|
|
|
}
|
|
|
+ return nil
|
|
|
}
|