Browse Source

Merge pull request #396 from Shopify/fix-retry-queue

Improve memory of producer retry queue
Evan Huus 10 years ago
parent
commit
d7cc796147
1 changed files with 14 additions and 10 deletions
  1. 14 10
      async_producer.go

+ 14 - 10
async_producer.go

@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/eapache/go-resiliency/breaker"
+	"github.com/eapache/queue"
 )
 
 func forceFlushThreshold() int {
@@ -592,19 +593,21 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 // effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
 func (p *asyncProducer) retryHandler() {
-	var buf []*ProducerMessage
-	var msg *ProducerMessage
-	refs := 0
-	shuttingDown := false
+	var (
+		msg          *ProducerMessage
+		buf          = queue.New()
+		refs         = 0
+		shuttingDown = false
+	)
 
 	for {
-		if len(buf) == 0 {
+		if buf.Length() == 0 {
 			msg = <-p.retries
 		} else {
 			select {
 			case msg = <-p.retries:
-			case p.input <- buf[0]:
-				buf = buf[1:]
+			case p.input <- buf.Peek().(*ProducerMessage):
+				buf.Remove()
 				continue
 			}
 		}
@@ -622,13 +625,14 @@ func (p *asyncProducer) retryHandler() {
 				break
 			}
 		} else {
-			buf = append(buf, msg)
+			buf.Add(msg)
 		}
 	}
 
 	close(p.retries)
-	for i := range buf {
-		p.input <- buf[i]
+	for buf.Length() != 0 {
+		p.input <- buf.Peek().(*ProducerMessage)
+		buf.Remove()
 	}
 	close(p.input)
 }