|
|
@@ -20,40 +20,38 @@ func TestDefaultClientConfigValidates(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
func TestSimpleClient(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
|
|
|
- mb := NewMockBroker(t, 1)
|
|
|
+ seedBroker.Returns(new(MetadataResponse))
|
|
|
|
|
|
- mb.Returns(new(MetadataResponse))
|
|
|
-
|
|
|
- client, err := NewClient("client_id", []string{mb.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
- defer mb.Close()
|
|
|
+
|
|
|
+ seedBroker.Close()
|
|
|
+ safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestCachedPartitions(t *testing.T) {
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb5 := NewMockBroker(t, 5)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 5)
|
|
|
+
|
|
|
replicas := []int32{3, 1, 5}
|
|
|
isr := []int32{5, 1}
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
|
|
|
- mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
|
|
|
- mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
config := NewClientConfig()
|
|
|
config.MetadataRetries = 0
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, config)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
- defer mb1.Close()
|
|
|
- defer mb5.Close()
|
|
|
|
|
|
// Verify they aren't cached the same
|
|
|
allP := client.cachedPartitionsResults["my_topic"][allPartitions]
|
|
|
@@ -69,48 +67,49 @@ func TestCachedPartitions(t *testing.T) {
|
|
|
if 4 != len(client.cachedPartitions("my_topic", allPartitions)) {
|
|
|
t.Fatal("Not using the cache!")
|
|
|
}
|
|
|
+
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestClientSeedBrokers(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ discoveredBroker := NewMockBroker(t, 2)
|
|
|
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb2 := NewMockBroker(t, 2)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(discoveredBroker.Addr(), discoveredBroker.BrokerID())
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
|
|
|
- mb1.Returns(mdr)
|
|
|
-
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
- defer mb1.Close()
|
|
|
- defer mb2.Close()
|
|
|
+
|
|
|
+ discoveredBroker.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestClientMetadata(t *testing.T) {
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 5)
|
|
|
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb5 := NewMockBroker(t, 5)
|
|
|
replicas := []int32{3, 1, 5}
|
|
|
isr := []int32{5, 1}
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
|
|
|
- mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
|
|
|
- mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse := new(MetadataResponse)
|
|
|
+ metadataResponse.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 0, leader.BrokerID(), replicas, isr, NoError)
|
|
|
+ metadataResponse.AddTopicPartition("my_topic", 1, leader.BrokerID(), replicas, isr, LeaderNotAvailable)
|
|
|
+ seedBroker.Returns(metadataResponse)
|
|
|
|
|
|
config := NewClientConfig()
|
|
|
config.MetadataRetries = 0
|
|
|
- client, err := NewClient("client_id", []string{mb1.Addr()}, config)
|
|
|
+ client, err := NewClient("client_id", []string{seedBroker.Addr()}, config)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
- defer safeClose(t, client)
|
|
|
- defer mb1.Close()
|
|
|
- defer mb5.Close()
|
|
|
|
|
|
topics, err := client.Topics()
|
|
|
if err != nil {
|
|
|
@@ -159,21 +158,25 @@ func TestClientMetadata(t *testing.T) {
|
|
|
} else if isr[1] != 5 {
|
|
|
t.Error("Incorrect (or unsorted) isr")
|
|
|
}
|
|
|
+
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
+ safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
func TestClientRefreshBehaviour(t *testing.T) {
|
|
|
- mb1 := NewMockBroker(t, 1)
|
|
|
- mb5 := NewMockBroker(t, 5)
|
|
|
+ seedBroker := NewMockBroker(t, 1)
|
|
|
+ leader := NewMockBroker(t, 5)
|
|
|
|
|
|
- mdr := new(MetadataResponse)
|
|
|
- mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
|
|
|
- mb1.Returns(mdr)
|
|
|
+ metadataResponse1 := new(MetadataResponse)
|
|
|
+ metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ seedBroker.Returns(metadataResponse1)
|
|
|
|
|
|
- mdr2 := new(MetadataResponse)
|
|
|
- mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil, NoError)
|
|
|
- mb1.Returns(mdr2)
|
|
|
+ metadataResponse2 := new(MetadataResponse)
|
|
|
+ metadataResponse2.AddTopicPartition("my_topic", 0xb, leader.BrokerID(), nil, nil, NoError)
|
|
|
+ seedBroker.Returns(metadataResponse2)
|
|
|
|
|
|
- client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
|
|
|
+ client, err := NewClient("clientID", []string{seedBroker.Addr()}, nil)
|
|
|
if err != nil {
|
|
|
t.Fatal(err)
|
|
|
}
|
|
|
@@ -193,7 +196,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
|
|
|
}
|
|
|
|
|
|
client.disconnectBroker(tst)
|
|
|
- mb5.Close()
|
|
|
- mb1.Close()
|
|
|
+ leader.Close()
|
|
|
+ seedBroker.Close()
|
|
|
safeClose(t, client)
|
|
|
}
|