Browse Source

Merge pull request #602 from iobeam/master

Add support for custom offset retention durations to offset manager
Evan Huus 9 năm trước cách đây
mục cha
commit
0040558fce
3 tập tin đã thay đổi với 62 bổ sung4 xóa
  1. 10 0
      config.go
  2. 15 4
      offset_manager.go
  3. 37 0
      offset_manager_test.go

+ 10 - 0
config.go

@@ -183,6 +183,13 @@ type Config struct {
 			// The initial offset to use if no offset was previously committed.
 			// Should be OffsetNewest or OffsetOldest. Defaults to OffsetNewest.
 			Initial int64
+
+			// The retention duration for committed offsets. If zero, disabled
+			// (in which case the `offsets.retention.minutes` option on the
+			// broker will be used).  Kafka only supports precision up to
+			// milliseconds; nanoseconds will be truncated.
+			// (default is 0: disabled).
+			Retention time.Duration
 		}
 	}
 
@@ -257,6 +264,9 @@ func (c *Config) Validate() error {
 	if c.Consumer.MaxWaitTime%time.Millisecond != 0 {
 		Logger.Println("Consumer.MaxWaitTime only supports millisecond precision; nanoseconds will be truncated.")
 	}
+	if c.Consumer.Offsets.Retention%time.Millisecond != 0 {
+		Logger.Println("Consumer.Offsets.Retention only supports millisecond precision; nanoseconds will be truncated.")
+	}
 	if c.ClientID == "sarama" {
 		Logger.Println("ClientID is the default of 'sarama', you should consider setting it to something application-specific.")
 	}

+ 15 - 4
offset_manager.go

@@ -475,10 +475,21 @@ func (bom *brokerOffsetManager) flushToBroker() {
 }
 
 func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
-	r := &OffsetCommitRequest{
-		Version:                 1,
-		ConsumerGroup:           bom.parent.group,
-		ConsumerGroupGeneration: GroupGenerationUndefined,
+	var r *OffsetCommitRequest
+	if bom.parent.conf.Consumer.Offsets.Retention == 0 {
+		r = &OffsetCommitRequest{
+			Version:                 1,
+			ConsumerGroup:           bom.parent.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+	} else {
+		r = &OffsetCommitRequest{
+			Version:                 2,
+			RetentionTime:           int64(bom.parent.conf.Consumer.Offsets.Retention / time.Millisecond),
+			ConsumerGroup:           bom.parent.group,
+			ConsumerGroupGeneration: GroupGenerationUndefined,
+		}
+
 	}
 
 	for s := range bom.subscriptions {

+ 37 - 0
offset_manager_test.go

@@ -228,6 +228,43 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
 	coordinator.Close()
 }
 
+func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	testClient.Config().Consumer.Offsets.Retention = time.Hour
+
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	handler := func(req *request) (res encoder) {
+		if req.body.version() != 2 {
+			t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
+		}
+		offsetCommitRequest := req.body.(*OffsetCommitRequest)
+		if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
+			t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
+		}
+		return ocResponse
+	}
+	coordinator.setHandler(handler)
+
+	pom.MarkOffset(100, "modified_meta")
+	offset, meta := pom.NextOffset()
+
+	if offset != 101 {
+		t.Errorf("Expected offset 100. Actual: %v", offset)
+	}
+	if meta != "modified_meta" {
+		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	safeClose(t, testClient)
+	broker.Close()
+	coordinator.Close()
+}
+
 func TestPartitionOffsetManagerCommitErr(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "meta")