Browse Source

Simplify how we manage seed brokers

Create all the Broker objects themselves right up front rather than storing the
addresses. Don't eagerly open the seed connection in the constructor, we open it
lazily in `any()` already. Never add non-seed brokers to the dead set.
Substantially simplify `resurrectDeadBrokers`. Don't try and close a broker that
isn't even connected.
Evan Huus 10 năm trước cách đây
mục cha
commit
766d1bfaac
2 tập tin đã thay đổi với 23 bổ sung38 xóa
  1. 19 36
      client.go
  2. 4 2
      utils.go

+ 19 - 36
client.go

@@ -69,9 +69,8 @@ type client struct {
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
 	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
 	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
 	// so we store them separately
 	// so we store them separately
-	seedBrokerAddrs []string
-	seedBroker      *Broker
-	deadBrokerAddrs map[string]none
+	seedBrokers []*Broker
+	deadSeeds   []*Broker
 
 
 	brokers  map[int32]*Broker                       // maps broker ids to brokers
 	brokers  map[int32]*Broker                       // maps broker ids to brokers
 	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
 	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
@@ -103,14 +102,13 @@ func NewClient(addrs []string, conf *Config) (Client, error) {
 	client := &client{
 	client := &client{
 		conf:                    conf,
 		conf:                    conf,
 		closer:                  make(chan none),
 		closer:                  make(chan none),
-		seedBrokerAddrs:         addrs,
-		seedBroker:              NewBroker(addrs[0]),
-		deadBrokerAddrs:         make(map[string]none),
 		brokers:                 make(map[int32]*Broker),
 		brokers:                 make(map[int32]*Broker),
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
 		metadata:                make(map[string]map[int32]*PartitionMetadata),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
 	}
 	}
-	_ = client.seedBroker.Open(conf)
+	for _, addr := range addrs {
+		client.seedBrokers = append(client.seedBrokers, NewBroker(addr))
+	}
 
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err := client.RefreshMetadata()
 	err := client.RefreshMetadata()
@@ -151,13 +149,14 @@ func (client *client) Close() error {
 	for _, broker := range client.brokers {
 	for _, broker := range client.brokers {
 		safeAsyncClose(broker)
 		safeAsyncClose(broker)
 	}
 	}
-	client.brokers = nil
-	client.metadata = nil
 
 
-	if client.seedBroker != nil {
-		safeAsyncClose(client.seedBroker)
+	for _, broker := range client.seedBrokers {
+		safeAsyncClose(broker)
 	}
 	}
 
 
+	client.brokers = nil
+	client.metadata = nil
+
 	close(client.closer)
 	close(client.closer)
 
 
 	return nil
 	return nil
@@ -326,16 +325,9 @@ func (client *client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	defer client.lock.Unlock()
 
 
-	client.deadBrokerAddrs[broker.addr] = none{}
-
-	if broker == client.seedBroker {
-		client.seedBrokerAddrs = client.seedBrokerAddrs[1:]
-		if len(client.seedBrokerAddrs) > 0 {
-			client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-			_ = client.seedBroker.Open(client.conf)
-		} else {
-			client.seedBroker = nil
-		}
+	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
+		client.deadSeeds = append(client.deadSeeds, broker)
+		client.seedBrokers = client.seedBrokers[1:]
 	} else {
 	} else {
 		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
 		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
 		// but we really shouldn't have to; once that loop is made better this case can be
 		// but we really shouldn't have to; once that loop is made better this case can be
@@ -349,29 +341,20 @@ func (client *client) resurrectDeadBrokers() {
 	client.lock.Lock()
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	defer client.lock.Unlock()
 
 
-	for _, addr := range client.seedBrokerAddrs {
-		client.deadBrokerAddrs[addr] = none{}
-	}
-
-	client.seedBrokerAddrs = []string{}
-	for addr := range client.deadBrokerAddrs {
-		client.seedBrokerAddrs = append(client.seedBrokerAddrs, addr)
-	}
-	client.deadBrokerAddrs = make(map[string]none)
-
-	client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
-	_ = client.seedBroker.Open(client.conf)
+	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
+	client.deadSeeds = nil
 }
 }
 
 
 func (client *client) any() *Broker {
 func (client *client) any() *Broker {
 	client.lock.RLock()
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 	defer client.lock.RUnlock()
 
 
-	if client.seedBroker != nil {
-		_ = client.seedBroker.Open(client.conf)
-		return client.seedBroker
+	if len(client.seedBrokers) > 0 {
+		_ = client.seedBrokers[0].Open(client.conf)
+		return client.seedBrokers[0]
 	}
 	}
 
 
+	// not guaranteed to be random *or* deterministic
 	for _, broker := range client.brokers {
 	for _, broker := range client.brokers {
 		_ = broker.Open(client.conf)
 		_ = broker.Open(client.conf)
 		return broker
 		return broker

+ 4 - 2
utils.go

@@ -45,8 +45,10 @@ func withRecover(fn func()) {
 func safeAsyncClose(b *Broker) {
 func safeAsyncClose(b *Broker) {
 	tmp := b // local var prevents clobbering in goroutine
 	tmp := b // local var prevents clobbering in goroutine
 	go withRecover(func() {
 	go withRecover(func() {
-		if err := tmp.Close(); err != nil {
-			Logger.Println("Error closing broker", tmp.ID(), ":", err)
+		if connected, _ := tmp.Connected(); connected {
+			if err := tmp.Close(); err != nil {
+				Logger.Println("Error closing broker", tmp.ID(), ":", err)
+			}
 		}
 		}
 	})
 	})
 }
 }