|
|
@@ -1,7 +1,6 @@
|
|
|
package sarama
|
|
|
|
|
|
import (
|
|
|
- "log"
|
|
|
"testing"
|
|
|
"time"
|
|
|
)
|
|
|
@@ -61,6 +60,7 @@ func initPOM(t *testing.T) PartitionOffsetManager {
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
+
|
|
|
return pom
|
|
|
}
|
|
|
|
|
|
@@ -106,11 +106,11 @@ func TestFetchInitialFail(t *testing.T) {
|
|
|
|
|
|
// Refresh coordinator
|
|
|
newCoordinator := newMockBroker(t, 3)
|
|
|
- refreshResponse := new(ConsumerMetadataResponse)
|
|
|
- refreshResponse.CoordinatorID = newCoordinator.BrokerID()
|
|
|
- refreshResponse.CoordinatorHost = "127.0.0.1"
|
|
|
- refreshResponse.CoordinatorPort = newCoordinator.Port()
|
|
|
- broker.Returns(refreshResponse)
|
|
|
+ broker.Returns(&ConsumerMetadataResponse{
|
|
|
+ CoordinatorID: newCoordinator.BrokerID(),
|
|
|
+ CoordinatorHost: "127.0.0.1",
|
|
|
+ CoordinatorPort: newCoordinator.Port(),
|
|
|
+ })
|
|
|
|
|
|
// Second fetchInitialOffset call is fine
|
|
|
fetchResponse2 := new(OffsetFetchResponse)
|
|
|
@@ -119,9 +119,19 @@ func TestFetchInitialFail(t *testing.T) {
|
|
|
fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
|
|
|
newCoordinator.Returns(fetchResponse2)
|
|
|
|
|
|
- if _, err := om.ManagePartition("my_topic", 0); err != nil {
|
|
|
+ pom, err := om.ManagePartition("my_topic", 0)
|
|
|
+ if err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
+
|
|
|
+ broker.Close()
|
|
|
+ coordinator.Close()
|
|
|
+ newCoordinator.Close()
|
|
|
+ pom.Close()
|
|
|
+ testClient.Close()
|
|
|
+
|
|
|
+ // Wait to see what the brokerOffsetManager does
|
|
|
+ time.Sleep(500 * time.Millisecond)
|
|
|
}
|
|
|
|
|
|
// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
|
|
|
@@ -146,10 +156,15 @@ func TestFetchInitialInProgress(t *testing.T) {
|
|
|
fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
|
|
|
coordinator.Returns(fetchResponse2)
|
|
|
|
|
|
- if _, err := om.ManagePartition("my_topic", 0); err != nil {
|
|
|
+ pom, err := om.ManagePartition("my_topic", 0)
|
|
|
+ if err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
|
|
|
+ broker.Close()
|
|
|
+ coordinator.Close()
|
|
|
+ pom.Close()
|
|
|
+ testClient.Close()
|
|
|
}
|
|
|
|
|
|
func TestOffset(t *testing.T) {
|
|
|
@@ -163,9 +178,9 @@ func TestOffset(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
pom.Close()
|
|
|
- testClient.Close()
|
|
|
broker.Close()
|
|
|
coordinator.Close()
|
|
|
+ testClient.Close()
|
|
|
}
|
|
|
|
|
|
func TestSetOffset(t *testing.T) {
|
|
|
@@ -192,8 +207,6 @@ func TestSetOffset(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestCommitErr(t *testing.T) {
|
|
|
- log.Println("TestCommitErr")
|
|
|
- log.Println("=============")
|
|
|
pom := initPOM(t)
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
@@ -201,25 +214,18 @@ func TestCommitErr(t *testing.T) {
|
|
|
ocResponse.AddError("my_topic", 1, ErrNoError)
|
|
|
coordinator.Returns(ocResponse)
|
|
|
|
|
|
- // ocResponse2 := new(OffsetCommitResponse)
|
|
|
- // // ocResponse.Errors
|
|
|
- // ocResponse2.AddError("my_topic", 1, ErrNoError)
|
|
|
- // coordinator.Returns(ocResponse2)
|
|
|
-
|
|
|
- freshCoordinator := newMockBroker(t, 3)
|
|
|
+ newCoordinator := newMockBroker(t, 3)
|
|
|
|
|
|
// For RefreshCoordinator()
|
|
|
- coordinatorResponse3 := new(ConsumerMetadataResponse)
|
|
|
- coordinatorResponse3.CoordinatorID = freshCoordinator.BrokerID()
|
|
|
- coordinatorResponse3.CoordinatorHost = "127.0.0.1"
|
|
|
- coordinatorResponse3.CoordinatorPort = freshCoordinator.Port()
|
|
|
- broker.Returns(coordinatorResponse3)
|
|
|
+ broker.Returns(&ConsumerMetadataResponse{
|
|
|
+ CoordinatorID: newCoordinator.BrokerID(),
|
|
|
+ CoordinatorHost: "127.0.0.1",
|
|
|
+ CoordinatorPort: newCoordinator.Port(),
|
|
|
+ })
|
|
|
|
|
|
ocResponse2 := new(OffsetCommitResponse)
|
|
|
- // ocResponse.Errors
|
|
|
- // ocResponse2.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
ocResponse2.AddError("my_topic", 0, ErrNoError)
|
|
|
- freshCoordinator.Returns(ocResponse2)
|
|
|
+ newCoordinator.Returns(ocResponse2)
|
|
|
|
|
|
pom.SetOffset(100, "modified_meta")
|
|
|
|
|
|
@@ -227,35 +233,36 @@ func TestCommitErr(t *testing.T) {
|
|
|
if err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
+
|
|
|
+ broker.Close()
|
|
|
+ coordinator.Close()
|
|
|
+ newCoordinator.Close()
|
|
|
+ testClient.Close()
|
|
|
}
|
|
|
|
|
|
// Test of recovery from abort
|
|
|
func TestBOMAbort(t *testing.T) {
|
|
|
pom := initPOM(t)
|
|
|
+
|
|
|
+ // this triggers an error in the CommitOffset request,
|
|
|
+ // which leads to the abort call
|
|
|
coordinator.Close()
|
|
|
|
|
|
- // Refresh coordinator
|
|
|
+ // Response to refresh coordinator request
|
|
|
newCoordinator := newMockBroker(t, 3)
|
|
|
- refreshResponse := new(ConsumerMetadataResponse)
|
|
|
- refreshResponse.CoordinatorID = newCoordinator.BrokerID()
|
|
|
- refreshResponse.CoordinatorHost = "127.0.0.1"
|
|
|
- refreshResponse.CoordinatorPort = newCoordinator.Port()
|
|
|
- broker.Returns(refreshResponse)
|
|
|
+ broker.Returns(&ConsumerMetadataResponse{
|
|
|
+ CoordinatorID: newCoordinator.BrokerID(),
|
|
|
+ CoordinatorHost: "127.0.0.1",
|
|
|
+ CoordinatorPort: newCoordinator.Port(),
|
|
|
+ })
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
ocResponse.AddError("my_topic", 0, ErrNoError)
|
|
|
newCoordinator.Returns(ocResponse)
|
|
|
|
|
|
pom.SetOffset(100, "modified_meta")
|
|
|
- offset, meta := pom.Offset()
|
|
|
-
|
|
|
- if offset != 100 {
|
|
|
- t.Errorf("Expected offset 100. Actual: %v", offset)
|
|
|
- }
|
|
|
- if meta != "modified_meta" {
|
|
|
- t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
|
|
|
- }
|
|
|
|
|
|
pom.Close()
|
|
|
+ broker.Close()
|
|
|
testClient.Close()
|
|
|
}
|