Browse Source

Test of one fetchInitialOffset error

Includes fix to error returns in offset_manager.go
Aaron Kavlie 10 năm trước cách đây
mục cha
commit
71aee6656f
1 tập tin đã thay đổi với 52 bổ sung3 xóa
  1. 52 3
      offset_manager_test.go

+ 52 - 3
offset_manager_test.go

@@ -35,9 +35,11 @@ func TestNewOffsetManager(t *testing.T) {
 	seedBroker.Close()
 }
 
-func initPOM(t *testing.T) PartitionOffsetManager {
+func TestFetchInitialFail(t *testing.T) {
+	// Mostly copy-paste from initPOM
+	// TODO: eliminate this repetition
 	config := NewConfig()
-	config.Metadata.Retry.Max = 1
+	config.Metadata.Retry.Max = 0
 	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
 
 	broker = newMockBroker(t, 1)
@@ -61,18 +63,65 @@ func initPOM(t *testing.T) PartitionOffsetManager {
 		CoordinatorPort: coordinator.Port(),
 	})
 
+	om, err := NewOffsetManagerFromClient("group", testClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+
 	fetchResponse := new(OffsetFetchResponse)
+
+	// Only Err below modified
 	fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
-		Err:      ErrNoError,
+		Err:      ErrNotCoordinatorForConsumer,
 		Offset:   5,
 		Metadata: "test_meta",
 	})
 	coordinator.Returns(fetchResponse)
 
+	_, err = om.ManagePartition("my_topic", 0)
+	if err != ErrNotCoordinatorForConsumer {
+		t.Error(err)
+	}
+
+}
+
+func initPOM(t *testing.T) PartitionOffsetManager {
+	config := NewConfig()
+	config.Metadata.Retry.Max = 0
+	config.Consumer.Offsets.CommitInterval = 1 * time.Millisecond
+
+	broker = newMockBroker(t, 1)
+	coordinator = newMockBroker(t, 2)
+
+	seedMeta := new(MetadataResponse)
+	seedMeta.AddBroker(coordinator.Addr(), coordinator.BrokerID())
+	seedMeta.AddTopicPartition("my_topic", 0, 1, []int32{}, []int32{}, ErrNoError)
+	seedMeta.AddTopicPartition("my_topic", 1, 1, []int32{}, []int32{}, ErrNoError)
+	broker.Returns(seedMeta)
+
+	var err error
+	testClient, err = NewClient([]string{broker.Addr()}, config)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	broker.Returns(&ConsumerMetadataResponse{
+		CoordinatorID:   coordinator.BrokerID(),
+		CoordinatorHost: "127.0.0.1",
+		CoordinatorPort: coordinator.Port(),
+	})
+
 	om, err := NewOffsetManagerFromClient("group", testClient)
 	if err != nil {
 		t.Fatal(err)
 	}
+	fetchResponse := new(OffsetFetchResponse)
+	fetchResponse.AddBlock("my_topic", 0, &OffsetFetchResponseBlock{
+		Err:      ErrNoError,
+		Offset:   5,
+		Metadata: "test_meta",
+	})
+	coordinator.Returns(fetchResponse)
 
 	pom, err := om.ManagePartition("my_topic", 0)
 	if err != nil {