Explorar o código

Merge pull request #713 from Shopify/match-upstream-offset-manager

OffsetManager: match upstream mark/next behaviour
Evan Huus %!s(int64=9) %!d(string=hai) anos
pai
achega
e8020bffa1
Modificáronse 4 ficheiros con 20 adicións e 7 borrados
  1. 9 0
      CHANGELOG.md
  2. 2 2
      functional_offset_manager_test.go
  3. 6 2
      offset_manager.go
  4. 3 3
      offset_manager_test.go

+ 9 - 0
CHANGELOG.md

@@ -8,6 +8,13 @@ features that may not be compatible with old Kafka versions. If you don't
 specify this value it will default to 0.8.2 (the minimum supported), and trying
 specify this value it will default to 0.8.2 (the minimum supported), and trying
 to use more recent features (like the offset manager) will fail with an error.
 to use more recent features (like the offset manager) will fail with an error.
 
 
+_Also:_ The offset-manager's behaviour has been changed to match the upstream
+java consumer (see [#705](https://github.com/Shopify/sarama/pull/705) and
+[#713](https://github.com/Shopify/sarama/pull/713)). If you use the
+offset-manager, please ensure that you are committing one *greater* than the
+last consumed message offset or else you may end up consuming duplicate
+messages.
+
 New Features:
 New Features:
  - Support for Kafka 0.10
  - Support for Kafka 0.10
    ([#672](https://github.com/Shopify/sarama/pull/672),
    ([#672](https://github.com/Shopify/sarama/pull/672),
@@ -35,6 +42,8 @@ Bug Fixes:
    ([#685](https://github.com/Shopify/sarama/pull/685)).
    ([#685](https://github.com/Shopify/sarama/pull/685)).
  - Fix a possible tight loop in the consumer
  - Fix a possible tight loop in the consumer
    ([#693](https://github.com/Shopify/sarama/pull/693)).
    ([#693](https://github.com/Shopify/sarama/pull/693)).
+ - Match upstream's offset-tracking behaviour
+   ([#705](https://github.com/Shopify/sarama/pull/705)).
  - Report UnknownTopicOrPartition errors from the offset manager
  - Report UnknownTopicOrPartition errors from the offset manager
    ([#706](https://github.com/Shopify/sarama/pull/706)).
    ([#706](https://github.com/Shopify/sarama/pull/706)).
  - Fix possible negative partition value from the HashPartitioner
  - Fix possible negative partition value from the HashPartitioner

+ 2 - 2
functional_offset_manager_test.go

@@ -34,8 +34,8 @@ func TestFuncOffsetManager(t *testing.T) {
 
 
 	offset, metadata := pom2.NextOffset()
 	offset, metadata := pom2.NextOffset()
 
 
-	if offset != 10+1 {
-		t.Errorf("Expected the next offset to be 11, found %d.", offset)
+	if offset != 10 {
+		t.Errorf("Expected the next offset to be 10, found %d.", offset)
 	}
 	}
 	if metadata != "test metadata" {
 	if metadata != "test metadata" {
 		t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)
 		t.Errorf("Expected metadata to be 'test metadata', found %s.", metadata)

+ 6 - 2
offset_manager.go

@@ -136,11 +136,15 @@ type PartitionOffsetManager interface {
 	// was committed for this partition yet.
 	// was committed for this partition yet.
 	NextOffset() (int64, string)
 	NextOffset() (int64, string)
 
 
-	// MarkOffset marks the provided offset as processed, alongside a metadata string
+	// MarkOffset marks the provided offset, alongside a metadata string
 	// that represents the state of the partition consumer at that point in time. The
 	// that represents the state of the partition consumer at that point in time. The
 	// metadata string can be used by another consumer to restore that state, so it
 	// metadata string can be used by another consumer to restore that state, so it
 	// can resume consumption.
 	// can resume consumption.
 	//
 	//
+	// To follow upstream conventions, you are expected to mark the offset of the
+	// next message to read, not the last message read. Thus, when calling `MarkOffset`
+	// you should typically add one to the offset of the last consumed message.
+	//
 	// Note: calling MarkOffset does not necessarily commit the offset to the backend
 	// Note: calling MarkOffset does not necessarily commit the offset to the backend
 	// store immediately for efficiency reasons, and it may never be committed if
 	// store immediately for efficiency reasons, and it may never be committed if
 	// your application crashes. This means that you may end up processing the same
 	// your application crashes. This means that you may end up processing the same
@@ -340,7 +344,7 @@ func (pom *partitionOffsetManager) NextOffset() (int64, string) {
 	defer pom.lock.Unlock()
 	defer pom.lock.Unlock()
 
 
 	if pom.offset >= 0 {
 	if pom.offset >= 0 {
-		return pom.offset + 1, pom.metadata
+		return pom.offset, pom.metadata
 	}
 	}
 
 
 	return pom.parent.conf.Consumer.Offsets.Initial, ""
 	return pom.parent.conf.Consumer.Offsets.Initial, ""

+ 3 - 3
offset_manager_test.go

@@ -190,7 +190,7 @@ func TestPartitionOffsetManagerNextOffset(t *testing.T) {
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
 	pom := initPartitionOffsetManager(t, om, coordinator, 5, "test_meta")
 
 
 	offset, meta := pom.NextOffset()
 	offset, meta := pom.NextOffset()
-	if offset != 6 {
+	if offset != 5 {
 		t.Errorf("Expected offset 5. Actual: %v", offset)
 		t.Errorf("Expected offset 5. Actual: %v", offset)
 	}
 	}
 	if meta != "test_meta" {
 	if meta != "test_meta" {
@@ -215,7 +215,7 @@ func TestPartitionOffsetManagerMarkOffset(t *testing.T) {
 	pom.MarkOffset(100, "modified_meta")
 	pom.MarkOffset(100, "modified_meta")
 	offset, meta := pom.NextOffset()
 	offset, meta := pom.NextOffset()
 
 
-	if offset != 101 {
+	if offset != 100 {
 		t.Errorf("Expected offset 100. Actual: %v", offset)
 		t.Errorf("Expected offset 100. Actual: %v", offset)
 	}
 	}
 	if meta != "modified_meta" {
 	if meta != "modified_meta" {
@@ -252,7 +252,7 @@ func TestPartitionOffsetManagerMarkOffsetWithRetention(t *testing.T) {
 	pom.MarkOffset(100, "modified_meta")
 	pom.MarkOffset(100, "modified_meta")
 	offset, meta := pom.NextOffset()
 	offset, meta := pom.NextOffset()
 
 
-	if offset != 101 {
+	if offset != 100 {
 		t.Errorf("Expected offset 100. Actual: %v", offset)
 		t.Errorf("Expected offset 100. Actual: %v", offset)
 	}
 	}
 	if meta != "modified_meta" {
 	if meta != "modified_meta" {