|
@@ -140,6 +140,7 @@ type partitionOffsetManager struct {
|
|
|
lock sync.Mutex
|
|
lock sync.Mutex
|
|
|
offset int64
|
|
offset int64
|
|
|
metadata string
|
|
metadata string
|
|
|
|
|
+ dirty bool
|
|
|
broker *brokerOffsetManager
|
|
broker *brokerOffsetManager
|
|
|
|
|
|
|
|
errors chan *ConsumerError
|
|
errors chan *ConsumerError
|
|
@@ -278,6 +279,16 @@ func (pom *partitionOffsetManager) SetOffset(offset int64, metadata string) {
|
|
|
|
|
|
|
|
pom.offset = offset
|
|
pom.offset = offset
|
|
|
pom.metadata = metadata
|
|
pom.metadata = metadata
|
|
|
|
|
+ pom.dirty = true
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
|
|
|
|
|
+ pom.lock.Lock()
|
|
|
|
|
+ defer pom.lock.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ if pom.offset == offset && pom.metadata == metadata {
|
|
|
|
|
+ pom.dirty = false
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (pom *partitionOffsetManager) Offset() (int64, string) {
|
|
func (pom *partitionOffsetManager) Offset() (int64, string) {
|
|
@@ -376,6 +387,8 @@ func (bom *brokerOffsetManager) flushToBroker() {
|
|
|
|
|
|
|
|
switch err {
|
|
switch err {
|
|
|
case ErrNoError:
|
|
case ErrNoError:
|
|
|
|
|
+ block := request.blocks[s.topic][s.partition]
|
|
|
|
|
+ s.updateCommitted(block.offset, block.metadata)
|
|
|
break
|
|
break
|
|
|
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable:
|
|
|
delete(bom.subscriptions, s)
|
|
delete(bom.subscriptions, s)
|