Browse Source

Merge pull request #279 from Shopify/retries-field

Move retries to its own field and make MaxRetries configurable
Evan Huus 11 years ago
parent
commit
25ce8d2c02
1 changed files with 14 additions and 7 deletions
  1. 14 7
      producer.go

+ 14 - 7
producer.go

@@ -28,7 +28,8 @@ type ProducerConfig struct {
 	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
 	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
 	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 0, i.e. unbuffered).
-	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 250ms). Similar to the retry.backoff.ms setting of the JVM producer
+	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 250ms). Similar to the retry.backoff.ms setting of the JVM producer.
+	MaxRetries        int                    // The total number of times to retry sending a message. Currently only supports 0 or 1 (the default). Similar to the message.send.max.retries setting of the JVM producer.
 }
 
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
@@ -39,6 +40,7 @@ func NewProducerConfig() *ProducerConfig {
 		MaxMessageBytes: 1000000,
 		RetryBackoff:    250 * time.Millisecond,
 		Timeout:         10 * time.Second,
+		MaxRetries:      1,
 	}
 }
 
@@ -93,6 +95,10 @@ func (config *ProducerConfig) Validate() error {
 		return ConfigurationError("Invalid RetryBackoff")
 	}
 
+	if config.MaxRetries < 0 || config.MaxRetries > 1 {
+		return ConfigurationError("Invalid MaxRetries")
+	}
+
 	return nil
 }
 
@@ -150,8 +156,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 type flagSet int8
 
 const (
-	retried  flagSet = 1 << iota // message has been retried
-	chaser                       // message is last in a group that failed
+	chaser   flagSet = 1 << iota // message is last in a group that failed
 	ref                          // add a reference to a singleton channel
 	unref                        // remove a reference from a singleton channel
 	shutdown                     // start the shutdown process
@@ -167,6 +172,7 @@ type MessageToSend struct {
 	// these are filled in by the producer as the message is processed
 	offset    int64
 	partition int32
+	retries   int
 	flags     flagSet
 }
 
@@ -326,7 +332,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
 	partitioner := p.config.Partitioner()
 
 	for msg := range input {
-		if msg.flags&retried == 0 {
+		if msg.retries == 0 {
 			err := p.assignPartition(partitioner, msg)
 			if err != nil {
 				p.returnError(msg, err)
@@ -388,7 +394,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 	}
 
 	for msg := range input {
-		if msg.flags&retried == 0 {
+		if msg.retries == 0 {
 			// normal case
 			if backlog != nil {
 				backlog = append(backlog, msg)
@@ -756,6 +762,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
 
 func (p *Producer) returnError(msg *MessageToSend, err error) {
 	msg.flags = 0
+	msg.retries = 0
 	p.errors <- &ProduceError{Msg: msg, Err: err}
 }
 
@@ -781,10 +788,10 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
 		if msg == nil {
 			continue
 		}
-		if msg.flags&retried == retried {
+		if msg.retries >= p.config.MaxRetries {
 			p.returnError(msg, err)
 		} else {
-			msg.flags |= retried
+			msg.retries++
 			p.retries <- msg
 		}
 	}