|
|
@@ -4,6 +4,8 @@ import (
|
|
|
"fmt"
|
|
|
"sync"
|
|
|
"time"
|
|
|
+
|
|
|
+ "github.com/eapache/go-resiliency/breaker"
|
|
|
)
|
|
|
|
|
|
func forceFlushThreshold() int {
|
|
|
@@ -329,6 +331,26 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
var leader *Broker
|
|
|
var output chan *MessageToSend
|
|
|
var backlog []*MessageToSend
|
|
|
+ breaker := breaker.New(3, 1, 10*time.Second)
|
|
|
+ doUpdate := func() (err error) {
|
|
|
+ if err = p.client.RefreshTopicMetadata(topic); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ if leader, err = p.client.Leader(topic, partition); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ output = p.getBrokerWorker(leader)
|
|
|
+ return nil
|
|
|
+ }
|
|
|
+
|
|
|
+ // try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
|
|
|
+ // on the first message
|
|
|
+ leader, _ = p.client.Leader(topic, partition)
|
|
|
+ if leader != nil {
|
|
|
+ output = p.getBrokerWorker(leader)
|
|
|
+ }
|
|
|
|
|
|
for msg := range input {
|
|
|
if msg.flags&retried == 0 {
|
|
|
@@ -352,21 +374,11 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
// retry *and* chaser flag set, flush the backlog and return to normal processing
|
|
|
Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
|
if output == nil {
|
|
|
- err := p.client.RefreshTopicMetadata(topic)
|
|
|
- if err != nil {
|
|
|
+ if err := breaker.Run(doUpdate); err != nil {
|
|
|
p.returnErrors(backlog, err)
|
|
|
backlog = nil
|
|
|
continue
|
|
|
}
|
|
|
-
|
|
|
- leader, err = p.client.Leader(topic, partition)
|
|
|
- if err != nil {
|
|
|
- p.returnErrors(backlog, err)
|
|
|
- backlog = nil
|
|
|
- continue
|
|
|
- }
|
|
|
-
|
|
|
- output = p.getBrokerWorker(leader)
|
|
|
}
|
|
|
|
|
|
for _, msg := range backlog {
|
|
|
@@ -379,22 +391,10 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
}
|
|
|
|
|
|
if output == nil {
|
|
|
- var err error
|
|
|
- if backlog != nil {
|
|
|
- err = p.client.RefreshTopicMetadata(topic)
|
|
|
- if err != nil {
|
|
|
- p.returnError(msg, err)
|
|
|
- continue
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- leader, err = p.client.Leader(topic, partition)
|
|
|
- if err != nil {
|
|
|
+ if err := breaker.Run(doUpdate); err != nil {
|
|
|
p.returnError(msg, err)
|
|
|
continue
|
|
|
}
|
|
|
-
|
|
|
- output = p.getBrokerWorker(leader)
|
|
|
}
|
|
|
|
|
|
output <- msg
|