@@ -393,7 +393,8 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
goto shutdown
}
- if bytesAccumulated+msg.byteSize() >= forceFlushThreshold() {
+ if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
+ (p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
flusher <- buffer
buffer = nil
doFlush = nil