|
|
@@ -69,22 +69,21 @@ func (client *Client) Close() {
|
|
|
}
|
|
|
|
|
|
// functions for use by producers and consumers
|
|
|
-// if Go had the concept they would be protected instead of private
|
|
|
+// if Go had the concept they would be marked 'protected'
|
|
|
|
|
|
func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
|
|
|
- leader := client.cachedLeader(topic, partition_id)
|
|
|
+ leader, kerr := client.cachedLeader(topic, partition_id)
|
|
|
|
|
|
- if leader == nil {
|
|
|
+ if kerr != k.NO_ERROR {
|
|
|
err := client.refreshTopic(topic)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
-
|
|
|
- leader = client.cachedLeader(topic, partition_id)
|
|
|
+ leader, kerr = client.cachedLeader(topic, partition_id)
|
|
|
}
|
|
|
|
|
|
- if leader == nil {
|
|
|
- return nil, k.UNKNOWN_TOPIC_OR_PARTITION
|
|
|
+ if kerr != k.NO_ERROR {
|
|
|
+ return nil, kerr
|
|
|
}
|
|
|
|
|
|
return leader, nil
|
|
|
@@ -98,7 +97,6 @@ func (client *Client) partitions(topic string) ([]int32, error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
-
|
|
|
partitions = client.cachedPartitions(topic)
|
|
|
}
|
|
|
|
|
|
@@ -158,21 +156,23 @@ func (client *Client) any() *k.Broker {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
|
|
|
+func (client *Client) cachedLeader(topic string, partition_id int32) (*k.Broker, k.KError) {
|
|
|
client.lock.RLock()
|
|
|
defer client.lock.RUnlock()
|
|
|
|
|
|
partitions := client.leaders[topic]
|
|
|
if partitions != nil {
|
|
|
- leader := partitions[partition_id]
|
|
|
- if leader == -1 {
|
|
|
- return nil
|
|
|
- } else {
|
|
|
- return client.brokers[leader]
|
|
|
+ leader, ok := partitions[partition_id]
|
|
|
+ if ok {
|
|
|
+ if leader == -1 {
|
|
|
+ return nil, k.LEADER_NOT_AVAILABLE
|
|
|
+ } else {
|
|
|
+ return client.brokers[leader], k.NO_ERROR
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
+ return nil, k.UNKNOWN_TOPIC_OR_PARTITION
|
|
|
}
|
|
|
|
|
|
func (client *Client) cachedPartitions(topic string) []int32 {
|