|
|
@@ -5,14 +5,10 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
-var (
|
|
|
- broker, coordinator *mockBroker
|
|
|
- testClient Client
|
|
|
- config *Config
|
|
|
-)
|
|
|
+func initOffsetManager(t *testing.T) (om OffsetManager,
|
|
|
+ testClient Client, broker, coordinator *mockBroker) {
|
|
|
|
|
|
-func initOM(t *testing.T) OffsetManager {
|
|
|
- config = NewConfig()
|
|
|
+ config := NewConfig()
|
|
|
config.Metadata.Retry.Max = 1
|
|
|
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
|
|
|
|
@@ -37,16 +33,16 @@ func initOM(t *testing.T) OffsetManager {
|
|
|
CoordinatorPort: coordinator.Port(),
|
|
|
})
|
|
|
|
|
|
- om, err := NewOffsetManagerFromClient("group", testClient)
|
|
|
+ om, err = NewOffsetManagerFromClient("group", testClient)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
|
|
|
- return om
|
|
|
+ return om, testClient, broker, coordinator
|
|
|
}
|
|
|
|
|
|
-func initPOM(t *testing.T) PartitionOffsetManager {
|
|
|
- om := initOM(t)
|
|
|
+func initPartitionOffsetManager(t *testing.T, om OffsetManager,
|
|
|
+ coordinator *mockBroker) PartitionOffsetManager {
|
|
|
|
|
|
fetchResponse := new(OffsetFetchResponse)
|
|
|
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
|
|
|
@@ -78,7 +74,7 @@ func TestNewOffsetManager(t *testing.T) {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
|
|
|
- testClient.Close()
|
|
|
+ safeClose(t, testClient)
|
|
|
|
|
|
_, err = NewOffsetManagerFromClient("group", testClient)
|
|
|
if err != ErrClosedClient {
|
|
|
@@ -90,8 +86,8 @@ func TestNewOffsetManager(t *testing.T) {
|
|
|
|
|
|
// Test recovery from ErrNotCoordinatorForConsumer
|
|
|
// on first fetchInitialOffset call
|
|
|
-func TestFetchInitialFail(t *testing.T) {
|
|
|
- om := initOM(t)
|
|
|
+func TestOffsetManagerFetchInitialFail(t *testing.T) {
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
|
|
|
// Error on first fetchInitialOffset call
|
|
|
responseBlock := OffsetFetchResponseBlock{
|
|
|
@@ -127,16 +123,13 @@ func TestFetchInitialFail(t *testing.T) {
|
|
|
broker.Close()
|
|
|
coordinator.Close()
|
|
|
newCoordinator.Close()
|
|
|
- pom.Close()
|
|
|
- testClient.Close()
|
|
|
-
|
|
|
- // Wait to see what the brokerOffsetManager does
|
|
|
- time.Sleep(500 * time.Millisecond)
|
|
|
+ safeClose(t, pom)
|
|
|
+ safeClose(t, testClient)
|
|
|
}
|
|
|
|
|
|
// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
|
|
|
-func TestFetchInitialInProgress(t *testing.T) {
|
|
|
- om := initOM(t)
|
|
|
+func TestOffsetManagerFetchInitialLoadInProgress(t *testing.T) {
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
|
|
|
// Error on first fetchInitialOffset call
|
|
|
responseBlock := OffsetFetchResponseBlock{
|
|
|
@@ -163,12 +156,14 @@ func TestFetchInitialInProgress(t *testing.T) {
|
|
|
|
|
|
broker.Close()
|
|
|
coordinator.Close()
|
|
|
- pom.Close()
|
|
|
- testClient.Close()
|
|
|
+ safeClose(t, pom)
|
|
|
+ safeClose(t, testClient)
|
|
|
}
|
|
|
|
|
|
-func TestOffset(t *testing.T) {
|
|
|
- pom := initPOM(t)
|
|
|
+func TestPartitionOffsetManagerOffset(t *testing.T) {
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ pom := initPartitionOffsetManager(t, om, coordinator)
|
|
|
+
|
|
|
offset, meta := pom.Offset()
|
|
|
if offset != 5 {
|
|
|
t.Errorf("Expected offset 5. Actual: %v", offset)
|
|
|
@@ -177,14 +172,15 @@ func TestOffset(t *testing.T) {
|
|
|
t.Errorf("Expected metadata \"test_meta\". Actual: %q", meta)
|
|
|
}
|
|
|
|
|
|
- pom.Close()
|
|
|
+ safeClose(t, pom)
|
|
|
broker.Close()
|
|
|
coordinator.Close()
|
|
|
- testClient.Close()
|
|
|
+ safeClose(t, testClient)
|
|
|
}
|
|
|
|
|
|
-func TestSetOffset(t *testing.T) {
|
|
|
- pom := initPOM(t)
|
|
|
+func TestPartitionOffsetManagerSetOffset(t *testing.T) {
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ pom := initPartitionOffsetManager(t, om, coordinator)
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
ocResponse.AddError("my_topic", 0, ErrNoError)
|
|
|
@@ -200,14 +196,15 @@ func TestSetOffset(t *testing.T) {
|
|
|
t.Errorf("Expected metadata \"modified_meta\". Actual: %q", meta)
|
|
|
}
|
|
|
|
|
|
- pom.Close()
|
|
|
- testClient.Close()
|
|
|
+ safeClose(t, pom)
|
|
|
+ safeClose(t, testClient)
|
|
|
broker.Close()
|
|
|
coordinator.Close()
|
|
|
}
|
|
|
|
|
|
-func TestCommitErr(t *testing.T) {
|
|
|
- pom := initPOM(t)
|
|
|
+func TestPartitionOffsetManagerCommitErr(t *testing.T) {
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ pom := initPartitionOffsetManager(t, om, coordinator)
|
|
|
|
|
|
// Error on one partition
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
@@ -274,12 +271,13 @@ func TestCommitErr(t *testing.T) {
|
|
|
broker.Close()
|
|
|
coordinator.Close()
|
|
|
newCoordinator.Close()
|
|
|
- testClient.Close()
|
|
|
+ safeClose(t, testClient)
|
|
|
}
|
|
|
|
|
|
// Test of recovery from abort
|
|
|
-func TestBOMAbort(t *testing.T) {
|
|
|
- pom := initPOM(t)
|
|
|
+func TestAbortPartitionOffsetManager(t *testing.T) {
|
|
|
+ om, testClient, broker, coordinator := initOffsetManager(t)
|
|
|
+ pom := initPartitionOffsetManager(t, om, coordinator)
|
|
|
|
|
|
// this triggers an error in the CommitOffset request,
|
|
|
// which leads to the abort call
|
|
|
@@ -299,7 +297,7 @@ func TestBOMAbort(t *testing.T) {
|
|
|
|
|
|
pom.SetOffset(100, "modified_meta")
|
|
|
|
|
|
- pom.Close()
|
|
|
+ safeClose(t, pom)
|
|
|
broker.Close()
|
|
|
- testClient.Close()
|
|
|
+ safeClose(t, testClient)
|
|
|
}
|