|
|
@@ -136,11 +136,15 @@ type PartitionOffsetManager interface {
|
|
|
// was committed for this partition yet.
|
|
|
NextOffset() (int64, string)
|
|
|
|
|
|
- // MarkOffset marks the provided offset as processed, alongside a metadata string
|
|
|
+ // MarkOffset marks the provided offset, alongside a metadata string
|
|
|
// that represents the state of the partition consumer at that point in time. The
|
|
|
// metadata string can be used by another consumer to restore that state, so it
|
|
|
// can resume consumption.
|
|
|
//
|
|
|
+ // To follow upstream conventions, you are expected to mark the offset of the
|
|
|
+ // next message to read, not the last message read. Thus, when calling `MarkOffset`
|
|
|
+ // you should typically add one to the offset of the last consumed message.
|
|
|
+ //
|
|
|
// Note: calling MarkOffset does not necessarily commit the offset to the backend
|
|
|
// store immediately for efficiency reasons, and it may never be committed if
|
|
|
// your application crashes. This means that you may end up processing the same
|
|
|
@@ -340,7 +344,7 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
|
|
|
defer pom.lock.Unlock()
|
|
|
|
|
|
if pom.offset >= 0 {
|
|
|
- return pom.offset + 1, pom.metadata
|
|
|
+ return pom.offset, pom.metadata
|
|
|
}
|
|
|
|
|
|
return pom.parent.conf.Consumer.Offsets.Initial, ""
|