Browse Source

RetryLimit in producer shouldn't be configurable.

Burke Libbey 12 years ago
parent
commit
f6ca051218
2 changed files with 11 additions and 16 deletions
  1. 3 1
      produce_message.go
  2. 8 15
      producer.go

+ 3 - 1
produce_message.go

@@ -9,6 +9,8 @@ type produceMessage struct {
 	sync       bool
 }
 
+const retryLimit = 1
+
 type produceRequestBuilder []*produceMessage
 
 // If the message is synchronous, we manually send it and wait for a return.
@@ -33,7 +35,7 @@ func (msg *produceMessage) enqueue(p *Producer) error {
 }
 
 func (msg *produceMessage) reenqueue(p *Producer) error {
-	if msg.failures < p.config.MaxDeliveryRetries {
+	if msg.failures < retryLimit {
 		msg.failures++
 		return msg.enqueue(p)
 	}

+ 8 - 15
producer.go

@@ -17,13 +17,12 @@ import (
 // With MaxBufferTime and/or MaxBufferedBytes set to values > 0, sarama will
 // buffer messages before sending, to reduce traffic.
 type ProducerConfig struct {
-	Partitioner        Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout            int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
-	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	MaxBufferedBytes   uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
-	MaxBufferTime      uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
-	MaxDeliveryRetries uint32           // The number of times to retry a failed message. You should always specify at least 1.
+	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
+	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
+	Timeout          int32            // The maximum time in ms the broker will wait for the receipt of the number of RequiredAcks.
+	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
+	MaxBufferTime    uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
 }
 
 // Producer publishes Kafka messages. It routes messages to the correct broker
@@ -82,10 +81,6 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 		return nil, ConfigurationError("Invalid Timeout")
 	}
 
-	if config.MaxDeliveryRetries < 1 {
-		Logger.Println("Warning: config.MaxDeliveryRetries is set dangerously low. This will lead to occasional data loss.")
-	}
-
 	if config.Partitioner == nil {
 		config.Partitioner = NewRandomPartitioner()
 	}
@@ -351,8 +346,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 			}
 		})
 		if overlimit > 0 {
-			Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
-				overlimit, p.config.MaxDeliveryRetries)
+			Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n", overlimit, retryLimit)
 			errorCb(fmt.Errorf("Dropped %d messages that exceeded the retry limit", overlimit))
 		}
 		return
@@ -390,8 +384,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 					}
 				})
 				if overlimit > 0 {
-					Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n",
-						overlimit, p.config.MaxDeliveryRetries)
+					Logger.Printf("[DATA LOSS] %d messages exceeded the retry limit of %d and were dropped.\n", overlimit, retryLimit)
 				}
 			default:
 				Logger.Printf("[DATA LOSS] Non-retriable error from kafka! Dropped up to %d messages for %s:%d.\n",