|
@@ -515,6 +515,37 @@ func TestClientRefreshBehaviour(t *testing.T) {
|
|
|
safeClose(t, client)
|
|
|
}
|
|
|
|
|
|
+func TestClientRefreshBrokers(t *testing.T) {
|
|
|
+ initialSeed := NewMockBroker(t, 0)
|
|
|
+ leader := NewMockBroker(t, 5)
|
|
|
+
|
|
|
+ metadataResponse1 := new(MetadataResponse)
|
|
|
+ metadataResponse1.AddBroker(leader.Addr(), leader.BrokerID())
|
|
|
+ metadataResponse1.AddBroker(initialSeed.Addr(), initialSeed.BrokerID())
|
|
|
+ initialSeed.Returns(metadataResponse1)
|
|
|
+
|
|
|
+ c, err := NewClient([]string{initialSeed.Addr()}, nil)
|
|
|
+ client := c.(*client)
|
|
|
+
|
|
|
+ if err != nil {
|
|
|
+ t.Fatal(err)
|
|
|
+ }
|
|
|
+
|
|
|
+ if len(client.Brokers()) != 2 {
|
|
|
+ t.Error("Meta broker is not 2")
|
|
|
+ }
|
|
|
+
|
|
|
+ newSeedBrokers := []string{"localhost:12345"}
|
|
|
+ _ = client.RefreshBrokers(newSeedBrokers)
|
|
|
+
|
|
|
+ if client.seedBrokers[0].addr != newSeedBrokers[0] {
|
|
|
+ t.Error("Seed broker not updated")
|
|
|
+ }
|
|
|
+ if len(client.Brokers()) != 0 {
|
|
|
+ t.Error("Old brokers not closed")
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
func TestClientRefreshMetadataBrokerOffline(t *testing.T) {
|
|
|
seedBroker := NewMockBroker(t, 1)
|
|
|
leader := NewMockBroker(t, 5)
|