浏览代码

Configurable hard limit on the max size of requests

Also add a BackPressureThresholdBytes to the producer to make it behave better
in cases where we get too close to that limit.

See discussion on:
https://github.com/tylertreat/mq-benchmarking/pull/2
Evan Huus 11 年之前
父节点
当前提交
396afc57f0
共有 3 个文件被更改,包括 48 次插入15 次删除
  1. 4 0
      encoder_decoder.go
  2. 38 15
      producer.go
  3. 6 0
      sarama.go

+ 4 - 0
encoder_decoder.go

@@ -20,6 +20,10 @@ func encode(in encoder) ([]byte, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	if prepEnc.length < 0 || uint32(prepEnc.length) > MaxRequestSize {
+		return nil, EncodingError
+	}
+
 	realEnc.raw = make([]byte, prepEnc.length)
 	realEnc.raw = make([]byte, prepEnc.length)
 	err = in.encode(&realEnc)
 	err = in.encode(&realEnc)
 	if err != nil {
 	if err != nil {

+ 38 - 15
producer.go

@@ -14,12 +14,22 @@ import (
 // mode, errors are not returned from SendMessage, but over the Errors()
 // mode, errors are not returned from SendMessage, but over the Errors()
 // channel.
 // channel.
 type ProducerConfig struct {
 type ProducerConfig struct {
-	Partitioner      Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks     RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
-	Timeout          time.Duration    // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
+	Partitioner  Partitioner  // Chooses the partition to send messages to, or randomly if this is nil.
+	RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
+
+	// The maximum duration the broker will wait the receipt of the number of RequiredAcks.
+	// This is only relevant when RequiredAcks is set to WaitForAll or a number > 1.
+	// Only supports millisecond resolution, nanoseconds will be truncated.
+	Timeout time.Duration
+
 	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
 	Compression      CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	MaxBufferedBytes uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
-	MaxBufferTime    time.Duration    // The maximum duration to buffer messages before sending to a broker.
+	MaxBufferedBytes uint32           // The threshold number of bytes buffered before triggering a flush to the broker.
+	MaxBufferTime    time.Duration    // The maximum duration to buffer messages before triggering a flush to the broker.
+
+	// The maximum number of bytes allowed to accumulare in the buffer before back-pressure is applied to QueueMessage.
+	// Without this, queueing messages too fast will cause the producer to construct requests larger than the MaxRequestSize.
+	// Defaults to 50 MiB, cannot be more than (MaxRequestSize - 10 KiB).
+	BackPressureThresholdBytes uint32
 }
 }
 
 
 // Producer publishes Kafka messages. It routes messages to the correct broker
 // Producer publishes Kafka messages. It routes messages to the correct broker
@@ -177,7 +187,7 @@ func (p *Producer) addMessage(msg *produceMessage) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	bp.addMessage(msg, p.config.MaxBufferedBytes)
+	bp.addMessage(msg, p.config.MaxBufferedBytes, p.config.BackPressureThresholdBytes)
 	return nil
 	return nil
 }
 }
 
 
@@ -250,7 +260,7 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 	return bp
 	return bp
 }
 }
 
 
-func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32) {
+func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes, backPressureThreshold uint32) {
 	bp.mapM.Lock()
 	bp.mapM.Lock()
 	if msg.retried {
 	if msg.retried {
 		// Prepend: Deliver first, before any more recently-added messages.
 		// Prepend: Deliver first, before any more recently-added messages.
@@ -267,14 +277,18 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32)
 	}
 	}
 
 
 	bp.mapM.Unlock()
 	bp.mapM.Unlock()
-	bp.flushIfOverCapacity(maxBufferBytes)
+	bp.flushIfOverCapacity(maxBufferBytes, backPressureThreshold)
 }
 }
 
 
-func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
+func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes, backPressureThreshold uint32) {
 	bp.mapM.Lock()
 	bp.mapM.Lock()
-	over := bp.bufferedBytes > maxBufferBytes
+	softlimit := bp.bufferedBytes > maxBufferBytes
+	hardlimit := bp.bufferedBytes > backPressureThreshold
 	bp.mapM.Unlock()
 	bp.mapM.Unlock()
-	if over {
+
+	if hardlimit {
+		bp.flushNow <- true
+	} else if softlimit {
 		select {
 		select {
 		case bp.flushNow <- true:
 		case bp.flushNow <- true:
 		default:
 		default:
@@ -473,10 +487,11 @@ func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
 // NewProducerConfig creates a new ProducerConfig instance with sensible defaults.
 func NewProducerConfig() *ProducerConfig {
 func NewProducerConfig() *ProducerConfig {
 	return &ProducerConfig{
 	return &ProducerConfig{
-		Partitioner:      NewRandomPartitioner(),
-		RequiredAcks:     WaitForLocal,
-		MaxBufferTime:    1 * time.Millisecond,
-		MaxBufferedBytes: 1,
+		Partitioner:                NewRandomPartitioner(),
+		RequiredAcks:               WaitForLocal,
+		MaxBufferTime:              1 * time.Millisecond,
+		MaxBufferedBytes:           1,
+		BackPressureThresholdBytes: 50 * 1024 * 1024,
 	}
 	}
 }
 }
 
 
@@ -505,5 +520,13 @@ func (config *ProducerConfig) Validate() error {
 		return ConfigurationError("No partitioner set")
 		return ConfigurationError("No partitioner set")
 	}
 	}
 
 
+	if config.BackPressureThresholdBytes < config.MaxBufferedBytes {
+		return ConfigurationError("BackPressureThresholdBytes cannot be less than MaxBufferedBytes")
+	}
+
+	if config.BackPressureThresholdBytes > MaxRequestSize-10*1024 {
+		return ConfigurationError("BackPressureThresholdBytes must be at least 10KiB less than MaxRequestSize")
+	}
+
 	return nil
 	return nil
 }
 }

+ 6 - 0
sarama.go

@@ -19,3 +19,9 @@ var Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags)
 // PanicHandler is called for recovering from panics spawned internally to the library (and thus
 // PanicHandler is called for recovering from panics spawned internally to the library (and thus
 // not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
 // not recoverable by the caller's goroutine). Defaults to nil, which means panics are not recovered.
 var PanicHandler func(interface{})
 var PanicHandler func(interface{})
+
+// MaxRequestSize is the maximum size (in bytes) of any request that Sarama will attempt to send. Trying
+// to send a request larger than this will result in an EncodingError. The default of 100 MiB is aligned
+// with Kafka's default `socket.request.max.bytes`, which is the largest request the broker will attempt
+// to process.
+var MaxRequestSize uint32 = 100 * 1024 * 1024