Add a missing return, and zero out the subscriptions when we're done.
@@ -392,6 +392,7 @@ func (bom *brokerOffsetManager) flushToBroker() {
if err != nil {
bom.abort(err)
+ return
}
for s := range bom.subscriptions {
@@ -463,4 +464,6 @@ func (bom *brokerOffsetManager) abort(err error) {
s.rebalance <- none{}
+
+ bom.subscriptions = make(map[*partitionOffsetManager]none)