Helpful if you just want to update the metadata, or maybe in more async scenarios.
@@ -340,7 +340,7 @@ func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
pom.lock.Lock()
defer pom.lock.Unlock()
- if offset < pom.offset {
+ if offset <= pom.offset {
pom.offset = offset
pom.metadata = metadata
pom.dirty = true