@@ -505,216 +505,251 @@ func (pp *partitionProducer) updateLeader() error {
 	})
 }
 
-// one per broker, constructs both an aggregator and a flusher
+// one per broker; also constructs an associated flusher
 func (p *asyncProducer) newBrokerProducer(broker *Broker) chan<- *ProducerMessage {
-	input := make(chan *ProducerMessage)
-	bridge := make(chan *produceSet)
+	var (
+		input     = make(chan *ProducerMessage)
+		bridge    = make(chan *produceSet)
+		responses = make(chan *brokerProducerResponse)
+	)
 
-	a := &aggregator{
-		parent: p,
-		broker: broker,
-		input:  input,
-		output: bridge,
-		buffer: newProduceSet(p),
-	}
-	go withRecover(a.run)
-
-	f := &flusher{
+	bp := &brokerProducer{
 		parent:         p,
 		broker:         broker,
-		input:          bridge,
+		input:          input,
+		output:         bridge,
+		responses:      responses,
+		buffer:         newProduceSet(p),
 		currentRetries: make(map[string]map[int32]error),
 	}
-	go withRecover(f.run)
+	go withRecover(bp.run)
+
+	// minimal bridge to make the network response `select`able
+	go withRecover(func() {
+		for set := range bridge {
+			request := set.buildRequest()
+
+			response, err := broker.Produce(request)
+
+			responses <- &brokerProducerResponse{
+				set: set,
+				err: err,
+				res: response,
+			}
+		}
+		close(responses)
+	})
 
 	return input
 }
 
+type brokerProducerResponse struct {
+	set *produceSet
+	err error
+	res *ProduceResponse
+}
+
 // groups messages together into appropriately-sized batches for sending to the broker
-// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-type aggregator struct {
+// handles state related to retries etc
+type brokerProducer struct {
 	parent *asyncProducer
 	broker *Broker
-	input  <-chan *ProducerMessage
-	output chan<- *produceSet
 
-	buffer *produceSet
-	timer  <-chan time.Time
+	input     <-chan *ProducerMessage
+	output    chan<- *produceSet
+	responses <-chan *brokerProducerResponse
+
+	buffer     *produceSet
+	timer      <-chan time.Time
+	timerFired bool
+
+	closing        error
+	currentRetries map[string]map[int32]error
 }
 
-func (a *aggregator) run() {
+func (bp *brokerProducer) run() {
 	var output chan<- *produceSet
+	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())
 
 	for {
 		select {
-		case msg := <-a.input:
+		case msg := <-bp.input:
 			if msg == nil {
 				goto shutdown
 			}
 
-			if a.buffer.wouldOverflow(msg) {
-				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", a.broker.ID())
-				a.output <- a.buffer
-				a.reset()
-				output = nil
+			if reason := bp.needsRetry(msg); reason != nil {
+				bp.parent.retryMessage(msg, reason)
+
+				if bp.closing == nil && msg.flags&chaser == chaser {
+					// we were retrying this partition but we can start processing again
+					delete(bp.currentRetries[msg.Topic], msg.Partition)
+					Logger.Printf("producer/broker/%d state change to [normal] on %s/%d\n",
+						bp.broker.ID(), msg.Topic, msg.Partition)
+				}
+
+				continue
+			}
+
+			if bp.buffer.wouldOverflow(msg) {
+				if err := bp.waitForSpace(msg); err != nil {
+					bp.parent.retryMessage(msg, err)
+					continue
+				}
 			}
 
-			if err := a.buffer.add(msg); err != nil {
-				a.parent.returnError(msg, err)
+			if err := bp.buffer.add(msg); err != nil {
+				bp.parent.returnError(msg, err)
 				continue
 			}
 
-			if a.buffer.readyToFlush(msg) {
-				output = a.output
-			} else if a.parent.conf.Producer.Flush.Frequency > 0 && a.timer == nil {
-				a.timer = time.After(a.parent.conf.Producer.Flush.Frequency)
+			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
+				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
 			}
-		case <-a.timer:
-			output = a.output
-		case output <- a.buffer:
-			a.reset()
+		case <-bp.timer:
+			bp.timerFired = true
+		case output <- bp.buffer:
+			bp.rollOver()
+		case response := <-bp.responses:
+			bp.handleResponse(response)
+		}
+
+		if bp.timerFired || bp.buffer.readyToFlush() {
+			output = bp.output
+		} else {
 			output = nil
 		}
 	}
 
 shutdown:
-	if !a.buffer.empty() {
-		a.output <- a.buffer
+	for !bp.buffer.empty() {
+		select {
+		case response := <-bp.responses:
+			bp.handleResponse(response)
+		case bp.output <- bp.buffer:
+			bp.rollOver()
+		}
+	}
+	close(bp.output)
+	for response := range bp.responses {
+		bp.handleResponse(response)
 	}
-	close(a.output)
-}
-
-func (a *aggregator) reset() {
-	a.timer = nil
-	a.buffer = newProduceSet(a.parent)
-}
-
-// takes a batch at a time from the aggregator and sends to the broker
-type flusher struct {
-	parent *asyncProducer
-	broker *Broker
-	input  <-chan *produceSet
 
-	currentRetries map[string]map[int32]error
+	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
 }
 
-func (f *flusher) run() {
-	var closing error
-
-	Logger.Printf("producer/flusher/%d starting up\n", f.broker.ID())
+func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
+	if bp.closing != nil {
+		return bp.closing
+	}
 
-	for batch := range f.input {
-		if closing != nil {
-			batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-				f.parent.retryMessages(msgs, closing)
-			})
-			continue
-		}
+	if bp.currentRetries[msg.Topic] == nil {
+		return nil
+	}
 
-		set := f.filter(batch)
-		if set.empty() {
-			continue
-		}
+	return bp.currentRetries[msg.Topic][msg.Partition]
+}
 
-		request := set.buildRequest()
-		response, err := f.broker.Produce(request)
+func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
+	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
 
-		switch err.(type) {
-		case nil:
-			break
-		case PacketEncodingError:
-			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-				f.parent.returnErrors(msgs, err)
-			})
-			continue
-		default:
-			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", f.broker.ID(), err)
-			f.parent.abandonBrokerConnection(f.broker)
-			_ = f.broker.Close()
-			closing = err
-			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-				f.parent.retryMessages(msgs, err)
-			})
-			continue
-		}
-
-		if response == nil {
-			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-				f.parent.returnSuccesses(msgs)
-			})
-			continue
+	for {
+		select {
+		case response := <-bp.responses:
+			bp.handleResponse(response)
+			// handling a response can change our state, so re-check some things
+			if reason := bp.needsRetry(msg); reason != nil {
+				return reason
+			} else if !bp.buffer.wouldOverflow(msg) {
+				return nil
+			}
+		case bp.output <- bp.buffer:
+			bp.rollOver()
+			return nil
 		}
-
-		f.parseResponse(set, response)
 	}
-	Logger.Printf("producer/flusher/%d shut down\n", f.broker.ID())
 }
 
-func (f *flusher) filter(batch *produceSet) *produceSet {
-	set := newProduceSet(f.parent)
-
-	batch.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
-		for _, msg := range msgs {
-
-			if f.currentRetries[msg.Topic] != nil && f.currentRetries[msg.Topic][msg.Partition] != nil {
-				// we're currently retrying this partition so we need to filter out this message
-				f.parent.retryMessages([]*ProducerMessage{msg}, f.currentRetries[msg.Topic][msg.Partition])
-
-				if msg.flags&chaser == chaser {
-					// ...but now we can start processing future messages again
-					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-						f.broker.ID(), msg.Topic, msg.Partition)
-					delete(f.currentRetries[msg.Topic], msg.Partition)
-				}
-
-				continue
-			}
+func (bp *brokerProducer) rollOver() {
+	bp.timer = nil
+	bp.timerFired = false
+	bp.buffer = newProduceSet(bp.parent)
+}
 
-			if err := set.add(msg); err != nil {
-				f.parent.returnError(msg, err)
-				continue
-			}
-		}
-	})
+func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
+	if response.err != nil {
+		bp.handleError(response.set, response.err)
+	} else {
+		bp.handleSuccess(response.set, response.res)
+	}
 
-	return set
+	if bp.buffer.empty() {
+		bp.rollOver() // this can happen if the response invalidated our buffer
+	}
 }
 
-func (f *flusher) parseResponse(set *produceSet, response *ProduceResponse) {
+func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
 	// we iterate through the blocks in the request set, not the response, so that we notice
 	// if the response is missing a block completely
-	set.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+	sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+		if response == nil {
+			// this only happens when RequiredAcks is NoResponse, so we have to assume success
+			bp.parent.returnSuccesses(msgs)
+			return
+		}
+
 		block := response.GetBlock(topic, partition)
 		if block == nil {
-			f.parent.returnErrors(msgs, ErrIncompleteResponse)
+			bp.parent.returnErrors(msgs, ErrIncompleteResponse)
 			return
 		}
 
 		switch block.Err {
 		// Success
 		case ErrNoError:
-			for i := range msgs {
-				msgs[i].Offset = block.Offset + int64(i)
+			for i, msg := range msgs {
+				msg.Offset = block.Offset + int64(i)
 			}
-			f.parent.returnSuccesses(msgs)
+			bp.parent.returnSuccesses(msgs)
 		// Retriable errors
 		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
-			Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
-				f.broker.ID(), topic, partition, block.Err)
-			if f.currentRetries[topic] == nil {
-				f.currentRetries[topic] = make(map[int32]error)
+			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
+				bp.broker.ID(), topic, partition, block.Err)
+			if bp.currentRetries[topic] == nil {
+				bp.currentRetries[topic] = make(map[int32]error)
 			}
-			f.currentRetries[topic][partition] = block.Err
-			f.parent.retryMessages(msgs, block.Err)
+			bp.currentRetries[topic][partition] = block.Err
+			bp.parent.retryMessages(msgs, block.Err)
+			bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
 		// Other non-retriable errors
 		default:
-			f.parent.returnErrors(msgs, block.Err)
+			bp.parent.returnErrors(msgs, block.Err)
 		}
 	})
 }
 
+func (bp *brokerProducer) handleError(sent *produceSet, err error) {
+	switch err.(type) {
+	case PacketEncodingError:
+		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+			bp.parent.returnErrors(msgs, err)
+		})
+	default:
+		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
+		bp.parent.abandonBrokerConnection(bp.broker)
+		_ = bp.broker.Close()
+		bp.closing = err
+		sent.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+			bp.parent.retryMessages(msgs, err)
+		})
+		bp.buffer.eachPartition(func(topic string, partition int32, msgs []*ProducerMessage) {
+			bp.parent.retryMessages(msgs, err)
+		})
+		bp.rollOver()
+	}
+}
+
 // singleton
 // effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
@@ -844,6 +879,20 @@ func (ps *produceSet) eachPartition(cb func(topic string, partition int32, msgs
 	}
 }
 
+func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
+	if ps.msgs[topic] == nil {
+		return nil
+	}
+	set := ps.msgs[topic][partition]
+	if set == nil {
+		return nil
+	}
+	ps.bufferBytes -= set.bufferBytes
+	ps.bufferCount -= len(set.msgs)
+	delete(ps.msgs[topic], partition)
+	return set.msgs
+}
+
 func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
 	switch {
 	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
@@ -862,7 +911,7 @@ func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
 	}
 }
 
-func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool {
+func (ps *produceSet) readyToFlush() bool {
 	switch {
 	// If we don't have any messages, nothing else matters
 	case ps.empty():
@@ -870,9 +919,6 @@ func (ps *produceSet) readyToFlush(msg *ProducerMessage) bool {
 	// If all three config values are 0, we always flush as-fast-as-possible
 	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
 		return true
-	// If the messages is ps chaser we must flush to maintain the state-machine
-	case msg.flags&chaser == chaser:
-		return true
 	// If we've passed the message trigger-point
 	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
 		return true