|
@@ -96,8 +96,10 @@ func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, r
|
|
|
delete(bm.brokers, b.id)
|
|
delete(bm.brokers, b.id)
|
|
|
bm.brokersLock.Unlock()
|
|
bm.brokersLock.Unlock()
|
|
|
|
|
|
|
|
- // then do the whole thing again treating all errors as fatal
|
|
|
|
|
|
|
+ // then do the whole thing again
|
|
|
// (the metadata for the broker gets refreshed automatically in getLeader)
|
|
// (the metadata for the broker gets refreshed automatically in getLeader)
|
|
|
|
|
+ // if we get a broker here, it's guaranteed to be fresh, so if it fails then
|
|
|
|
|
+ // we pass that error back to the user (as opposed to retrying indefinitely)
|
|
|
b, err = bm.getLeader(topic, partition)
|
|
b, err = bm.getLeader(topic, partition)
|
|
|
if err != nil {
|
|
if err != nil {
|
|
|
return err
|
|
return err
|