|
|
@@ -332,6 +332,18 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
var output chan *MessageToSend
|
|
|
var backlog []*MessageToSend
|
|
|
breaker := breaker.New(3, 1, 10*time.Second)
|
|
|
+ doUpdate := func() (err error) {
|
|
|
+ if err = p.client.RefreshTopicMetadata(topic); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ if leader, err = p.client.Leader(topic, partition); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ output = p.getBrokerWorker(leader)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
|
|
|
// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
|
|
|
// on the first message
|
|
|
@@ -362,12 +374,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
// retry *and* chaser flag set, flush the backlog and return to normal processing
|
|
|
Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
|
if output == nil {
|
|
|
- err := breaker.Run(func() (err error) {
|
|
|
- leader, output, err = p.updateLeaderAndWorker(topic, partition)
|
|
|
- return err
|
|
|
- })
|
|
|
-
|
|
|
- if err != nil {
|
|
|
+ if err := breaker.Run(doUpdate); err != nil {
|
|
|
p.returnErrors(backlog, err)
|
|
|
backlog = nil
|
|
|
continue
|
|
|
@@ -384,12 +391,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
}
|
|
|
|
|
|
if output == nil {
|
|
|
- err := breaker.Run(func() (err error) {
|
|
|
- leader, output, err = p.updateLeaderAndWorker(topic, partition)
|
|
|
- return err
|
|
|
- })
|
|
|
-
|
|
|
- if err != nil {
|
|
|
+ if err := breaker.Run(doUpdate); err != nil {
|
|
|
p.returnError(msg, err)
|
|
|
continue
|
|
|
}
|
|
|
@@ -785,17 +787,3 @@ func (p *Producer) unrefBrokerWorker(broker *Broker) {
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
-func (p *Producer) updateLeaderAndWorker(topic string, partition int32) (leader *Broker, worker chan *MessageToSend, err error) {
|
|
|
- if err = p.client.RefreshTopicMetadata(topic); err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- if leader, err = p.client.Leader(topic, partition); err != nil {
|
|
|
- return nil, nil, err
|
|
|
- }
|
|
|
-
|
|
|
- worker = p.getBrokerWorker(leader)
|
|
|
-
|
|
|
- return leader, worker, nil
|
|
|
-}
|