Browse Source

Fix offset_manager for Kafka 0.9.0.0

Maxim Vladimirsky 10 years ago
parent
commit
1e5cc31d7c
2 changed files with 5 additions and 11 deletions
  1. 0 9
      functional_offset_manager_test.go
  2. 5 2
      offset_manager.go

+ 0 - 9
functional_offset_manager_test.go

@@ -1,16 +1,11 @@
 package sarama
 
 import (
-	"os"
 	"testing"
 )
 
 func TestFuncOffsetManager(t *testing.T) {
 	checkKafkaVersion(t, "0.8.2")
-	if os.Getenv("KAFKA_VERSION") == "0.9.0.0" {
-		t.Skip("Offset manager is broken with kafka 0.9 at the moment.")
-	}
-
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
@@ -24,10 +19,6 @@ func TestFuncOffsetManager(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	if _, err := offsetManager.ManagePartition("does_not_exist", 123); err != ErrUnknownTopicOrPartition {
-		t.Fatal("Expected ErrUnknownTopicOrPartition when starting a partition offset manager for a partition that does not exist, got:", err)
-	}
-
 	pom1, err := offsetManager.ManagePartition("test.1", 0)
 	if err != nil {
 		t.Fatal(err)

+ 5 - 2
offset_manager.go

@@ -7,6 +7,8 @@ import (
 
 // Offset Manager
 
+const groupGenerationUndefined = -1
+
 // OffsetManager uses Kafka to store and fetch consumed partition offsets.
 type OffsetManager interface {
 	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
@@ -476,8 +478,9 @@ func (bom *brokerOffsetManager) flushToBroker() {
 
 func (bom *brokerOffsetManager) constructRequest() *OffsetCommitRequest {
 	r := &OffsetCommitRequest{
-		Version:       1,
-		ConsumerGroup: bom.parent.group,
+		Version:                 1,
+		ConsumerGroup:           bom.parent.group,
+		ConsumerGroupGeneration: groupGenerationUndefined,
 	}
 
 	for s := range bom.subscriptions {