|
@@ -40,7 +40,7 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
|
|
|
return bm, nil
|
|
return bm, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
-func (bm *brokerManager) lookupLeader(topic string, partition int32) (*broker, error) {
|
|
|
|
|
|
|
+func (bm *brokerManager) getLeader(topic string, partition int32) (*broker, error) {
|
|
|
var broker *broker = nil
|
|
var broker *broker = nil
|
|
|
bm.brokersLock.RLock()
|
|
bm.brokersLock.RLock()
|
|
|
id, ok := bm.leaders[topicPartition{topic, partition}]
|
|
id, ok := bm.leaders[topicPartition{topic, partition}]
|
|
@@ -66,6 +66,59 @@ func (bm *brokerManager) lookupLeader(topic string, partition int32) (*broker, e
|
|
|
return broker, nil
|
|
return broker, nil
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, res decoder) error {
|
|
|
|
|
+ b, err := bm.getLeader(topic, partition)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ responseChan, err := b.sendRequest(bm.client.id, req)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ // errors that would make us refresh the broker metadata don't get returned here,
|
|
|
|
|
+ // they'd come through responseChan.errors, so it's safe to just return here
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case buf := <-responseChan.packets:
|
|
|
|
|
+ decoder := realDecoder{raw: buf}
|
|
|
|
|
+ err = res.decode(&decoder)
|
|
|
|
|
+ case err = <-responseChan.errors:
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ if err == nil {
|
|
|
|
|
+ // successfully received and decoded the packet, we're done
|
|
|
|
|
+ return nil
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ // we got an error, so discard that broker
|
|
|
|
|
+ bm.brokersLock.Lock()
|
|
|
|
|
+ delete(bm.brokers, b.id)
|
|
|
|
|
+ bm.brokersLock.Unlock()
|
|
|
|
|
+
|
|
|
|
|
+ // then do the whole thing again treating all errors as fatal
|
|
|
|
|
+ // (the metadata for the broker gets refreshed automatically in getLeader)
|
|
|
|
|
+ b, err = bm.getLeader(topic, partition)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ responseChan, err = b.sendRequest(bm.client.id, req)
|
|
|
|
|
+ if err != nil {
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ select {
|
|
|
|
|
+ case buf := <-responseChan.packets:
|
|
|
|
|
+ decoder := realDecoder{raw: buf}
|
|
|
|
|
+ err = res.decode(&decoder)
|
|
|
|
|
+ return err
|
|
|
|
|
+ case err = <-responseChan.errors:
|
|
|
|
|
+ return err
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
func (bm *brokerManager) getDefault() *broker {
|
|
func (bm *brokerManager) getDefault() *broker {
|
|
|
|
|
|
|
|
if bm.defaultBroker == nil {
|
|
if bm.defaultBroker == nil {
|