Bläddra i källkod

Test to covert abort process.

Includes small fix to offset manager -- return to avoid
checking nil response.
Aaron Kavlie 10 år sedan
förälder
incheckning
147e2fe87e
1 ändrade filer med 32 tillägg och 3 borttagningar
  1. 32 3
      offset_manager_test.go

+ 32 - 3
offset_manager_test.go

@@ -73,7 +73,7 @@ func TestNewOffsetManager(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	_, err = NewOffsetManagerFromClient("grouop", testClient)
+	_, err = NewOffsetManagerFromClient("group", testClient)
 	if err != nil {
 		t.Error(err)
 	}
@@ -127,8 +127,6 @@ func TestFetchInitialFail(t *testing.T) {
 // Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
 func TestFetchInitialInProgress(t *testing.T) {
 	om := initOM(t)
-	// Remove once fix is merged in
-	config.Consumer.Offsets.CommitInterval = 10 * time.Second
 
 	// Error on first fetchInitialOffset call
 	responseBlock := OffsetFetchResponseBlock{
@@ -230,3 +228,34 @@ func TestCommitErr(t *testing.T) {
 		t.Error(err)
 	}
 }
+
+// Test of recovery from abort
+func TestBOMAbort(t *testing.T) {
+	pom := initPOM(t)
+	coordinator.Close()
+
+	// Refresh coordinator
+	newCoordinator := newMockBroker(t, 3)
+	refreshResponse := new(ConsumerMetadataResponse)
+	refreshResponse.CoordinatorID = newCoordinator.BrokerID()
+	refreshResponse.CoordinatorHost = "127.0.0.1"
+	refreshResponse.CoordinatorPort = newCoordinator.Port()
+	broker.Returns(refreshResponse)
+
+	ocResponse := new(OffsetCommitResponse)
+	ocResponse.AddError("my_topic", 0, ErrNoError)
+	newCoordinator.Returns(ocResponse)
+
+	pom.SetOffset(100, "modified_meta")
+	offset, meta := pom.Offset()
+
+	if offset != 100 {
+		t.Errorf("Expected offset 100. Actual: %v", offset)
+	}
+	if meta != "modified_meta" {
+		t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
+	}
+
+	pom.Close()
+	testClient.Close()
+}