Prechádzať zdrojové kódy

Merge pull request #152 from Shopify/wait-for-dead-brokers

Wait for dead brokers
Evan Huus 11 rokov pred
rodič
commit
43940c285e
1 zmenil súbory, kde vykonal 14 pridanie a 9 odobranie
  1. 14 9
      client.go

+ 14 - 9
client.go

@@ -27,7 +27,7 @@ type Client struct {
 	// so we store them separately
 	extraBrokerAddrs []string
 	extraBroker      *Broker
-	deadBrokerAddrs  []string
+	deadBrokerAddrs  map[string]struct{}
 
 	brokers map[int32]*Broker          // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
@@ -57,6 +57,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		config:           *config,
 		extraBrokerAddrs: addrs,
 		extraBroker:      NewBroker(addrs[0]),
+		deadBrokerAddrs:  make(map[string]struct{}),
 		brokers:          make(map[int32]*Broker),
 		leaders:          make(map[string]map[int32]int32),
 	}
@@ -232,7 +233,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	defer client.lock.Unlock()
 	Logger.Printf("Disconnecting Broker %d\n", broker.ID())
 
-	client.deadBrokerAddrs = append(client.deadBrokerAddrs, broker.addr)
+	client.deadBrokerAddrs[broker.addr] = struct{}{}
 
 	if broker == client.extraBroker {
 		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
@@ -320,19 +321,15 @@ func (client *Client) resurrectDeadBrokers() {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
-	brokers := make(map[string]struct{})
-	for _, addr := range client.deadBrokerAddrs {
-		brokers[addr] = struct{}{}
-	}
 	for _, addr := range client.extraBrokerAddrs {
-		brokers[addr] = struct{}{}
+		client.deadBrokerAddrs[addr] = struct{}{}
 	}
 
-	client.deadBrokerAddrs = []string{}
 	client.extraBrokerAddrs = []string{}
-	for addr := range brokers {
+	for addr := range client.deadBrokerAddrs {
 		client.extraBrokerAddrs = append(client.extraBrokerAddrs, addr)
 	}
+	client.deadBrokerAddrs = make(map[string]struct{})
 
 	client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
 	client.extraBroker.Open(client.config.DefaultBrokerConf)
@@ -444,6 +441,14 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 				toRetry[topic.Name] = true
 				delete(client.leaders[topic.Name], partition.ID)
 			case NoError:
+				broker := client.brokers[partition.Leader]
+				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
+					if connected, _ := broker.Connected(); !connected {
+						toRetry[topic.Name] = true
+						delete(client.leaders[topic.Name], partition.ID)
+						continue
+					}
+				}
 				client.leaders[topic.Name][partition.ID] = partition.Leader
 			default:
 				return nil, partition.Err