Просмотр исходного кода

producer: permit throttling max messages per request

Related to #233
Evan Huus 11 лет назад
Родитель
Сommit
65d594f40d
1 измененных файлов с 8 добавлено и 2 удалено
  1. 8 2
      producer.go

+ 8 - 2
producer.go

@@ -23,6 +23,7 @@ type ProducerConfig struct {
 	FlushByteCount    int                    // If this many bytes of messages are accumulated, a flush will be triggered.
 	AckSuccesses      bool                   // If enabled, successfully delivered messages will be returned on the Successes channel.
 	MaxMessageBytes   int                    // The maximum permitted size of a message (defaults to 1000000)
+	MaxMessagesPerReq int                    // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies.
 	ChannelBufferSize int                    // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
 	RetryBackoff      time.Duration          // The amount of time to wait for the cluster to elect a new leader before processing retries. Defaults to 250ms.
 }
@@ -74,6 +75,10 @@ func (config *ProducerConfig) Validate() error {
 		Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
 	}
 
+	if config.MaxMessagesPerReq < 0 {
+		return ConfigurationError("Invalid MaxMessagesPerReq")
+	}
+
 	if config.RetryBackoff < 0 {
 		return ConfigurationError("Invalid RetryBackoff")
 	}
@@ -437,8 +442,9 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
 			}
 
 			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
-				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
-				Logger.Println("producer/aggregator hit maximum request size, forcing blocking flush")
+				(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) ||
+				(p.config.MaxMessagesPerReq > 0 && len(buffer) >= p.config.MaxMessagesPerReq) {
+				Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
 				flusher <- buffer
 				buffer = nil
 				doFlush = nil