|
@@ -112,7 +112,7 @@ func (client *Client) Topics() ([]string, error) {
|
|
|
defer client.lock.RUnlock()
|
|
defer client.lock.RUnlock()
|
|
|
|
|
|
|
|
ret := make([]string, 0, len(client.leaders))
|
|
ret := make([]string, 0, len(client.leaders))
|
|
|
- for topic, _ := range client.leaders {
|
|
|
|
|
|
|
+ for topic := range client.leaders {
|
|
|
ret = append(ret, topic)
|
|
ret = append(ret, topic)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -121,19 +121,19 @@ func (client *Client) Topics() ([]string, error) {
|
|
|
|
|
|
|
|
// Leader returns the broker object that is the leader of the current topic/partition, as
|
|
// Leader returns the broker object that is the leader of the current topic/partition, as
|
|
|
// determined by querying the cluster metadata.
|
|
// determined by querying the cluster metadata.
|
|
|
-func (client *Client) Leader(topic string, partition_id int32) (*Broker, error) {
|
|
|
|
|
- leader := client.cachedLeader(topic, partition_id)
|
|
|
|
|
|
|
+func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
|
|
|
|
|
+ leader := client.cachedLeader(topic, partitionID)
|
|
|
|
|
|
|
|
if leader == nil {
|
|
if leader == nil {
|
|
|
err := client.RefreshTopicMetadata(topic)
|
|
err := client.RefreshTopicMetadata(topic)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return nil, err
|
|
return nil, err
|
|
|
}
|
|
}
|
|
|
- leader = client.cachedLeader(topic, partition_id)
|
|
|
|
|
|
|
+ leader = client.cachedLeader(topic, partitionID)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
if leader == nil {
|
|
if leader == nil {
|
|
|
- return nil, UNKNOWN_TOPIC_OR_PARTITION
|
|
|
|
|
|
|
+ return nil, UnknownTopicOrPartition
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
return leader, nil
|
|
return leader, nil
|
|
@@ -193,7 +193,7 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
|
|
|
return nil
|
|
return nil
|
|
|
default:
|
|
default:
|
|
|
if retries <= 0 {
|
|
if retries <= 0 {
|
|
|
- return LEADER_NOT_AVAILABLE
|
|
|
|
|
|
|
+ return LeaderNotAvailable
|
|
|
}
|
|
}
|
|
|
time.Sleep(client.config.WaitForElection) // wait for leader election
|
|
time.Sleep(client.config.WaitForElection) // wait for leader election
|
|
|
return client.refreshMetadata(retry, retries-1)
|
|
return client.refreshMetadata(retry, retries-1)
|
|
@@ -221,13 +221,13 @@ func (client *Client) any() *Broker {
|
|
|
return client.extraBroker
|
|
return client.extraBroker
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
|
|
|
|
|
|
|
+func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
|
|
|
client.lock.RLock()
|
|
client.lock.RLock()
|
|
|
defer client.lock.RUnlock()
|
|
defer client.lock.RUnlock()
|
|
|
|
|
|
|
|
partitions := client.leaders[topic]
|
|
partitions := client.leaders[topic]
|
|
|
if partitions != nil {
|
|
if partitions != nil {
|
|
|
- leader, ok := partitions[partition_id]
|
|
|
|
|
|
|
+ leader, ok := partitions[partitionID]
|
|
|
if ok {
|
|
if ok {
|
|
|
return client.brokers[leader]
|
|
return client.brokers[leader]
|
|
|
}
|
|
}
|
|
@@ -246,7 +246,7 @@ func (client *Client) cachedPartitions(topic string) []int32 {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
ret := make([]int32, 0, len(partitions))
|
|
ret := make([]int32, 0, len(partitions))
|
|
|
- for id, _ := range partitions {
|
|
|
|
|
|
|
+ for id := range partitions {
|
|
|
ret = append(ret, id)
|
|
ret = append(ret, id)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -254,7 +254,7 @@ func (client *Client) cachedPartitions(topic string) []int32 {
|
|
|
return ret
|
|
return ret
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
|
|
|
|
|
|
|
+// if no fatal error, returns a list of topics that need retrying due to LeaderNotAvailable
|
|
|
func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
client.lock.Lock()
|
|
client.lock.Lock()
|
|
|
defer client.lock.Unlock()
|
|
defer client.lock.Unlock()
|
|
@@ -281,9 +281,9 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
|
|
|
|
|
for _, topic := range data.Topics {
|
|
for _, topic := range data.Topics {
|
|
|
switch topic.Err {
|
|
switch topic.Err {
|
|
|
- case NO_ERROR:
|
|
|
|
|
|
|
+ case NoError:
|
|
|
break
|
|
break
|
|
|
- case LEADER_NOT_AVAILABLE:
|
|
|
|
|
|
|
+ case LeaderNotAvailable:
|
|
|
toRetry[topic.Name] = true
|
|
toRetry[topic.Name] = true
|
|
|
default:
|
|
default:
|
|
|
return nil, topic.Err
|
|
return nil, topic.Err
|
|
@@ -291,11 +291,11 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
|
|
client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
|
|
|
for _, partition := range topic.Partitions {
|
|
for _, partition := range topic.Partitions {
|
|
|
switch partition.Err {
|
|
switch partition.Err {
|
|
|
- case LEADER_NOT_AVAILABLE:
|
|
|
|
|
|
|
+ case LeaderNotAvailable:
|
|
|
toRetry[topic.Name] = true
|
|
toRetry[topic.Name] = true
|
|
|
- delete(client.leaders[topic.Name], partition.Id)
|
|
|
|
|
- case NO_ERROR:
|
|
|
|
|
- client.leaders[topic.Name][partition.Id] = partition.Leader
|
|
|
|
|
|
|
+ delete(client.leaders[topic.Name], partition.ID)
|
|
|
|
|
+ case NoError:
|
|
|
|
|
+ client.leaders[topic.Name][partition.ID] = partition.Leader
|
|
|
default:
|
|
default:
|
|
|
return nil, partition.Err
|
|
return nil, partition.Err
|
|
|
}
|
|
}
|
|
@@ -303,7 +303,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
ret := make([]string, 0, len(toRetry))
|
|
ret := make([]string, 0, len(toRetry))
|
|
|
- for topic, _ := range toRetry {
|
|
|
|
|
|
|
+ for topic := range toRetry {
|
|
|
ret = append(ret, topic)
|
|
ret = append(ret, topic)
|
|
|
}
|
|
}
|
|
|
return ret, nil
|
|
return ret, nil
|