|
|
@@ -6,6 +6,7 @@ import (
|
|
|
"time"
|
|
|
|
|
|
"github.com/eapache/go-resiliency/breaker"
|
|
|
+ "github.com/eapache/queue"
|
|
|
)
|
|
|
|
|
|
func forceFlushThreshold() int {
|
|
|
@@ -592,19 +593,21 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
|
|
|
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
|
|
|
func (p *asyncProducer) retryHandler() {
|
|
|
- var buf []*ProducerMessage
|
|
|
- var msg *ProducerMessage
|
|
|
- refs := 0
|
|
|
- shuttingDown := false
|
|
|
+ var (
|
|
|
+ msg *ProducerMessage
|
|
|
+ buf = queue.New()
|
|
|
+ refs = 0
|
|
|
+ shuttingDown = false
|
|
|
+ )
|
|
|
|
|
|
for {
|
|
|
- if len(buf) == 0 {
|
|
|
+ if buf.Length() == 0 {
|
|
|
msg = <-p.retries
|
|
|
} else {
|
|
|
select {
|
|
|
case msg = <-p.retries:
|
|
|
- case p.input <- buf[0]:
|
|
|
- buf = buf[1:]
|
|
|
+ case p.input <- buf.Peek().(*ProducerMessage):
|
|
|
+ buf.Remove()
|
|
|
continue
|
|
|
}
|
|
|
}
|
|
|
@@ -622,13 +625,14 @@ func (p *asyncProducer) retryHandler() {
|
|
|
break
|
|
|
}
|
|
|
} else {
|
|
|
- buf = append(buf, msg)
|
|
|
+ buf.Add(msg)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
close(p.retries)
|
|
|
- for i := range buf {
|
|
|
- p.input <- buf[i]
|
|
|
+ for buf.Length() != 0 {
|
|
|
+ p.input <- buf.Peek().(*ProducerMessage)
|
|
|
+ buf.Remove()
|
|
|
}
|
|
|
close(p.input)
|
|
|
}
|