Browse Source

Merge pull request #651 from Shopify/fix-doubled-timestamp-log

Don't try to send partition timestamps in v2 OCRs
Evan Huus 9 years ago
parent
commit
4e901c1ab8
1 changed files with 3 additions and 1 deletions
  1. 3 1
      offset_manager.go

+ 3 - 1
offset_manager.go

@@ -484,7 +484,9 @@ func (bom *brokerOffsetManager) flushToBroker() {
 
 
 func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
 func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
 	var r *OffsetCommitRequest
 	var r *OffsetCommitRequest
+	var perPartitionTimestamp int64
 	if bom.parent.conf.Consumer.Offsets.Retention == 0 {
 	if bom.parent.conf.Consumer.Offsets.Retention == 0 {
+		perPartitionTimestamp = ReceiveTime
 		r = &OffsetCommitRequest{
 		r = &OffsetCommitRequest{
 			Version:                 1,
 			Version:                 1,
 			ConsumerGroup:           bom.parent.group,
 			ConsumerGroup:           bom.parent.group,
@@ -503,7 +505,7 @@ func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
 	for s := range bom.subscriptions {
 	for s := range bom.subscriptions {
 		s.lock.Lock()
 		s.lock.Lock()
 		if s.dirty {
 		if s.dirty {
-			r.AddBlock(s.topic, s.partition, s.offset, ReceiveTime, s.metadata)
+			r.AddBlock(s.topic, s.partition, s.offset, perPartitionTimestamp, s.metadata)
 		}
 		}
 		s.lock.Unlock()
 		s.lock.Unlock()
 	}
 	}