|
@@ -11,33 +11,7 @@ var (
|
|
|
testClient Client
|
|
testClient Client
|
|
|
)
|
|
)
|
|
|
|
|
|
|
|
-func TestNewOffsetManager(t *testing.T) {
|
|
|
|
|
- seedBroker := newMockBroker(t, 1)
|
|
|
|
|
- seedBroker.Returns(new(MetadataResponse))
|
|
|
|
|
-
|
|
|
|
|
- testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Fatal(err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- _, err = NewOffsetManagerFromClient("grouop", testClient)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- testClient.Close()
|
|
|
|
|
-
|
|
|
|
|
- _, err = NewOffsetManagerFromClient("group", testClient)
|
|
|
|
|
- if err != ErrClosedClient {
|
|
|
|
|
- t.Errorf("Error expected for closed client; actual value: %v", err)
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- seedBroker.Close()
|
|
|
|
|
-}
|
|
|
|
|
-
|
|
|
|
|
-func TestFetchInitialFail(t *testing.T) {
|
|
|
|
|
- // Mostly copy-paste from initPOM
|
|
|
|
|
- // TODO: eliminate this repetition
|
|
|
|
|
|
|
+func initOM(t *testing.T) OffsetManager {
|
|
|
config := NewConfig()
|
|
config := NewConfig()
|
|
|
config.Metadata.Retry.Max = 0
|
|
config.Metadata.Retry.Max = 0
|
|
|
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
@@ -68,66 +42,67 @@ func TestFetchInitialFail(t *testing.T) {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- fetchResponse := new(OffsetFetchResponse)
|
|
|
|
|
|
|
+ return om
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func initPOM(t *testing.T) PartitionOffsetManager {
|
|
|
|
|
+ om := initOM(t)
|
|
|
|
|
|
|
|
- // Only Err below modified
|
|
|
|
|
|
|
+ fetchResponse := new(OffsetFetchResponse)
|
|
|
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
|
|
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
|
|
|
- Err: ErrNotCoordinatorForConsumer,
|
|
|
|
|
|
|
+ Err: ErrNoError,
|
|
|
Offset: 5,
|
|
Offset: 5,
|
|
|
Metadata: "test_meta",
|
|
Metadata: "test_meta",
|
|
|
})
|
|
})
|
|
|
coordinator.Returns(fetchResponse)
|
|
coordinator.Returns(fetchResponse)
|
|
|
|
|
|
|
|
- _, err = om.ManagePartition("my_topic", 0)
|
|
|
|
|
- if err != ErrNotCoordinatorForConsumer {
|
|
|
|
|
- t.Error(err)
|
|
|
|
|
|
|
+ pom, err := om.ManagePartition("my_topic", 0)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ t.Fatal(err)
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
|
|
+ return pom
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func initPOM(t *testing.T) PartitionOffsetManager {
|
|
|
|
|
- config := NewConfig()
|
|
|
|
|
- config.Metadata.Retry.Max = 0
|
|
|
|
|
- config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
|
|
|
|
|
-
|
|
|
|
|
- broker = newMockBroker(t, 1)
|
|
|
|
|
- coordinator = newMockBroker(t, 2)
|
|
|
|
|
-
|
|
|
|
|
- seedMeta := new(MetadataResponse)
|
|
|
|
|
- seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
|
|
|
|
|
- seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
|
|
|
|
|
- seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
|
|
|
|
|
- broker.Returns(seedMeta)
|
|
|
|
|
|
|
+func TestNewOffsetManager(t *testing.T) {
|
|
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
|
|
+ seedBroker.Returns(new(MetadataResponse))
|
|
|
|
|
|
|
|
- var err error
|
|
|
|
|
- testClient, err = NewClient([]string{broker.Addr()}, config)
|
|
|
|
|
|
|
+ testClient, err := NewClient([]string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
t.Fatal(err)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- broker.Returns(&ConsumerMetadataResponse{
|
|
|
|
|
- CoordinatorID: coordinator.BrokerID(),
|
|
|
|
|
- CoordinatorHost: "127.0.0.1",
|
|
|
|
|
- CoordinatorPort: coordinator.Port(),
|
|
|
|
|
- })
|
|
|
|
|
-
|
|
|
|
|
- om, err := NewOffsetManagerFromClient("group", testClient)
|
|
|
|
|
|
|
+ _, err = NewOffsetManagerFromClient("grouop", testClient)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
- t.Fatal(err)
|
|
|
|
|
|
|
+ t.Error(err)
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ testClient.Close()
|
|
|
|
|
+
|
|
|
|
|
+ _, err = NewOffsetManagerFromClient("group", testClient)
|
|
|
|
|
+ if err != ErrClosedClient {
|
|
|
|
|
+ t.Errorf("Error expected for closed client; actual value: %v", err)
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ seedBroker.Close()
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+func TestFetchInitialFail(t *testing.T) {
|
|
|
|
|
+ om := initOM(t)
|
|
|
|
|
+
|
|
|
fetchResponse := new(OffsetFetchResponse)
|
|
fetchResponse := new(OffsetFetchResponse)
|
|
|
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
|
|
fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
|
|
|
- Err: ErrNoError,
|
|
|
|
|
|
|
+ Err: ErrNotCoordinatorForConsumer,
|
|
|
Offset: 5,
|
|
Offset: 5,
|
|
|
Metadata: "test_meta",
|
|
Metadata: "test_meta",
|
|
|
})
|
|
})
|
|
|
coordinator.Returns(fetchResponse)
|
|
coordinator.Returns(fetchResponse)
|
|
|
|
|
|
|
|
- pom, err := om.ManagePartition("my_topic", 0)
|
|
|
|
|
- if err != nil {
|
|
|
|
|
- t.Fatal(err)
|
|
|
|
|
|
|
+ _, err := om.ManagePartition("my_topic", 0)
|
|
|
|
|
+ if err != ErrNotCoordinatorForConsumer {
|
|
|
|
|
+ t.Error(err)
|
|
|
}
|
|
}
|
|
|
- return pom
|
|
|
|
|
|
|
+
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func TestOffset(t *testing.T) {
|
|
func TestOffset(t *testing.T) {
|