浏览代码

producer: wrap leader updates in a circuit-breaker

Otherwise a partition being entirely gone can slow things right down, since
we'll do a complete metadata refresh on every single message.
Evan Huus 11 年之前
父节点
当前提交
b9418bb76f
共有 1 个文件被更改,包括 32 次插入20 次删除
  1. 32 20
      producer.go

+ 32 - 20
producer.go

@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"sync"
 	"time"
+
+	"github.com/eapache/go-resiliency/breaker"
 )
 
 func forceFlushThreshold() int {
@@ -329,6 +331,14 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 	var leader *Broker
 	var output chan *MessageToSend
 	var backlog []*MessageToSend
+	breaker := breaker.New(3, 1, 10*time.Second)
+
+	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
+	// on the first message
+	leader, _ = p.client.Leader(topic, partition)
+	if leader != nil {
+		output = p.getBrokerWorker(leader)
+	}
 
 	for msg := range input {
 		if msg.flags&retried == 0 {
@@ -352,21 +362,16 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 			// retry *and* chaser flag set, flush the backlog and return to normal processing
 			Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
 			if output == nil {
-				err := p.client.RefreshTopicMetadata(topic)
-				if err != nil {
-					p.returnErrors(backlog, err)
-					backlog = nil
-					continue
-				}
+				err := breaker.Run(func() (err error) {
+					leader, output, err = p.updateLeaderAndWorker(topic, partition)
+					return err
+				})
 
-				leader, err = p.client.Leader(topic, partition)
 				if err != nil {
 					p.returnErrors(backlog, err)
 					backlog = nil
 					continue
 				}
-
-				output = p.getBrokerWorker(leader)
 			}
 
 			for _, msg := range backlog {
@@ -379,22 +384,15 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 		}
 
 		if output == nil {
-			var err error
-			if backlog != nil {
-				err = p.client.RefreshTopicMetadata(topic)
-				if err != nil {
-					p.returnError(msg, err)
-					continue
-				}
-			}
+			err := breaker.Run(func() (err error) {
+				leader, output, err = p.updateLeaderAndWorker(topic, partition)
+				return err
+			})
 
-			leader, err = p.client.Leader(topic, partition)
 			if err != nil {
 				p.returnError(msg, err)
 				continue
 			}
-
-			output = p.getBrokerWorker(leader)
 		}
 
 		output <- msg
@@ -787,3 +785,17 @@ func (p *Producer) unrefBrokerWorker(broker *Broker) {
 		}
 	}
 }
+
+func (p *Producer) updateLeaderAndWorker(topic string, partition int32) (leader *Broker, worker chan *MessageToSend, err error) {
+	if err = p.client.RefreshTopicMetadata(topic); err != nil {
+		return nil, nil, err
+	}
+
+	if leader, err = p.client.Leader(topic, partition); err != nil {
+		return nil, nil, err
+	}
+
+	worker = p.getBrokerWorker(leader)
+
+	return leader, worker, nil
+}