
Move retries to its own field and make MaxRetries configurable

Doesn't work yet, but these are the superficial changes necessary to support
multiple retries in the producer.
Evan Huus, 11 years ago · commit 979f5e8e25
1 changed file with 14 additions and 7 deletions:

producer.go (+14, -7)

@@ -28,7 +28,8 @@ type ProducerConfig struct {
 	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000). Equivalent to the broker's `message.max.bytes`.
 	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies. Similar to `queue.buffering.max.messages` in the JVM producer.
 	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines (defaults to 0, i.e. unbuffered).
-	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 250ms). Similar to the retry.backoff.ms setting of the JVM producer
+	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries (defaults to 250ms). Similar to the retry.backoff.ms setting of the JVM producer.
+	MaxRetries        int                    // The total number of times to retry sending a message. Currently only supports 0 or 1 (the default). Similar to the message.send.max.retries setting of the JVM producer.
 }
 
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
@@ -39,6 +40,7 @@ func NewProducerConfig() *ProducerConfig {
 		MaxMessageBytes: 1000000,
 		RetryBackoff:    250 * time.Millisecond,
 		Timeout:         10 * time.Second,
+		MaxRetries:      1,
 	}
 }
 
@@ -93,6 +95,10 @@ func (config *ProducerConfig) Validate() error {
 		return ConfigurationError("Invalid RetryBackoff")
 	}
 
+	if config.MaxRetries < 0 || config.MaxRetries > 1 {
+		return ConfigurationError("Invalid MaxRetries")
+	}
+
 	return nil
 }
 
@@ -150,8 +156,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 type flagSet int8
 
 const (
-	retried  flagSet = 1 << iota // message has been retried
-	chaser                       // message is last in a group that failed
+	chaser   flagSet = 1 << iota // message is last in a group that failed
 	ref                          // add a reference to a singleton channel
 	unref                        // remove a reference from a singleton channel
 	shutdown                     // start the shutdown process
@@ -167,6 +172,7 @@ type MessageToSend struct {
 	// these are filled in by the producer as the message is processed
 	offset    int64
 	partition int32
+	retries   int
 	flags     flagSet
 }
 
@@ -326,7 +332,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
 	partitioner := p.config.Partitioner()
 
 	for msg := range input {
-		if msg.flags&retried == 0 {
+		if msg.retries == 0 {
 			err := p.assignPartition(partitioner, msg)
 			if err != nil {
 				p.returnError(msg, err)
@@ -387,7 +393,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
 	}
 
 	for msg := range input {
-		if msg.flags&retried == 0 {
+		if msg.retries == 0 {
 			// normal case
 			if backlog != nil {
 				backlog = append(backlog, msg)
@@ -755,6 +761,7 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
 
 func (p *Producer) returnError(msg *MessageToSend, err error) {
 	msg.flags = 0
+	msg.retries = 0
 	p.errors <- &ProduceError{Msg: msg, Err: err}
 }
 
@@ -780,10 +787,10 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
 		if msg == nil {
 			continue
 		}
-		if msg.flags&retried == retried {
+		if msg.retries >= p.config.MaxRetries {
 			p.returnError(msg, err)
 		} else {
-			msg.flags |= retried
+			msg.retries++
 			p.retries <- msg
 		}
 	}
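
For reference, a minimal sketch of how a caller might exercise the new field once this lands. The `github.com/Shopify/sarama` import path and the `log` output are assumptions for illustration; `NewProducerConfig`, `MaxRetries`, and `Validate` come straight from the diff above.

```go
package main

import (
	"log"

	// Import path assumed for illustration; use whatever path this
	// copy of the producer actually lives under.
	"github.com/Shopify/sarama"
)

func main() {
	// NewProducerConfig now defaults MaxRetries to 1 (a single retry).
	config := sarama.NewProducerConfig()

	// Opt out of retrying entirely; per Validate(), only 0 or 1 is
	// accepted until multi-retry support actually works.
	config.MaxRetries = 0

	if err := config.Validate(); err != nil {
		log.Fatalln("invalid producer config:", err)
	}

	log.Printf("messages will be retried up to %d time(s)", config.MaxRetries)
}
```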