|
|
@@ -384,6 +384,10 @@ func (bom *brokerOffsetManager) mainLoop() {
|
|
|
|
|
|
func (bom *brokerOffsetManager) flushToBroker() {
|
|
|
request := bom.constructRequest()
|
|
|
+ if request == nil {
|
|
|
+ return
|
|
|
+ }
|
|
|
+
|
|
|
response, err := bom.broker.CommitOffset(request)
|
|
|
|
|
|
if err != nil {
|
|
|
@@ -428,12 +432,20 @@ func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
|
|
|
Version: 1,
|
|
|
ConsumerGroup: bom.parent.group,
|
|
|
}
|
|
|
+
|
|
|
for s := range bom.subscriptions {
|
|
|
s.lock.Lock()
|
|
|
- r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
|
|
|
+ if s.dirty {
|
|
|
+ r.AddBlock(s.topic, s.partition, s.offset, 0, s.metadata)
|
|
|
+ }
|
|
|
s.lock.Unlock()
|
|
|
}
|
|
|
- return r
|
|
|
+
|
|
|
+ if len(r.blocks) > 0 {
|
|
|
+ return r
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
func (bom *brokerOffsetManager) abort(err error) {
|