浏览代码

adds ResetOffset to reset to earlier offset values. #554

Felix Geller 8 年之前
父节点
当前提交
b966238f31
共有 2 个文件被更改,包括 82 次插入0 次删除
  1. 18 0
      offset_manager.go
  2. 64 0
      offset_manager_test.go

+ 18 - 0
offset_manager.go

@@ -151,6 +151,13 @@ type PartitionOffsetManager interface {
 	// message twice, and your processing should ideally be idempotent.
 	MarkOffset(offset int64, metadata string)
 
+	// ResetOffset resets to the provided offset, alongside a metadata string that
+	// represents the state of the partition consumer at that point in time. Reset
+	// acts as a counterpart to MarkOffset, the difference being that it allows to
+	// reset an offset to an earlier or smaller value, where MarkOffset only
+	// allows incrementing the offset. cf MarkOffset for more details.
+	ResetOffset(offset int64, metadata string)
+
 	// Errors returns a read channel of errors that occur during offset management, if
 	// enabled. By default, errors are logged and not returned over this channel. If
 	// you want to implement any custom error handling, set your config's
@@ -329,6 +336,17 @@ func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
 	}
 }
 
+func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
+	pom.lock.Lock()
+	defer pom.lock.Unlock()
+
+	if offset < pom.offset {
+		pom.offset = offset
+		pom.metadata = metadata
+		pom.dirty = true
+	}
+}
+
 func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
 	pom.lock.Lock()
 	defer pom.lock.Unlock()

+ 64 - 0
offset_manager_test.go

@@ -204,6 +204,70 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
 	safeClose(t, testClient)
 }
 
+func TestPartitionOffsetManagerResetOffset(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	coordinator.Returns(ocResponse)
+
+	expected := int64(1)
+	pom.ResetOffset(expected, "modified_meta")
+	actual, meta := pom.NextOffset()
+
+	if actual != expected {
+		t.Errorf("Expected offset %v. Actual: %v", expected, actual)
+	}
+	if meta != "modified_meta" {
+		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	safeClose(t, testClient)
+	broker.Close()
+	coordinator.Close()
+}
+
+func TestPartitionOffsetManagerResetOffsetWithRetention(t *testing.T) {
+	om, testClient, broker, coordinator := initOffsetManager(t)
+	testClient.Config().Consumer.Offsets.Retention = time.Hour
+
+	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	handler := func(req *request) (res encoder) {
+		if req.body.version() != 2 {
+			t.Errorf("Expected to be using version 2. Actual: %v", req.body.version())
+		}
+		offsetCommitRequest := req.body.(*OffsetCommitRequest)
+		if offsetCommitRequest.RetentionTime != (60 * 60 * 1000) {
+			t.Errorf("Expected an hour retention time. Actual: %v", offsetCommitRequest.RetentionTime)
+		}
+		return ocResponse
+	}
+	coordinator.setHandler(handler)
+
+	expected := int64(1)
+	pom.ResetOffset(expected, "modified_meta")
+	actual, meta := pom.NextOffset()
+
+	if actual != expected {
+		t.Errorf("Expected offset %v. Actual: %v", expected, actual)
+	}
+	if meta != "modified_meta" {
+		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
+	}
+
+	safeClose(t, pom)
+	safeClose(t, om)
+	safeClose(t, testClient)
+	broker.Close()
+	coordinator.Close()
+}
+
 func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
 	om, testClient, broker, coordinator := initOffsetManager(t)
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "original_meta")