|
|
@@ -9,11 +9,12 @@ import (
|
|
|
var (
|
|
|
broker, coordinator *mockBroker
|
|
|
testClient Client
|
|
|
+ config *Config
|
|
|
)
|
|
|
|
|
|
func initOM(t *testing.T) OffsetManager {
|
|
|
- config := NewConfig()
|
|
|
- config.Metadata.Retry.Max = 0
|
|
|
+ config = NewConfig()
|
|
|
+ config.Metadata.Retry.Max = 1
|
|
|
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
|
|
|
|
broker = newMockBroker(t, 1)
|
|
|
@@ -87,19 +88,67 @@ func TestNewOffsetManager(t *testing.T) {
|
|
|
seedBroker.Close()
|
|
|
}
|
|
|
|
|
|
+// Test recovery from ErrNotCoordinatorForConsumer
|
|
|
+// on first fetchInitialOffset call
|
|
|
func TestFetchInitialFail(t *testing.T) {
|
|
|
om := initOM(t)
|
|
|
|
|
|
- fetchResponse := new(OffsetFetchResponse)
|
|
|
- fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
|
|
|
+ // Error on first fetchInitialOffset call
|
|
|
+ responseBlock := OffsetFetchResponseBlock{
|
|
|
Err: ErrNotCoordinatorForConsumer,
|
|
|
Offset: 5,
|
|
|
Metadata: "test_meta",
|
|
|
- })
|
|
|
+ }
|
|
|
+
|
|
|
+ fetchResponse := new(OffsetFetchResponse)
|
|
|
+ fetchResponse.AddBlock("my_topic", 0, &responseBlock)
|
|
|
coordinator.Returns(fetchResponse)
|
|
|
|
|
|
- _, err := om.ManagePartition("my_topic", 0)
|
|
|
- if err != ErrNotCoordinatorForConsumer {
|
|
|
+ // Refresh coordinator
|
|
|
+ newCoordinator := newMockBroker(t, 3)
|
|
|
+ refreshResponse := new(ConsumerMetadataResponse)
|
|
|
+ refreshResponse.CoordinatorID = newCoordinator.BrokerID()
|
|
|
+ refreshResponse.CoordinatorHost = "127.0.0.1"
|
|
|
+ refreshResponse.CoordinatorPort = newCoordinator.Port()
|
|
|
+ broker.Returns(refreshResponse)
|
|
|
+
|
|
|
+ // Second fetchInitialOffset call is fine
|
|
|
+ fetchResponse2 := new(OffsetFetchResponse)
|
|
|
+ responseBlock2 := responseBlock
|
|
|
+ responseBlock2.Err = ErrNoError
|
|
|
+ fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
|
|
|
+ newCoordinator.Returns(fetchResponse2)
|
|
|
+
|
|
|
+ if _, err := om.ManagePartition("my_topic", 0); err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// Test fetchInitialOffset retry on ErrOffsetsLoadInProgress
|
|
|
+func TestFetchInitialInProgress(t *testing.T) {
|
|
|
+ om := initOM(t)
|
|
|
+ // Remove once fix is merged in
|
|
|
+ config.Consumer.Offsets.CommitInterval = 10 * time.Second
|
|
|
+
|
|
|
+ // Error on first fetchInitialOffset call
|
|
|
+ responseBlock := OffsetFetchResponseBlock{
|
|
|
+ Err: ErrOffsetsLoadInProgress,
|
|
|
+ Offset: 5,
|
|
|
+ Metadata: "test_meta",
|
|
|
+ }
|
|
|
+
|
|
|
+ fetchResponse := new(OffsetFetchResponse)
|
|
|
+ fetchResponse.AddBlock("my_topic", 0, &responseBlock)
|
|
|
+ coordinator.Returns(fetchResponse)
|
|
|
+
|
|
|
+ // Second fetchInitialOffset call is fine
|
|
|
+ fetchResponse2 := new(OffsetFetchResponse)
|
|
|
+ responseBlock2 := responseBlock
|
|
|
+ responseBlock2.Err = ErrNoError
|
|
|
+ fetchResponse2.AddBlock("my_topic", 0, &responseBlock2)
|
|
|
+ coordinator.Returns(fetchResponse2)
|
|
|
+
|
|
|
+ if _, err := om.ManagePartition("my_topic", 0); err != nil {
|
|
|
t.Error(err)
|
|
|
}
|
|
|
|
|
|
@@ -144,14 +193,12 @@ func TestSetOffset(t *testing.T) {
|
|
|
coordinator.Close()
|
|
|
}
|
|
|
|
|
|
-// This test is not passing
|
|
|
func TestCommitErr(t *testing.T) {
|
|
|
log.Println("TestCommitErr")
|
|
|
log.Println("=============")
|
|
|
pom := initPOM(t)
|
|
|
|
|
|
ocResponse := new(OffsetCommitResponse)
|
|
|
- // ocResponse.Errors
|
|
|
ocResponse.AddError("my_topic", 0, ErrOffsetOutOfRange)
|
|
|
ocResponse.AddError("my_topic", 1, ErrNoError)
|
|
|
coordinator.Returns(ocResponse)
|