|
|
@@ -27,7 +27,7 @@ type Client struct {
|
|
|
// so we store them separately
|
|
|
extraBrokerAddrs []string
|
|
|
extraBroker *Broker
|
|
|
- deadBrokerAddrs []string
|
|
|
+ deadBrokerAddrs map[string]struct{}
|
|
|
|
|
|
brokers map[int32]*Broker // maps broker ids to brokers
|
|
|
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
|
|
|
@@ -57,6 +57,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
|
|
|
config: *config,
|
|
|
extraBrokerAddrs: addrs,
|
|
|
extraBroker: NewBroker(addrs[0]),
|
|
|
+ deadBrokerAddrs: make(map[string]struct{}),
|
|
|
brokers: make(map[int32]*Broker),
|
|
|
leaders: make(map[string]map[int32]int32),
|
|
|
}
|
|
|
@@ -232,7 +233,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
defer client.lock.Unlock()
|
|
|
Logger.Printf("Disconnecting Broker %d\n", broker.ID())
|
|
|
|
|
|
- client.deadBrokerAddrs = append(client.deadBrokerAddrs, broker.addr)
|
|
|
+ client.deadBrokerAddrs[broker.addr] = struct{}{}
|
|
|
|
|
|
if broker == client.extraBroker {
|
|
|
client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
|
|
|
@@ -320,19 +321,15 @@ func (client *Client) resurrectDeadBrokers() {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
|
|
|
- brokers := make(map[string]struct{})
|
|
|
- for _, addr := range client.deadBrokerAddrs {
|
|
|
- brokers[addr] = struct{}{}
|
|
|
- }
|
|
|
for _, addr := range client.extraBrokerAddrs {
|
|
|
- brokers[addr] = struct{}{}
|
|
|
+ client.deadBrokerAddrs[addr] = struct{}{}
|
|
|
}
|
|
|
|
|
|
- client.deadBrokerAddrs = []string{}
|
|
|
client.extraBrokerAddrs = []string{}
|
|
|
- for addr := range brokers {
|
|
|
+ for addr := range client.deadBrokerAddrs {
|
|
|
client.extraBrokerAddrs = append(client.extraBrokerAddrs, addr)
|
|
|
}
|
|
|
+ client.deadBrokerAddrs = make(map[string]struct{})
|
|
|
|
|
|
client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
|
|
|
client.extraBroker.Open(client.config.DefaultBrokerConf)
|