|
|
@@ -25,9 +25,9 @@ type Client struct {
|
|
|
// the broker addresses given to us through the constructor are not guaranteed to be returned in
|
|
|
// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
|
|
|
// so we store them separately
|
|
|
- extraBrokerAddrs []string
|
|
|
- extraBroker *Broker
|
|
|
- deadBrokerAddrs map[string]struct{}
|
|
|
+ seedBrokerAddrs []string
|
|
|
+ seedBroker *Broker
|
|
|
+ deadBrokerAddrs map[string]struct{}
|
|
|
|
|
|
brokers map[int32]*Broker // maps broker ids to brokers
|
|
|
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
|
|
|
@@ -53,15 +53,15 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
|
|
|
}
|
|
|
|
|
|
client := &Client{
|
|
|
- id: id,
|
|
|
- config: *config,
|
|
|
- extraBrokerAddrs: addrs,
|
|
|
- extraBroker: NewBroker(addrs[0]),
|
|
|
- deadBrokerAddrs: make(map[string]struct{}),
|
|
|
- brokers: make(map[int32]*Broker),
|
|
|
- leaders: make(map[string]map[int32]int32),
|
|
|
+ id: id,
|
|
|
+ config: *config,
|
|
|
+ seedBrokerAddrs: addrs,
|
|
|
+ seedBroker: NewBroker(addrs[0]),
|
|
|
+ deadBrokerAddrs: make(map[string]struct{}),
|
|
|
+ brokers: make(map[int32]*Broker),
|
|
|
+ leaders: make(map[string]map[int32]int32),
|
|
|
}
|
|
|
- client.extraBroker.Open(config.DefaultBrokerConf)
|
|
|
+ client.seedBroker.Open(config.DefaultBrokerConf)
|
|
|
|
|
|
// do an initial fetch of all cluster metadata by specifing an empty list of topics
|
|
|
err := client.RefreshAllMetadata()
|
|
|
@@ -105,8 +105,8 @@ func (client *Client) Close() error {
|
|
|
client.brokers = nil
|
|
|
client.leaders = nil
|
|
|
|
|
|
- if client.extraBroker != nil {
|
|
|
- go withRecover(func() { client.extraBroker.Close() })
|
|
|
+ if client.seedBroker != nil {
|
|
|
+ go withRecover(func() { client.seedBroker.Close() })
|
|
|
}
|
|
|
|
|
|
return nil
|
|
|
@@ -235,13 +235,13 @@ func (client *Client) disconnectBroker(broker *Broker) {
|
|
|
|
|
|
client.deadBrokerAddrs[broker.addr] = struct{}{}
|
|
|
|
|
|
- if broker == client.extraBroker {
|
|
|
- client.extraBrokerAddrs = client.extraBrokerAddrs[1:]
|
|
|
- if len(client.extraBrokerAddrs) > 0 {
|
|
|
- client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
|
|
|
- client.extraBroker.Open(client.config.DefaultBrokerConf)
|
|
|
+ if broker == client.seedBroker {
|
|
|
+ client.seedBrokerAddrs = client.seedBrokerAddrs[1:]
|
|
|
+ if len(client.seedBrokerAddrs) > 0 {
|
|
|
+ client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
|
|
|
+ client.seedBroker.Open(client.config.DefaultBrokerConf)
|
|
|
} else {
|
|
|
- client.extraBroker = nil
|
|
|
+ client.seedBroker = nil
|
|
|
}
|
|
|
} else {
|
|
|
// we don't need to update the leaders hash, it will automatically get refreshed next time because
|
|
|
@@ -321,18 +321,18 @@ func (client *Client) resurrectDeadBrokers() {
|
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
|
|
|
|
- for _, addr := range client.extraBrokerAddrs {
|
|
|
+ for _, addr := range client.seedBrokerAddrs {
|
|
|
client.deadBrokerAddrs[addr] = struct{}{}
|
|
|
}
|
|
|
|
|
|
- client.extraBrokerAddrs = []string{}
|
|
|
+ client.seedBrokerAddrs = []string{}
|
|
|
for addr := range client.deadBrokerAddrs {
|
|
|
- client.extraBrokerAddrs = append(client.extraBrokerAddrs, addr)
|
|
|
+ client.seedBrokerAddrs = append(client.seedBrokerAddrs, addr)
|
|
|
}
|
|
|
client.deadBrokerAddrs = make(map[string]struct{})
|
|
|
|
|
|
- client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
|
|
|
- client.extraBroker.Open(client.config.DefaultBrokerConf)
|
|
|
+ client.seedBroker = NewBroker(client.seedBrokerAddrs[0])
|
|
|
+ client.seedBroker.Open(client.config.DefaultBrokerConf)
|
|
|
}
|
|
|
|
|
|
func (client *Client) any() *Broker {
|
|
|
@@ -343,7 +343,7 @@ func (client *Client) any() *Broker {
|
|
|
return broker
|
|
|
}
|
|
|
|
|
|
- return client.extraBroker
|
|
|
+ return client.seedBroker
|
|
|
}
|
|
|
|
|
|
func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
|