|
|
@@ -15,11 +15,11 @@ import (
|
|
|
// channel.
|
|
|
type ProducerConfig struct {
|
|
|
Partitioner Partitioner // Chooses the partition to send messages to, or randomly if this is nil.
|
|
|
- RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
|
|
|
- Timeout int32 // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
|
|
|
+ RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker (defaults to WaitForLocal).
|
|
|
+ Timeout time.Duration // The maximum duration the broker will wait the receipt of the number of RequiredAcks. This is only relevant when RequiredAcks is set to WaitForAll or a number > 1. Only supports millisecond resolution, nanoseconds will be truncated.
|
|
|
Compression CompressionCodec // The type of compression to use on messages (defaults to no compression).
|
|
|
MaxBufferedBytes uint32 // The maximum number of bytes to buffer per-broker before sending to Kafka.
|
|
|
- MaxBufferTime uint32 // The maximum number of milliseconds to buffer messages before sending to a broker.
|
|
|
+ MaxBufferTime time.Duration // The maximum duration to buffer messages before sending to a broker.
|
|
|
}
|
|
|
|
|
|
// Producer publishes Kafka messages. It routes messages to the correct broker
|
|
|
@@ -207,13 +207,11 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
|
|
|
hasMessages: make(chan bool, 1),
|
|
|
}
|
|
|
|
|
|
- maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
|
|
|
-
|
|
|
var wg sync.WaitGroup
|
|
|
wg.Add(1)
|
|
|
|
|
|
go func() {
|
|
|
- timer := time.NewTimer(maxBufferTime)
|
|
|
+ timer := time.NewTimer(p.config.MaxBufferTime)
|
|
|
var shutdownRequired bool
|
|
|
wg.Done()
|
|
|
for {
|
|
|
@@ -229,7 +227,7 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
|
|
|
case <-bp.stopper:
|
|
|
goto shutdown
|
|
|
}
|
|
|
- timer.Reset(maxBufferTime)
|
|
|
+ timer.Reset(p.config.MaxBufferTime)
|
|
|
}
|
|
|
shutdown:
|
|
|
delete(p.brokerProducers, bp.broker)
|
|
|
@@ -456,7 +454,7 @@ func NewProducerConfig() *ProducerConfig {
|
|
|
return &ProducerConfig{
|
|
|
Partitioner: NewRandomPartitioner(),
|
|
|
RequiredAcks: WaitForLocal,
|
|
|
- MaxBufferTime: 1,
|
|
|
+ MaxBufferTime: 1 * time.Millisecond,
|
|
|
MaxBufferedBytes: 1,
|
|
|
}
|
|
|
}
|
|
|
@@ -470,6 +468,8 @@ func (config *ProducerConfig) Validate() error {
|
|
|
|
|
|
if config.Timeout < 0 {
|
|
|
return ConfigurationError("Invalid Timeout")
|
|
|
+ } else if config.Timeout%time.Millisecond != 0 {
|
|
|
+ Logger.Println("ProducerConfig.Timeout only supports millisecond resolution; nanoseconds will be truncated.")
|
|
|
}
|
|
|
|
|
|
if config.MaxBufferedBytes == 0 {
|