|
|
@@ -18,11 +18,12 @@ type ProducerConfig struct {
|
|
|
RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
|
|
|
Timeout time.Duration // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
|
|
|
Compression CompressionCodec // The type of compression to use on messages (defaults to no compression).
|
|
|
- FlushMsgCount int // The number of messages needed to trigger a flush.
|
|
|
- FlushFrequency time.Duration // If this amount of time elapses without a flush, one will be queued.
|
|
|
- FlushByteCount int // If this many bytes of messages are accumulated, a flush will be triggered.
|
|
|
+ FlushMsgCount int // The number of messages needed to trigger a flush. This is a minimum, not an upper limit (use MaxMessagesPerReq for that).
|
|
|
+ FlushFrequency time.Duration // If this amount of time elapses without a flush, one will be queued. This is a minimum, not an upper limit.
|
|
|
+ FlushByteCount int // If this many bytes of messages are accumulated, a flush will be triggered. This is a minimum, not an upper limit.
|
|
|
AckSuccesses bool // If enabled, successfully delivered messages will be returned on the Successes channel.
|
|
|
MaxMessageBytes int // The maximum permitted size of a message (defaults to 1000000)
|
|
|
+ MaxMessagesPerReq int // The maximum number of messages the producer will send in a single broker request. Defaults to 0 for unlimited. The global setting MaxRequestSize still applies.
|
|
|
ChannelBufferSize int // The size of the buffers of the channels between the different goroutines. Defaults to 0 (unbuffered).
|
|
|
RetryBackoff time.Duration // The amount of time to wait for the cluster to elect a new leader before processing retries. Defaults to 250ms.
|
|
|
}
|
|
|
@@ -74,6 +75,10 @@ func (config *ProducerConfig) Validate() error {
|
|
|
Logger.Println("ProducerConfig.MaxMessageBytes too close to MaxRequestSize; it will be ignored.")
|
|
|
}
|
|
|
|
|
|
+ if config.MaxMessagesPerReq < 0 {
|
|
|
+ return ConfigurationError("Invalid MaxMessagesPerReq")
|
|
|
+ }
|
|
|
+
|
|
|
if config.RetryBackoff < 0 {
|
|
|
return ConfigurationError("Invalid RetryBackoff")
|
|
|
}
|
|
|
@@ -437,8 +442,9 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
|
|
|
}
|
|
|
|
|
|
if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
|
|
|
- (p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
|
|
|
- Logger.Println("producer/aggregator hit maximum request size, forcing blocking flush")
|
|
|
+ (p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) ||
|
|
|
+ (p.config.MaxMessagesPerReq > 0 && len(buffer) >= p.config.MaxMessagesPerReq) {
|
|
|
+ Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
|
|
|
flusher <- buffer
|
|
|
buffer = nil
|
|
|
doFlush = nil
|