Просмотр исходного кода

When OutOfBrokers, retry with previously-disconnected brokers.

This is a partial workaround for #15. Really, broker management should
be completely rearchitected, but today is not the day for that.
Burke Libbey 12 лет назад
Родитель
Сommit
87b7e454d0
1 измененных файлов с 32 добавлено и 0 удалено
  1. 32 0
      client.go

+ 32 - 0
client.go

@@ -26,6 +26,7 @@ type Client struct {
 	// so we store them separately
 	extraBrokerAddrs []string
 	extraBroker      *Broker
+	deadBrokerAddrs  []string
 
 	brokers map[int32]*Broker          // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
@@ -167,6 +168,8 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
+	client.deadBrokerAddrs = append(client.deadBrokerAddrs, broker.addr)
+
 	if broker == client.extraBroker {
 		client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
 		if len(client.extraBrokerAddrs) > 0 {
@@ -223,9 +226,38 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
 		client.disconnectBroker(broker)
 	}
 
+	if retries > 0 {
+		time.Sleep(client.config.WaitForElection)
+		client.resurrectDeadBrokers()
+		return client.refreshMetadata(topics, retries-1)
+	}
+
 	return OutOfBrokers
 }
 
+func (client *Client) resurrectDeadBrokers() {
+	Logger.Println("Ran out of connectable brokers. Retrying with brokers marked dead.")
+	client.lock.Lock()
+	defer client.lock.Unlock()
+
+	brokers := make(map[string]struct{})
+	for _, addr := range client.deadBrokerAddrs {
+		brokers[addr] = struct{}{}
+	}
+	for _, addr := range client.extraBrokerAddrs {
+		brokers[addr] = struct{}{}
+	}
+
+	client.deadBrokerAddrs = []string{}
+	client.extraBrokerAddrs = []string{}
+	for addr, _ := range brokers {
+		client.extraBrokerAddrs = append(client.extraBrokerAddrs, addr)
+	}
+
+	client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
+	client.extraBroker.Open(client.config.ConcurrencyPerBroker)
+}
+
 func (client *Client) any() *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()