|
|
@@ -41,7 +41,7 @@ func (bm *brokerManager) terminateBroker(id int32) {
|
|
|
bm.lock.Unlock()
|
|
|
}
|
|
|
|
|
|
-func (bm *brokerManager) getLeader(topic string, partition_id int32) *broker {
|
|
|
+func (bm *brokerManager) getCachedLeader(topic string, partition_id int32) *broker {
|
|
|
var leader *broker = nil
|
|
|
|
|
|
bm.lock.RLock()
|
|
|
@@ -58,9 +58,9 @@ func (bm *brokerManager) getLeader(topic string, partition_id int32) *broker {
|
|
|
return leader
|
|
|
}
|
|
|
|
|
|
-func (bm *brokerManager) getValidLeader(topic string, partition_id int32) (*broker, error) {
|
|
|
+func (bm *brokerManager) getLeader(topic string, partition_id int32) (*broker, error) {
|
|
|
|
|
|
- leader := bm.getLeader(topic, partition_id)
|
|
|
+ leader := bm.getCachedLeader(topic, partition_id)
|
|
|
|
|
|
if leader == nil {
|
|
|
err := bm.refreshTopic(topic)
|
|
|
@@ -68,7 +68,7 @@ func (bm *brokerManager) getValidLeader(topic string, partition_id int32) (*brok
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- leader = bm.getLeader(topic, partition_id)
|
|
|
+ leader = bm.getCachedLeader(topic, partition_id)
|
|
|
}
|
|
|
|
|
|
if leader == nil {
|
|
|
@@ -104,49 +104,25 @@ func (bm *brokerManager) partitionsForTopic(topic string) ([]int32, error) {
|
|
|
return partitions, nil
|
|
|
}
|
|
|
|
|
|
-func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res responseDecoder) (bool, error) {
|
|
|
- b, err := bm.getValidLeader(topic, partition)
|
|
|
+func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res responseDecoder) error {
|
|
|
+ b, err := bm.getLeader(topic, partition)
|
|
|
if err != nil {
|
|
|
- return false, err
|
|
|
+ return err
|
|
|
}
|
|
|
|
|
|
- gotResponse, err := b.sendAndReceive(bm.client.id, req, res)
|
|
|
+ err = b.sendAndReceive(bm.client.id, req, res)
|
|
|
switch err.(type) {
|
|
|
case EncodingError:
|
|
|
// encoding errors are our problem, not the broker's, so just return them
|
|
|
// rather than refreshing the broker metadata
|
|
|
- return false, err
|
|
|
+ return err
|
|
|
case nil:
|
|
|
- // no error, did we get a response?
|
|
|
- if gotResponse {
|
|
|
- // yes, so check for stale topics that may require a resend
|
|
|
- stale := res.staleTopics()
|
|
|
- if len(stale) == 0 {
|
|
|
- return true, nil
|
|
|
- }
|
|
|
- err = bm.refreshTopics(stale)
|
|
|
- if err != nil {
|
|
|
- return true, err
|
|
|
- }
|
|
|
- } else {
|
|
|
- // no, so we have to assume it worked
|
|
|
- return false, nil
|
|
|
- }
|
|
|
+ return nil
|
|
|
default:
|
|
|
// broker error, so discard that broker
|
|
|
bm.terminateBroker(b.id)
|
|
|
+ return err
|
|
|
}
|
|
|
-
|
|
|
- // then do the whole thing again
|
|
|
- // (the metadata for the broker gets refreshed automatically in getValidLeader)
|
|
|
- // if we get a broker here, it's guaranteed to be fresh, so if it fails then
|
|
|
- // we pass that error back to the user (as opposed to retrying indefinitely)
|
|
|
- b, err = bm.getValidLeader(topic, partition)
|
|
|
- if err != nil {
|
|
|
- return false, err
|
|
|
- }
|
|
|
-
|
|
|
- return b.sendAndReceive(bm.client.id, req, res)
|
|
|
}
|
|
|
|
|
|
func (bm *brokerManager) getDefault() *broker {
|
|
|
@@ -163,24 +139,24 @@ func (bm *brokerManager) getDefault() *broker {
|
|
|
return bm.defaultBroker
|
|
|
}
|
|
|
|
|
|
-func (bm *brokerManager) sendToAny(req requestEncoder, res decoder) (bool, error) {
|
|
|
+func (bm *brokerManager) sendToAny(req requestEncoder, res decoder) error {
|
|
|
for b := bm.getDefault(); b != nil; b = bm.getDefault() {
|
|
|
- gotResponse, err := b.sendAndReceive(bm.client.id, req, res)
|
|
|
+ err := b.sendAndReceive(bm.client.id, req, res)
|
|
|
switch err.(type) {
|
|
|
case nil, EncodingError:
|
|
|
- return gotResponse, err
|
|
|
+ return err
|
|
|
default:
|
|
|
// broker error, so discard that broker
|
|
|
bm.defaultBroker = nil
|
|
|
bm.terminateBroker(b.id)
|
|
|
}
|
|
|
}
|
|
|
- return false, OutOfBrokers{}
|
|
|
+ return OutOfBrokers{}
|
|
|
}
|
|
|
|
|
|
func (bm *brokerManager) refreshTopics(topics []*string) error {
|
|
|
response := new(metadata)
|
|
|
- _, err := bm.sendToAny(&metadataRequest{topics}, response)
|
|
|
+ err := bm.sendToAny(&metadataRequest{topics}, response)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|