|
|
@@ -367,7 +367,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
|
|
|
func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
|
|
|
var leader *Broker
|
|
|
var output chan *MessageToSend
|
|
|
- var backlog []*MessageToSend
|
|
|
+
|
|
|
breaker := breaker.New(3, 1, 10*time.Second)
|
|
|
doUpdate := func() (err error) {
|
|
|
if err = p.client.RefreshTopicMetadata(topic); err != nil {
|
|
|
@@ -387,6 +387,12 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+ highWatermark := 0
|
|
|
+ retryState := make([]struct {
|
|
|
+ buf []*MessageToSend
|
|
|
+ expectChaser bool
|
|
|
+ }, p.config.MaxRetries+1)
|
|
|
+
|
|
|
// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
|
|
|
// on the first message
|
|
|
leader, _ = p.client.Leader(topic, partition)
|
|
|
@@ -395,45 +401,60 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
}
|
|
|
|
|
|
for msg := range input {
|
|
|
- if msg.retries == 0 {
|
|
|
- // normal case
|
|
|
- if backlog != nil {
|
|
|
- backlog = append(backlog, msg)
|
|
|
- continue
|
|
|
- }
|
|
|
- } else if msg.flags&chaser == 0 {
|
|
|
- // retry flag set, chaser flag not set
|
|
|
- if backlog == nil {
|
|
|
- // on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
- // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
- Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
|
|
|
- Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
- output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
|
|
|
- backlog = make([]*MessageToSend, 0)
|
|
|
- p.unrefBrokerWorker(leader)
|
|
|
- output = nil
|
|
|
- time.Sleep(p.config.RetryBackoff)
|
|
|
- }
|
|
|
- } else {
|
|
|
- // retry *and* chaser flag set, flush the backlog and return to normal processing
|
|
|
- Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
|
- if output == nil {
|
|
|
- if err := breaker.Run(doUpdate); err != nil {
|
|
|
- Logger.Printf("producer/leader state change to [normal] after \"%s\" on %s/%d\n", err, topic, partition)
|
|
|
- p.returnErrors(backlog, err)
|
|
|
- backlog = nil
|
|
|
- continue
|
|
|
+ if msg.retries > highWatermark {
|
|
|
+ // on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
+ // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
+ highWatermark = msg.retries
|
|
|
+ Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ retryState[msg.retries].expectChaser = true
|
|
|
+ output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
|
|
|
+ Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
+ p.unrefBrokerWorker(leader)
|
|
|
+ output = nil
|
|
|
+ time.Sleep(p.config.RetryBackoff)
|
|
|
+ } else if highWatermark > 0 {
|
|
|
+ if msg.retries < highWatermark {
|
|
|
+ // we are retrying but not ready yet, buffer it until we can flush things in the right order (unless it's a chaser)
|
|
|
+ if msg.flags&chaser == chaser {
|
|
|
+ retryState[msg.retries].expectChaser = false
|
|
|
+ } else {
|
|
|
+ retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
|
|
|
}
|
|
|
- Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
- }
|
|
|
+ continue
|
|
|
+ } else if msg.flags&chaser == chaser {
|
|
|
+ // msg.retries == highWatermark && chaser flag set, we can flush/reset at least some of our buffers
|
|
|
+ retryState[highWatermark].expectChaser = false
|
|
|
+ Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ for {
|
|
|
+ highWatermark--
|
|
|
+ Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+
|
|
|
+ if output == nil {
|
|
|
+ if err := breaker.Run(doUpdate); err != nil {
|
|
|
+ p.returnErrors(retryState[highWatermark].buf, err)
|
|
|
+ goto flushDone
|
|
|
+ }
|
|
|
+ Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
+ }
|
|
|
|
|
|
- for _, msg := range backlog {
|
|
|
- output <- msg
|
|
|
- }
|
|
|
- Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)
|
|
|
+ for _, msg := range retryState[highWatermark].buf {
|
|
|
+ output <- msg
|
|
|
+ }
|
|
|
|
|
|
- backlog = nil
|
|
|
- continue
|
|
|
+ flushDone:
|
|
|
+ if retryState[highWatermark].expectChaser {
|
|
|
+ Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ if highWatermark == 0 {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
if output == nil {
|