|
|
@@ -462,11 +462,19 @@ func (bom *brokerOffsetManager) flushToBroker() {
|
|
|
case ErrNoError:
|
|
|
block := request.blocks[s.topic][s.partition]
|
|
|
s.updateCommitted(block.offset, block.metadata)
|
|
|
- break
|
|
|
- case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
|
+ case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
|
|
|
+ ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
|
|
|
+ // not a critical error, we just need to redispatch
|
|
|
delete(bom.subscriptions, s)
|
|
|
s.rebalance <- none{}
|
|
|
+ case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
|
|
|
+ // nothing we can do about this, just tell the user and carry on
|
|
|
+ s.handleError(err)
|
|
|
+ case ErrOffsetsLoadInProgress:
|
|
|
+ // nothing wrong but we didn't commit, we'll get it next time round
|
|
|
+ break
|
|
|
default:
|
|
|
+ // dunno, tell the user and try redispatching
|
|
|
s.handleError(err)
|
|
|
delete(bom.subscriptions, s)
|
|
|
s.rebalance <- none{}
|