|
|
@@ -2,9 +2,10 @@ package sarama
|
|
|
|
|
|
// ProducerConfig is used to pass multiple configuration options to NewProducer.
|
|
|
type ProducerConfig struct {
|
|
|
- Partitioner Partitioner // Chooses the partition to send messages to, or randomly if this is nil.
|
|
|
- RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
|
|
|
- Timeout int32 // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
|
|
|
+ Partitioner Partitioner // Chooses the partition to send messages to, or randomly if this is nil.
|
|
|
+ RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
|
|
|
+ Timeout int32 // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
|
|
|
+ Compression CompressionCodec // The type of compression to use on messages (defaults to no compression).
|
|
|
}
|
|
|
|
|
|
// Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
|
|
|
@@ -98,7 +99,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
|
|
|
}
|
|
|
|
|
|
request := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
|
|
|
- request.AddMessage(p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
|
|
|
+ request.AddMessage(p.topic, partition, &Message{Codec: p.config.Compression, Key: keyBytes, Value: valBytes})
|
|
|
|
|
|
response, err := broker.Produce(p.client.id, request)
|
|
|
switch err {
|