|
|
@@ -199,6 +199,92 @@ func TestClientMetadata(t *testing.T) {
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
+func TestClientReceivingUnknownTopic(t *testing.T) {
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
+
|
|
|
+ metadataResponse1 := new(MetadataResponse)
|
|
|
+ seedBroker.Returns(metadataResponse1)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Metadata.Retry.Max = 0
|
|
|
+ client, err := NewClient([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ metadataUnknownTopic := new(MetadataResponse)
|
|
|
+ metadataUnknownTopic.AddTopic("new_topic", ErrUnknownTopicOrPartition)
|
|
|
+ seedBroker.Returns(metadataUnknownTopic)
|
|
|
+
|
|
|
+ if err := client.RefreshMetadata("new_topic"); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("ErrUnknownTopicOrPartition expected, got", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we are asking for the leader of a partition of the non-existing topic.
|
|
|
+ // we will request metadata again.
|
|
|
+ seedBroker.Returns(metadataUnknownTopic)
|
|
|
+
|
|
|
+ if _, err = client.Leader("new_topic", 1); err != ErrUnknownTopicOrPartition {
|
|
|
+ t.Error("Expected ErrUnknownTopicOrPartition, got", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ safeClose(t, client)
|
|
|
+ seedBroker.Close()
|
|
|
+}
|
|
|
+
|
|
|
+func TestClientReceivingPartialMetadata(t *testing.T) {
|
|
|
+ seedBroker := newMockBroker(t, 1)
|
|
|
+ leader := newMockBroker(t, 5)
|
|
|
+
|
|
|
+ metadataResponse1 := new(MetadataResponse)
|
|
|
+ metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ seedBroker.Returns(metadataResponse1)
|
|
|
+
|
|
|
+ config := NewConfig()
|
|
|
+ config.Metadata.Retry.Max = 0
|
|
|
+ client, err := NewClient([]string{seedBroker.Addr()}, config)
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ replicas := []int32{leader.BrokerID(), seedBroker.BrokerID()}
|
|
|
+
|
|
|
+ metadataPartial := new(MetadataResponse)
|
|
|
+ metadataPartial.AddTopic("new_topic", ErrLeaderNotAvailable)
|
|
|
+ metadataPartial.AddTopicPartition("new_topic", 0, leader.BrokerID(), replicas, replicas, ErrNoError)
|
|
|
+ metadataPartial.AddTopicPartition("new_topic", 1, -1, replicas, []int32{}, ErrLeaderNotAvailable)
|
|
|
+ seedBroker.Returns(metadataPartial)
|
|
|
+
|
|
|
+ if err := client.RefreshMetadata("new_topic"); err != nil {
|
|
|
+ t.Error("ErrLeaderNotAvailable should not make RefreshMetadata respond with an error")
|
|
|
+ }
|
|
|
+
|
|
|
+ // Even though the metadata was incomplete, we should be able to get the leader of a partition
|
|
|
+ // for which we did get a useful response, without doing additional requests.
|
|
|
+
|
|
|
+ partition0Leader, err := client.Leader("new_topic", 0)
|
|
|
+ if err != nil {
|
|
|
+ t.Error(err)
|
|
|
+ } else if partition0Leader.Addr() != leader.Addr() {
|
|
|
+ t.Error("Unexpected leader returned", partition0Leader.Addr())
|
|
|
+ }
|
|
|
+
|
|
|
+ // If we are asking for the leader of a partition that didn't have a leader before,
|
|
|
+ // we will do another metadata request.
|
|
|
+
|
|
|
+ seedBroker.Returns(metadataPartial)
|
|
|
+
|
|
|
+ // Still no leader for the partition, so asking for it should return an error.
|
|
|
+ _, err = client.Leader("new_topic", 1)
|
|
|
+ if err != ErrLeaderNotAvailable {
|
|
|
+ t.Error("Expected ErrLeaderNotAvailable, got", err)
|
|
|
+ }
|
|
|
+
|
|
|
+ safeClose(t, client)
|
|
|
+ seedBroker.Close()
|
|
|
+ leader.Close()
|
|
|
+}
|
|
|
+
|
|
|
func TestClientRefreshBehaviour(t *testing.T) {
|
|
|
seedBroker := newMockBroker(t, 1)
|
|
|
leader := newMockBroker(t, 5)
|