|
|
@@ -24,6 +24,7 @@ type ProducerConfig struct {
|
|
|
AckSuccesses bool // If enabled, successfully delivered messages will be returned on the Successes channel.
|
|
|
MaxMessageBytes int // The maximum permitted size of a message (defaults to 1000000)
|
|
|
ChannelBufferSize int // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
|
|
|
+ RetryBackoff time.Duration // The amount of time to wait for the cluster to elect a new leader before processing retries. Defaults to 250ms.
|
|
|
}
|
|
|
|
|
|
// NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
|
|
|
@@ -32,6 +33,7 @@ func NewProducerConfig() *ProducerConfig {
|
|
|
Partitioner: NewHashPartitioner,
|
|
|
RequiredAcks: WaitForLocal,
|
|
|
MaxMessageBytes: 1000000,
|
|
|
+ RetryBackoff: 250 * time.Millisecond,
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@@ -72,6 +74,10 @@ func (config *ProducerConfig) Validate() error {
|
|
|
Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
|
|
|
}
|
|
|
|
|
|
+ if config.RetryBackoff < 0 {
|
|
|
+ return ConfigurationError("Invalid RetryBackoff")
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -369,6 +375,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
backlog = make([]*MessageToSend, 0)
|
|
|
p.unrefBrokerWorker(leader)
|
|
|
output = nil
|
|
|
+ time.Sleep(p.config.RetryBackoff)
|
|
|
}
|
|
|
} else {
|
|
|
// retry *and* chaser flag set, flush the backlog and return to normal processing
|