|
|
@@ -457,7 +457,7 @@ func (bom *brokerOffsetManager) flushToBroker() {
|
|
|
case ErrNoError:
|
|
|
block := request.blocks[s.topic][s.partition]
|
|
|
s.updateCommitted(block.offset, block.metadata)
|
|
|
- case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
|
|
|
+ case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
|
|
|
ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
|
|
|
// not a critical error, we just need to redispatch
|
|
|
delete(bom.subscriptions, s)
|
|
|
@@ -468,6 +468,12 @@ func (bom *brokerOffsetManager) flushToBroker() {
|
|
|
case ErrOffsetsLoadInProgress:
|
|
|
// nothing wrong but we didn't commit, we'll get it next time round
|
|
|
break
|
|
|
+ case ErrUnknownTopicOrPartition:
|
|
|
+ // let the user know *and* try redispatching - if topic-auto-create is
|
|
|
+ // enabled, redispatching should trigger a metadata request and create the
|
|
|
+ // topic; if not then re-dispatching won't help, but we've let the user
|
|
|
+ // know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
|
|
|
+ fallthrough
|
|
|
default:
|
|
|
// dunno, tell the user and try redispatching
|
|
|
s.handleError(err)
|