|
@@ -168,6 +168,8 @@ func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ pom.broker.updateSubscriptions <- pom
|
|
|
|
|
+
|
|
|
go withRecover(pom.mainLoop)
|
|
go withRecover(pom.mainLoop)
|
|
|
|
|
|
|
|
return pom, nil
|
|
return pom, nil
|
|
@@ -180,6 +182,8 @@ func (pom *partitionOffsetManager) mainLoop() {
|
|
|
if err := pom.selectBroker(); err != nil {
|
|
if err := pom.selectBroker(); err != nil {
|
|
|
pom.handleError(err)
|
|
pom.handleError(err)
|
|
|
pom.rebalance <- none{}
|
|
pom.rebalance <- none{}
|
|
|
|
|
+ } else {
|
|
|
|
|
+ pom.broker.updateSubscriptions <- pom
|
|
|
}
|
|
}
|
|
|
case <-pom.dying:
|
|
case <-pom.dying:
|
|
|
if pom.broker != nil {
|
|
if pom.broker != nil {
|
|
@@ -213,7 +217,6 @@ func (pom *partitionOffsetManager) selectBroker() error {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
pom.broker = pom.parent.refBrokerOffsetManager(broker)
|
|
pom.broker = pom.parent.refBrokerOffsetManager(broker)
|
|
|
- pom.broker.updateSubscriptions <- pom
|
|
|
|
|
return nil
|
|
return nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -362,7 +365,9 @@ func (bom *brokerOffsetManager) mainLoop() {
|
|
|
for {
|
|
for {
|
|
|
select {
|
|
select {
|
|
|
case <-bom.timer.C:
|
|
case <-bom.timer.C:
|
|
|
- bom.flushToBroker()
|
|
|
|
|
|
|
+ if len(bom.subscriptions) > 0 {
|
|
|
|
|
+ bom.flushToBroker()
|
|
|
|
|
+ }
|
|
|
case s, ok := <-bom.updateSubscriptions:
|
|
case s, ok := <-bom.updateSubscriptions:
|
|
|
if !ok {
|
|
if !ok {
|
|
|
bom.timer.Stop()
|
|
bom.timer.Stop()
|