
producer: enable retry-backoff

Wait a bit (250ms by default) for the cluster to elect a new leader before
processing message retries; otherwise we are likely to get stale metadata
from the broker we ask, and the client won't know enough to back off on its
own (stale metadata won't contain any errors).

Fixes #229.
Evan Huus 11 years ago
parent commit 936da829d2
1 changed file with 7 additions and 0 deletions

producer.go  +7 -0

@@ -24,6 +24,7 @@ type ProducerConfig struct {
 	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
 	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000)
 	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
+	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries. Defaults to 250ms.
 }
 
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
@@ -32,6 +33,7 @@ func NewProducerConfig() *ProducerConfig {
 		Partitioner:     NewHashPartitioner,
 		RequiredAcks:    WaitForLocal,
 		MaxMessageBytes: 1000000,
+		RetryBackoff:    250 * time.Millisecond,
 	}
 }
 
@@ -72,6 +74,10 @@ func (config *ProducerConfig) Validate() error {
 		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
 	}
 
+	if config.RetryBackoff < 0 {
+		return ConfigurationError("Invalid RetryBackoff")
+	}
+
 	return nil
 }
 
@@ -369,6 +375,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 				backlog = make([]*MessageToSend, 0)
 				p.unrefBrokerWorker(leader)
 				output = nil
+				time.Sleep(p.config.RetryBackoff)
 			}
 		} else {
 			// retry *and* chaser flag set, flush the backlog and return to normal processing
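
For illustration, a minimal usage sketch of the new option. Only
NewProducerConfig, RetryBackoff, and Validate come from the diff above; the
import path and the 500ms value are assumptions for this example, not part
of the commit:

package main

import (
	"time"

	"github.com/Shopify/sarama" // import path assumed for this era of the library
)

func main() {
	// Start from the defaults set in this commit (RetryBackoff = 250ms).
	config := sarama.NewProducerConfig()

	// On clusters where leader election tends to be slow, wait longer
	// before retrying so the refreshed metadata is more likely current.
	config.RetryBackoff = 500 * time.Millisecond

	// Validate rejects a negative backoff with a ConfigurationError.
	if err := config.Validate(); err != nil {
		panic(err)
	}
}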