|
|
@@ -56,8 +56,8 @@ type asyncProducer struct {
|
|
|
input, successes, retries chan *ProducerMessage
|
|
|
inFlight sync.WaitGroup
|
|
|
|
|
|
- brokers map[*Broker]chan *ProducerMessage
|
|
|
- brokerRefs map[chan *ProducerMessage]int
|
|
|
+ brokers map[*Broker]chan<- *ProducerMessage
|
|
|
+ brokerRefs map[chan<- *ProducerMessage]int
|
|
|
brokerLock sync.Mutex
|
|
|
}
|
|
|
|
|
|
@@ -91,8 +91,8 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
|
|
|
input: make(chan *ProducerMessage),
|
|
|
successes: make(chan *ProducerMessage),
|
|
|
retries: make(chan *ProducerMessage),
|
|
|
- brokers: make(map[*Broker]chan *ProducerMessage),
|
|
|
- brokerRefs: make(map[chan *ProducerMessage]int),
|
|
|
+ brokers: make(map[*Broker]chan<- *ProducerMessage),
|
|
|
+ brokerRefs: make(map[chan<- *ProducerMessage]int),
|
|
|
}
|
|
|
|
|
|
// launch our singleton dispatchers
|
|
|
@@ -347,7 +347,7 @@ type partitionProducer struct {
|
|
|
|
|
|
leader *Broker
|
|
|
breaker *breaker.Breaker
|
|
|
- output chan *ProducerMessage
|
|
|
+ output chan<- *ProducerMessage
|
|
|
|
|
|
// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
|
|
|
// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
|
|
|
@@ -491,37 +491,63 @@ func (pp *partitionProducer) updateLeader() error {
|
|
|
})
|
|
|
}
|
|
|
|
|
|
+func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
|
|
|
+ input := make(chan *ProducerMessage)
|
|
|
+ bridge := make(chan []*ProducerMessage)
|
|
|
+
|
|
|
+ a := &aggregator{
|
|
|
+ parent: p,
|
|
|
+ broker: broker,
|
|
|
+ input: input,
|
|
|
+ output: bridge,
|
|
|
+ }
|
|
|
+ go withRecover(a.run)
|
|
|
+
|
|
|
+ f := &flusher{
|
|
|
+ parent: p,
|
|
|
+ broker: broker,
|
|
|
+ input: bridge,
|
|
|
+ }
|
|
|
+ go withRecover(f.run)
|
|
|
+
|
|
|
+ return input
|
|
|
+}
|
|
|
+
|
|
|
// one per broker
|
|
|
// groups messages together into appropriately-sized batches for sending to the broker
|
|
|
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
|
|
|
-func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
|
|
|
+type aggregator struct {
|
|
|
+ parent *asyncProducer
|
|
|
+ broker *Broker
|
|
|
+ input <-chan *ProducerMessage
|
|
|
+ output chan<- []*ProducerMessage
|
|
|
+}
|
|
|
+
|
|
|
+func (a *aggregator) run() {
|
|
|
var (
|
|
|
timer <-chan time.Time
|
|
|
buffer []*ProducerMessage
|
|
|
- flushTriggered chan []*ProducerMessage
|
|
|
+ flushTriggered chan<- []*ProducerMessage
|
|
|
bytesAccumulated int
|
|
|
defaultFlush bool
|
|
|
)
|
|
|
|
|
|
- if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
|
|
|
+ if a.parent.conf.Producer.Flush.Frequency == 0 && a.parent.conf.Producer.Flush.Bytes == 0 && a.parent.conf.Producer.Flush.Messages == 0 {
|
|
|
defaultFlush = true
|
|
|
}
|
|
|
|
|
|
- output := make(chan []*ProducerMessage)
|
|
|
- go withRecover(func() { p.flusher(broker, output) })
|
|
|
-
|
|
|
for {
|
|
|
select {
|
|
|
- case msg := <-input:
|
|
|
+ case msg := <-a.input:
|
|
|
if msg == nil {
|
|
|
goto shutdown
|
|
|
}
|
|
|
|
|
|
if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
|
|
|
- (p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
|
|
|
- (p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
|
|
|
- Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
|
|
|
- output <- buffer
|
|
|
+ (a.parent.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= a.parent.conf.Producer.MaxMessageBytes) ||
|
|
|
+ (a.parent.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.MaxMessages) {
|
|
|
+ Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
|
|
|
+ a.output <- buffer
|
|
|
timer = nil
|
|
|
buffer = nil
|
|
|
flushTriggered = nil
|
|
|
@@ -533,14 +559,14 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer
|
|
|
|
|
|
if defaultFlush ||
|
|
|
msg.flags&chaser == chaser ||
|
|
|
- (p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
|
|
|
- (p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
|
|
|
- flushTriggered = output
|
|
|
- } else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
|
|
|
- timer = time.After(p.conf.Producer.Flush.Frequency)
|
|
|
+ (a.parent.conf.Producer.Flush.Messages > 0 && len(buffer) >= a.parent.conf.Producer.Flush.Messages) ||
|
|
|
+ (a.parent.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= a.parent.conf.Producer.Flush.Bytes) {
|
|
|
+ flushTriggered = a.output
|
|
|
+ } else if a.parent.conf.Producer.Flush.Frequency > 0 && timer == nil {
|
|
|
+ timer = time.After(a.parent.conf.Producer.Flush.Frequency)
|
|
|
}
|
|
|
case <-timer:
|
|
|
- flushTriggered = output
|
|
|
+ flushTriggered = a.output
|
|
|
case flushTriggered <- buffer:
|
|
|
timer = nil
|
|
|
buffer = nil
|
|
|
@@ -551,21 +577,27 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *Producer
|
|
|
|
|
|
shutdown:
|
|
|
if len(buffer) > 0 {
|
|
|
- output <- buffer
|
|
|
+ a.output <- buffer
|
|
|
}
|
|
|
- close(output)
|
|
|
+ close(a.output)
|
|
|
}
|
|
|
|
|
|
// one per broker
|
|
|
// takes a batch at a time from the messageAggregator and sends to the broker
|
|
|
-func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
|
|
|
+type flusher struct {
|
|
|
+ parent *asyncProducer
|
|
|
+ broker *Broker
|
|
|
+ input <-chan []*ProducerMessage
|
|
|
+}
|
|
|
+
|
|
|
+func (f *flusher) run() {
|
|
|
var closing error
|
|
|
currentRetries := make(map[string]map[int32]error)
|
|
|
- Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
|
|
|
+ Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
|
|
|
|
|
|
- for batch := range input {
|
|
|
+ for batch := range f.input {
|
|
|
if closing != nil {
|
|
|
- p.retryMessages(batch, closing)
|
|
|
+ f.parent.retryMessages(batch, closing)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -576,10 +608,10 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
|
|
|
if msg.flags&chaser == chaser {
|
|
|
// we can start processing this topic/partition again
|
|
|
Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
|
|
|
- broker.ID(), msg.Topic, msg.Partition)
|
|
|
+ f.broker.ID(), msg.Topic, msg.Partition)
|
|
|
currentRetries[msg.Topic][msg.Partition] = nil
|
|
|
}
|
|
|
- p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
|
|
|
+ f.parent.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
|
|
|
batch[i] = nil // to prevent it being returned/retried twice
|
|
|
continue
|
|
|
}
|
|
|
@@ -593,31 +625,31 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
|
|
|
partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
|
|
|
}
|
|
|
|
|
|
- request := p.buildRequest(msgSets)
|
|
|
+ request := f.parent.buildRequest(msgSets)
|
|
|
if request == nil {
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- response, err := broker.Produce(request)
|
|
|
+ response, err := f.broker.Produce(request)
|
|
|
|
|
|
switch err.(type) {
|
|
|
case nil:
|
|
|
break
|
|
|
case PacketEncodingError:
|
|
|
- p.returnErrors(batch, err)
|
|
|
+ f.parent.returnErrors(batch, err)
|
|
|
continue
|
|
|
default:
|
|
|
- Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
|
|
|
- p.abandonBrokerConnection(broker)
|
|
|
- _ = broker.Close()
|
|
|
+ Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
|
|
|
+ f.parent.abandonBrokerConnection(f.broker)
|
|
|
+ _ = f.broker.Close()
|
|
|
closing = err
|
|
|
- p.retryMessages(batch, err)
|
|
|
+ f.parent.retryMessages(batch, err)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
if response == nil {
|
|
|
// this only happens when RequiredAcks is NoResponse, so we have to assume success
|
|
|
- p.returnSuccesses(batch)
|
|
|
+ f.parent.returnSuccesses(batch)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -628,7 +660,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
|
|
|
|
|
|
block := response.GetBlock(topic, partition)
|
|
|
if block == nil {
|
|
|
- p.returnErrors(msgs, ErrIncompleteResponse)
|
|
|
+ f.parent.returnErrors(msgs, ErrIncompleteResponse)
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -638,23 +670,23 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
|
|
|
for i := range msgs {
|
|
|
msgs[i].Offset = block.Offset + int64(i)
|
|
|
}
|
|
|
- p.returnSuccesses(msgs)
|
|
|
+ f.parent.returnSuccesses(msgs)
|
|
|
case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
|
|
|
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
|
|
|
Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
|
|
|
- broker.ID(), topic, partition, block.Err)
|
|
|
+ f.broker.ID(), topic, partition, block.Err)
|
|
|
if currentRetries[topic] == nil {
|
|
|
currentRetries[topic] = make(map[int32]error)
|
|
|
}
|
|
|
currentRetries[topic][partition] = block.Err
|
|
|
- p.retryMessages(msgs, block.Err)
|
|
|
+ f.parent.retryMessages(msgs, block.Err)
|
|
|
default:
|
|
|
- p.returnErrors(msgs, block.Err)
|
|
|
+ f.parent.returnErrors(msgs, block.Err)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
|
|
|
+ Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
|
|
|
}
|
|
|
|
|
|
// singleton
|
|
|
@@ -814,17 +846,16 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
+func (p *asyncProducer) getBrokerProducer(broker *Broker) chan<- *ProducerMessage {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
bp := p.brokers[broker]
|
|
|
|
|
|
if bp == nil {
|
|
|
- bp = make(chan *ProducerMessage)
|
|
|
+ bp = p.newBrokerProducer(broker)
|
|
|
p.brokers[broker] = bp
|
|
|
p.brokerRefs[bp] = 0
|
|
|
- go withRecover(func() { p.messageAggregator(broker, bp) })
|
|
|
}
|
|
|
|
|
|
p.brokerRefs[bp]++
|
|
|
@@ -832,7 +863,7 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
|
|
|
return bp
|
|
|
}
|
|
|
|
|
|
-func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan<- *ProducerMessage) {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|