@@ -387,12 +387,6 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 		return nil
 	}
 
-	highWatermark := 0
-	retryState := make([]struct {
-		buf          []*MessageToSend
-		expectChaser bool
-	}, p.config.MaxRetries+1)
-
 	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
 	// on the first message
 	leader, _ = p.client.Leader(topic, partition)
@@ -400,9 +394,19 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 		output = p.getBrokerWorker(leader)
 	}
 
+	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
+	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
+	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
+	// therefore whether our buffer is complete and safe to flush)
+	highWatermark := 0
+	retryState := make([]struct {
+		buf          []*MessageToSend
+		expectChaser bool
+	}, p.config.MaxRetries+1)
+
 	for msg := range input {
 		if msg.retries > highWatermark {
-			// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
+			// new, higher retry level; send off a chaser so that we know when everything "in between" has made it
 			// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
 			highWatermark = msg.retries
 			Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
@@ -413,8 +417,9 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			output = nil
 			time.Sleep(p.config.RetryBackoff)
 		} else if highWatermark > 0 {
+			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
 			if msg.retries < highWatermark {
-				// we are retrying but not ready yet, buffer it until we can flush things in the right order (unless it's a chaser)
+				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
 				if msg.flags&chaser == chaser {
 					retryState[msg.retries].expectChaser = false
 				} else {
@@ -422,7 +427,8 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 				}
 				continue
 			} else if msg.flags&chaser == chaser {
-				// msg.retries == highWatermark && chaser flag set, we can flush/reset at least some of our buffers
+				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
+				// meaning this retry level is done and we can go down (at least) one level and flush that
 				retryState[highWatermark].expectChaser = false
 				Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
 				for {
@@ -457,6 +463,9 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			}
 		}
 
+		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
+		// without breaking any of our ordering guarantees
+
 		if output == nil {
 			if err := breaker.Run(doUpdate); err != nil {
 				p.returnError(msg, err)
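
For readers following the comments above, here is a minimal, self-contained sketch of the ordering scheme they describe. It is not the producer's actual code: the real dispatcher works over channels and goroutines, and the message, dispatch, and maxRetries names below are invented for illustration. The sketch models a single leaderDispatcher consuming its input in order: only messages at the current highWatermark retry level pass through, lower levels are buffered, and a returning chaser triggers an in-order flush.

package main

import "fmt"

// message stands in for sarama's MessageToSend; only the fields relevant
// to the ordering scheme are modeled here.
type message struct {
	value   string
	retries int
	chaser  bool // sentinel marking the end of a retry level
}

const maxRetries = 3 // illustrative stand-in for p.config.MaxRetries

// dispatch models the ordering logic from the diff: only messages at the
// current highWatermark level pass through, lower levels are buffered, and
// a chaser at the current level triggers an in-order flush downward.
func dispatch(input []message) (output []message) {
	highWatermark := 0
	retryState := make([]struct {
		buf          []message
		expectChaser bool
	}, maxRetries+1)

	for _, msg := range input {
		if msg.retries > highWatermark {
			// new, higher retry level; the real producer also sends a chaser
			// downstream here, which eventually comes back through the input
			highWatermark = msg.retries
			retryState[msg.retries].expectChaser = true
		} else if highWatermark > 0 {
			if msg.retries < highWatermark {
				// below the current level: buffer it (or record its chaser)
				if msg.chaser {
					retryState[msg.retries].expectChaser = false
				} else {
					retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.chaser {
				// current level is complete: flush downward until we reach a
				// level that is still waiting on its own chaser
				retryState[highWatermark].expectChaser = false
				for {
					highWatermark--
					output = append(output, retryState[highWatermark].buf...)
					retryState[highWatermark].buf = nil
					if highWatermark == 0 || retryState[highWatermark].expectChaser {
						break
					}
				}
				continue
			}
		}
		// real data at the current level: safe to emit without reordering
		output = append(output, msg)
	}
	return output
}

func main() {
	// "b" comes back at retry level 1 while "c" (level 0) is still in
	// flight; "c" is buffered until level 1's chaser proves level 0 is done
	in := []message{
		{value: "a"},
		{value: "b", retries: 1},
		{value: "c"},
		{retries: 1, chaser: true},
	}
	for _, m := range dispatch(in) {
		fmt.Printf("%s (retries=%d)\n", m.value, m.retries)
	}
	// Prints a, b, c: the original send order is preserved even though
	// "c" arrived while "b" was being retried.
}

Under these assumptions the sketch preserves the guarantee the comments claim: a message at a stale retry level is never emitted ahead of retried messages that originally preceded it, because its buffer is only flushed once the chaser confirms nothing older is still circulating.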