Browse Source

Merge pull request #57 from Shopify/retry_limit_shouldnt_be_configurable

RetryLimit in producer shouldn't be configurable.
Burke Libbey 12 years ago
parent
commit
f451b72323
3 changed files with 12 additions and 18 deletions
  1. 3 1
      produce_message.go
  2. 8 15
      producer.go
  3. 1 2
      producer_test.go

+ 3 - 1
produce_message.go

@@ -9,6 +9,8 @@ type produceMessage struct {
 	sync       bool
 }
 
+const retryLimit = 1
+
 type produceRequestBuilder []*produceMessage
 
 // If the message is synchronous, we manually send it and wait for a return.
@@ -33,7 +35,7 @@ func (msg *produceMessage) enqueue(p *Producer) error {
 }
 
 func (msg *produceMessage) reenqueue(p *Producer) error {
-	if msg.failures < p.config.MaxDeliveryRetries {
+	if msg.failures < retryLimit {
 		msg.failures++
 		return msg.enqueue(p)
 	}

+ 8 - 15
producer.go

@@ -17,13 +17,12 @@ import (
 // With MaxBufferTime and/or MaxBufferedBytes set to values > 0, sarama will
 // buffer messages before sending, to reduce traffic.
 type ProducerConfig struct {
-	Partitioner        Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout            int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
-	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	MaxBufferedBytes   uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
-	MaxBufferTime      uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
-	MaxDeliveryRetries uint32           // The number of times to retry a failed message. You should always specify at least 1.
+	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
+	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
+	Timeout          int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
+	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
+	MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
 }
 
 // Producer publishes Kafka messages. It routes messages to the correct broker
@@ -82,10 +81,6 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 		return nil, ConfigurationError("Invalid Timeout")
 	}
 
-	if config.MaxDeliveryRetries < 1 {
-		Logger.Println("Warning: config.MaxDeliveryRetries is set dangerously low. This will lead to occasional data loss.")
-	}
-
 	if config.Partitioner == nil {
 		config.Partitioner = NewRandomPartitioner()
 	}
@@ -351,8 +346,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 			}
 		})
 		if overlimit > 0 {
-			Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
-				overlimit, p.config.MaxDeliveryRetries)
+			Logger.Printf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.\n", overlimit)
 			errorCb(fmt.Errorf("Dropped %d messages that exceeded the retry limit", overlimit))
 		}
 		return
@@ -390,8 +384,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 					}
 				})
 				if overlimit > 0 {
-					Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
-						overlimit, p.config.MaxDeliveryRetries)
+					Logger.Printf("[DATA LOSS] cannot find a leader for %d messages, so they were dropped.\n", overlimit)
 				}
 			default:
 				Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",

+ 1 - 2
producer_test.go

@@ -243,8 +243,7 @@ func TestFailureRetry(t *testing.T) {
 		RequiredAcks:  WaitForLocal,
 		MaxBufferTime: 1000000, // "never"
 		// So that we flush after the 2nd message.
-		MaxBufferedBytes:   uint32((len(TestMessage) * 2) - 1),
-		MaxDeliveryRetries: 1,
+		MaxBufferedBytes: uint32((len(TestMessage) * 2) - 1),
 	})
 	if err != nil {
 		t.Fatal(err)