|
|
@@ -96,7 +96,7 @@ func (config *ProducerConfig) Validate() error {
|
|
|
return ConfigurationError("Invalid RetryBackoff")
|
|
|
}
|
|
|
|
|
|
- if config.MaxRetries < 0 || config.MaxRetries > 1 {
|
|
|
+ if config.MaxRetries < 0 {
|
|
|
return ConfigurationError("Invalid MaxRetries")
|
|
|
}
|
|
|
|
|
|
@@ -367,7 +367,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
|
|
|
func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
|
|
|
var leader *Broker
|
|
|
var output chan *MessageToSend
|
|
|
- var backlog []*MessageToSend
|
|
|
+
|
|
|
breaker := breaker.New(3, 1, 10*time.Second)
|
|
|
doUpdate := func() (err error) {
|
|
|
if err = p.client.RefreshTopicMetadata(topic); err != nil {
|
|
|
@@ -394,48 +394,78 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
output = p.getBrokerWorker(leader)
|
|
|
}
|
|
|
|
|
|
+ // highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
|
|
|
+ // all other messages get buffered in retryState[msg.retries].buf to preserve ordering
|
|
|
+ // retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
|
|
|
+ // therefore whether our buffer is complete and safe to flush)
|
|
|
+ highWatermark := 0
|
|
|
+ retryState := make([]struct {
|
|
|
+ buf []*MessageToSend
|
|
|
+ expectChaser bool
|
|
|
+ }, p.config.MaxRetries+1)
|
|
|
+
|
|
|
for msg := range input {
|
|
|
- if msg.retries == 0 {
|
|
|
- // normal case
|
|
|
- if backlog != nil {
|
|
|
- backlog = append(backlog, msg)
|
|
|
- continue
|
|
|
- }
|
|
|
- } else if msg.flags&chaser == 0 {
|
|
|
- // retry flag set, chaser flag not set
|
|
|
- if backlog == nil {
|
|
|
- // on the very first retried message we send off a chaser so that we know when everything "in between" has made it
|
|
|
- // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
- Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
|
|
|
- Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
- output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
|
|
|
- backlog = make([]*MessageToSend, 0)
|
|
|
- p.unrefBrokerWorker(leader)
|
|
|
- output = nil
|
|
|
- time.Sleep(p.config.RetryBackoff)
|
|
|
- }
|
|
|
- } else {
|
|
|
- // retry *and* chaser flag set, flush the backlog and return to normal processing
|
|
|
- Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
|
|
|
- if output == nil {
|
|
|
- if err := breaker.Run(doUpdate); err != nil {
|
|
|
- Logger.Printf("producer/leader state change to [normal] after \"%s\" on %s/%d\n", err, topic, partition)
|
|
|
- p.returnErrors(backlog, err)
|
|
|
- backlog = nil
|
|
|
- continue
|
|
|
+ if msg.retries > highWatermark {
|
|
|
+ // new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
|
|
|
+ // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
|
|
|
+ highWatermark = msg.retries
|
|
|
+ Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ retryState[msg.retries].expectChaser = true
|
|
|
+ output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
|
|
|
+ Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
+ p.unrefBrokerWorker(leader)
|
|
|
+ output = nil
|
|
|
+ time.Sleep(p.config.RetryBackoff)
|
|
|
+ } else if highWatermark > 0 {
|
|
|
+ // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
|
|
|
+ if msg.retries < highWatermark {
|
|
|
+ // in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
|
|
|
+ if msg.flags&chaser == chaser {
|
|
|
+ retryState[msg.retries].expectChaser = false
|
|
|
+ } else {
|
|
|
+ retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
|
|
|
}
|
|
|
- Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
- }
|
|
|
+ continue
|
|
|
+ } else if msg.flags&chaser == chaser {
|
|
|
+ // this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
|
|
|
+ // meaning this retry level is done and we can go down (at least) one level and flush that
|
|
|
+ retryState[highWatermark].expectChaser = false
|
|
|
+ Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ for {
|
|
|
+ highWatermark--
|
|
|
+ Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+
|
|
|
+ if output == nil {
|
|
|
+ if err := breaker.Run(doUpdate); err != nil {
|
|
|
+ p.returnErrors(retryState[highWatermark].buf, err)
|
|
|
+ goto flushDone
|
|
|
+ }
|
|
|
+ Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
+ }
|
|
|
|
|
|
- for _, msg := range backlog {
|
|
|
- output <- msg
|
|
|
- }
|
|
|
- Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)
|
|
|
+ for _, msg := range retryState[highWatermark].buf {
|
|
|
+ output <- msg
|
|
|
+ }
|
|
|
|
|
|
- backlog = nil
|
|
|
- continue
|
|
|
+ flushDone:
|
|
|
+ if retryState[highWatermark].expectChaser {
|
|
|
+ Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ break
|
|
|
+ } else {
|
|
|
+ Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
|
|
|
+ if highWatermark == 0 {
|
|
|
+ break
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+ continue
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
+ // if we made it this far then the current msg contains real data, and can be sent to the next goroutine
|
|
|
+ // without breaking any of our ordering guarantees
|
|
|
+
|
|
|
if output == nil {
|
|
|
if err := breaker.Run(doUpdate); err != nil {
|
|
|
p.returnError(msg, err)
|