|
|
@@ -484,7 +484,9 @@ func (bom *brokerOffsetManager) flushToBroker() {
|
|
|
|
|
|
func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
|
|
|
var r *OffsetCommitRequest
|
|
|
+ var perPartitionTimestamp int64
|
|
|
if bom.parent.conf.Consumer.Offsets.Retention == 0 {
|
|
|
+ perPartitionTimestamp = ReceiveTime
|
|
|
r = &OffsetCommitRequest{
|
|
|
Version: 1,
|
|
|
ConsumerGroup: bom.parent.group,
|
|
|
@@ -503,7 +505,7 @@ func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
|
|
|
for s := range bom.subscriptions {
|
|
|
s.lock.Lock()
|
|
|
if s.dirty {
|
|
|
- r.AddBlock(s.topic, s.partition, s.offset, ReceiveTime, s.metadata)
|
|
|
+ r.AddBlock(s.topic, s.partition, s.offset, perPartitionTimestamp, s.metadata)
|
|
|
}
|
|
|
s.lock.Unlock()
|
|
|
}
|