Browse Source

retried as a boolean rather than failures as an int

Burke Libbey 11 years ago
parent
commit
9d35a0c617
2 changed files with 8 additions and 11 deletions
  1. 3 5
      produce_message.go
  2. 5 6
      producer.go

+ 3 - 5
produce_message.go

@@ -5,12 +5,10 @@ import "log"
 type produceMessage struct {
 	tp         topicPartition
 	key, value []byte
-	failures   uint32
+	retried    bool
 	sync       bool
 }
 
-const retryLimit = 1
-
 type produceRequestBuilder []*produceMessage
 
 // If the message is synchronous, we manually send it and wait for a return.
@@ -35,8 +33,8 @@ func (msg *produceMessage) enqueue(p *Producer) error {
 }
 
 func (msg *produceMessage) reenqueue(p *Producer) error {
-	if msg.failures < retryLimit {
-		msg.failures++
+	if !msg.retried {
+		msg.retried = true
 		return msg.enqueue(p)
 	}
 	return nil

+ 5 - 6
producer.go

@@ -172,11 +172,10 @@ func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchron
 
 	// produce_message.go
 	msg := &produceMessage{
-		tp:       topicPartition{topic, partition},
-		key:      keyBytes,
-		value:    valBytes,
-		failures: 0,
-		sync:     synchronous,
+		tp:    topicPartition{topic, partition},
+		key:   keyBytes,
+		value: valBytes,
+		sync:  synchronous,
 	}
 
 	// produce_message.go
@@ -257,7 +256,7 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 
 func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
 	bp.mapM.Lock()
-	if msg.failures > 0 {
+	if msg.retried {
 		// Prepend: Deliver first, before any more recently-added messages.
 		bp.messages[msg.tp] = append([]*produceMessage{msg}, bp.messages[msg.tp]...)
 	} else {