Browse Source

Merge branch 'master' of https://github.com/Shopify/sarama

Shriram Rajagopalan 9 years ago
parent
commit
369d017f7b
2 changed files with 5 additions and 2 deletions
  1. 2 1
      config.go
  2. 3 1
      offset_manager.go

+ 2 - 1
config.go

@@ -208,7 +208,8 @@ type Config struct {
 			// The retention duration for committed offsets. If zero, disabled
 			// (in which case the `offsets.retention.minutes` option on the
 			// broker will be used).  Kafka only supports precision up to
-			// milliseconds; nanoseconds will be truncated.
+			// milliseconds; nanoseconds will be truncated. Requires Kafka
+			// broker version 0.9.0 or later.
 			// (default is 0: disabled).
 			Retention time.Duration
 		}

+ 3 - 1
offset_manager.go

@@ -484,7 +484,9 @@ func (bom *brokerOffsetManager) flushToBroker() {
 
 func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
 	var r *OffsetCommitRequest
+	var perPartitionTimestamp int64
 	if bom.parent.conf.Consumer.Offsets.Retention == 0 {
+		perPartitionTimestamp = ReceiveTime
 		r = &OffsetCommitRequest{
 			Version:                 1,
 			ConsumerGroup:           bom.parent.group,
@@ -503,7 +505,7 @@ func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
 	for s := range bom.subscriptions {
 		s.lock.Lock()
 		if s.dirty {
-			r.AddBlock(s.topic, s.partition, s.offset, ReceiveTime, s.metadata)
+			r.AddBlock(s.topic, s.partition, s.offset, perPartitionTimestamp, s.metadata)
 		}
 		s.lock.Unlock()
 	}