Przeglądaj źródła

Add randomize seed broker method

Justin Chen 3 lat temu
rodzic
commit
5bcc532a52
1 zmienionych plików z 9 dodań i 8 usunięć
  1. 9 8
      client.go

+ 9 - 8
client.go

@@ -160,10 +160,7 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 		coordinators:            make(map[string]int32),
 	}
 
-	random := rand.New(rand.NewSource(time.Now().UnixNano()))
-	for _, index := range random.Perm(len(addrs)) {
-		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
-	}
+	client.randomizeSeedBrokers(addrs)
 
 	if conf.Metadata.Full {
 		// do an initial fetch of all cluster metadata by specifying an empty list of topics
@@ -450,10 +447,7 @@ func (client *client) RefreshBrokers(addrs []string) error {
 	client.seedBrokers = nil
 	client.deadSeeds = nil
 
-	random := rand.New(rand.NewSource(time.Now().UnixNano()))
-	for _, index := range random.Perm(len(addrs)) {
-		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
-	}
+	client.randomizeSeedBrokers(addrs)
 
 	return nil
 }
@@ -591,6 +585,13 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
 
 // private broker management helpers
 
+func (client *client) randomizeSeedBrokers(addrs []string) {
+	random := rand.New(rand.NewSource(time.Now().UnixNano()))
+	for _, index := range random.Perm(len(addrs)) {
+		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
+	}
+}
+
 func (client *client) updateBroker(brokers []*Broker) {
 	var currentBroker = make(map[int32]*Broker, len(brokers))