Browse Source

Merge pull request #281 from Shopify/back-off-after-failed-update

Producer: back off after failing to find a leader
Evan Huus 11 years ago
parent
commit
6fa7dbe1d9
1 changed files with 9 additions and 7 deletions
  1. 9 7
      producer.go

+ 9 - 7
producer.go

@@ -27,7 +27,7 @@ type ProducerConfig struct {
 	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
 	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
 	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
-	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 0, i.e. unbuffered).
+	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 256).
 	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 250ms). Similar to the retry.backoff.ms setting of the JVM producer.
 	MaxRetries        int                    // The total number of times to retry sending a message. Currently only supports 0 or 1 (the default). Similar to the message.send.max.retries setting of the JVM producer.
 }
@@ -35,12 +35,13 @@ type ProducerConfig struct {
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
 func NewProducerConfig() *ProducerConfig {
 	return &ProducerConfig{
-		Partitioner:     NewHashPartitioner,
-		RequiredAcks:    WaitForLocal,
-		MaxMessageBytes: 1000000,
-		RetryBackoff:    250 * time.Millisecond,
-		Timeout:         10 * time.Second,
-		MaxRetries:      1,
+		Partitioner:       NewHashPartitioner,
+		RequiredAcks:      WaitForLocal,
+		MaxMessageBytes:   1000000,
+		ChannelBufferSize: 256,
+		RetryBackoff:      250 * time.Millisecond,
+		Timeout:           10 * time.Second,
+		MaxRetries:        1,
 	}
 }
 
@@ -438,6 +439,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 		if output == nil {
 			if err := breaker.Run(doUpdate); err != nil {
 				p.returnError(msg, err)
+				time.Sleep(p.config.RetryBackoff)
 				continue
 			}
 			Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)