@@ -96,7 +96,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 	}
 
 	// launch our singleton dispatchers
-	go withRecover(p.topicDispatcher)
+	go withRecover(p.dispatcher)
 	go withRecover(p.retryHandler)
 
 	return p, nil
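Both goroutines above run under withRecover, Sarama's panic guard for long-lived goroutines. The helper itself is defined elsewhere in the package; a minimal sketch of the idea, assuming only the package-level Logger (the real version may also invoke a configurable panic handler):

func withRecover(fn func()) {
	defer func() {
		if err := recover(); err != nil {
			Logger.Println(err) // assumed behaviour: log the panic instead of crashing the process
		}
	}()
	fn()
}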
@@ -200,16 +200,9 @@ func (p *asyncProducer) AsyncClose() {
 	go withRecover(p.shutdown)
 }
 
-///////////////////////////////////////////
-// In normal processing, a message flows through the following functions from top to bottom,
-// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
-// (which sends the message to the broker). In cases where a message must be retried, it goes
-// through retryHandler before being returned to the top of the flow.
-///////////////////////////////////////////
-
 // singleton
 // dispatches messages by topic
-func (p *asyncProducer) topicDispatcher() {
+func (p *asyncProducer) dispatcher() {
 	handlers := make(map[string]chan *ProducerMessage)
 	shuttingDown := false
 
@@ -247,10 +240,8 @@ func (p *asyncProducer) topicDispatcher() {
 
 		handler := handlers[msg.Topic]
 		if handler == nil {
-			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
-			topic := msg.Topic // block local because go's closure semantics suck
-			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
-			handler = newHandler
+			handler = make(chan *ProducerMessage, p.conf.ChannelBufferSize)
+			p.newTopicProducer(msg.Topic, handler)
 			handlers[msg.Topic] = handler
 		}
 
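This hunk is the core of the dispatcher loop: per-topic channels are created lazily, and each new channel immediately gets its own consumer. Stripped of Sarama specifics, the pattern generalizes; in this sketch, fanOut and start are illustrative names, not part of the codebase:

func fanOut(input <-chan *ProducerMessage, bufferSize int, start func(topic string, ch <-chan *ProducerMessage)) {
	handlers := make(map[string]chan *ProducerMessage)
	for msg := range input {
		handler := handlers[msg.Topic]
		if handler == nil {
			// first message for this topic: create its channel and spawn its consumer
			handler = make(chan *ProducerMessage, bufferSize)
			start(msg.Topic, handler)
			handlers[msg.Topic] = handler
		}
		handler <- msg
	}
	for _, handler := range handlers {
		close(handler) // propagate shutdown to every per-topic consumer
	}
}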
@@ -264,133 +255,158 @@ func (p *asyncProducer) topicDispatcher() {
 // one per topic
 // partitions messages, then dispatches them by partition
-func (p *asyncProducer) partitionDispatcher(topic string, input <-chan *ProducerMessage) {
-	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.conf.Producer.Partitioner(topic)
-	breaker := breaker.New(3, 1, 10*time.Second)
+type topicProducer struct {
+	parent *asyncProducer
+	topic  string
+	input  <-chan *ProducerMessage
+
+	breaker     *breaker.Breaker
+	handlers    map[int32]chan *ProducerMessage
+	partitioner Partitioner
+}
+
+func (p *asyncProducer) newTopicProducer(topic string, input <-chan *ProducerMessage) *topicProducer {
+	tp := &topicProducer{
+		parent:      p,
+		topic:       topic,
+		input:       input,
+		breaker:     breaker.New(3, 1, 10*time.Second),
+		handlers:    make(map[int32]chan *ProducerMessage),
+		partitioner: p.conf.Producer.Partitioner(topic),
+	}
+	go withRecover(tp.dispatch)
+	return tp
+}
 
-	for msg := range input {
+func (tp *topicProducer) dispatch() {
+	for msg := range tp.input {
 		if msg.retries == 0 {
-			if err := p.assignPartition(breaker, partitioner, msg); err != nil {
-				p.returnError(msg, err)
+			if err := tp.partitionMessage(msg); err != nil {
+				tp.parent.returnError(msg, err)
 				continue
 			}
 		}
 
-		handler := handlers[msg.Partition]
+		handler := tp.handlers[msg.Partition]
 		if handler == nil {
-			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
-			topic := msg.Topic // block local because go's closure semantics suck
-			partition := msg.Partition // block local because go's closure semantics suck
-			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
-			handler = newHandler
-			handlers[msg.Partition] = handler
+			handler = make(chan *ProducerMessage, tp.parent.conf.ChannelBufferSize)
+			tp.parent.newPartitionProducer(msg.Topic, msg.Partition, handler)
+			tp.handlers[msg.Partition] = handler
 		}
 
 		handler <- msg
 	}
 
-	for _, handler := range handlers {
+	for _, handler := range tp.handlers {
 		close(handler)
 	}
 }
 
-// one per partition per topic
-// dispatches messages to the appropriate broker
-// also responsible for maintaining message order during retries
-func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-chan *ProducerMessage) {
-	var leader *Broker
-	var output chan *ProducerMessage
+func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
+	var partitions []int32
 
-	breaker := breaker.New(3, 1, 10*time.Second)
-	doUpdate := func() (err error) {
-		if err = p.client.RefreshMetadata(topic); err != nil {
-			return err
+	err := tp.breaker.Run(func() (err error) {
+		if tp.partitioner.RequiresConsistency() {
+			partitions, err = tp.parent.client.Partitions(msg.Topic)
+		} else {
+			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
 		}
+		return
+	})
 
-		if leader, err = p.client.Leader(topic, partition); err != nil {
-			return err
-		}
+	if err != nil {
+		return err
+	}
 
-		output = p.getBrokerProducer(leader)
-		return nil
+	numPartitions := int32(len(partitions))
+
+	if numPartitions == 0 {
+		return ErrLeaderNotAvailable
 	}
 
-	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
-	// on the first message
-	leader, _ = p.client.Leader(topic, partition)
-	if leader != nil {
-		output = p.getBrokerProducer(leader)
+	choice, err := tp.partitioner.Partition(msg, numPartitions)
+
+	if err != nil {
+		return err
+	} else if choice < 0 || choice >= numPartitions {
+		return ErrInvalidPartition
 	}
 
+	msg.Partition = partitions[choice]
+
+	return nil
+}
+
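partitionMessage is also where custom partitioning plugs in. Note that Partition returns an index into the fetched partition slice (msg.Partition = partitions[choice]), not a raw partition ID. A hedged example of a conforming partitioner, its interface inferred from the two calls above:

// roundRobinPartitioner is an illustrative (hypothetical) partitioner that
// cycles through the available partitions in order.
type roundRobinPartitioner struct{ counter int32 }

func (p *roundRobinPartitioner) Partition(msg *ProducerMessage, numPartitions int32) (int32, error) {
	choice := p.counter % numPartitions // an index into the partition slice, not a partition ID
	p.counter++
	return choice, nil
}

// returning false means partitionMessage consults WritablePartitions, so the
// index only ever selects partitions that currently have a leader
func (p *roundRobinPartitioner) RequiresConsistency() bool { return false }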
+// one per partition per topic
+// dispatches messages to the appropriate broker
+// also responsible for maintaining message order during retries
+type partitionProducer struct {
+	parent    *asyncProducer
+	topic     string
+	partition int32
+	input     <-chan *ProducerMessage
+
+	leader  *Broker
+	breaker *breaker.Breaker
+	output  chan *ProducerMessage
+
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
 	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
 	// therefore whether our buffer is complete and safe to flush)
-	highWatermark := 0
-	retryState := make([]struct {
-		buf          []*ProducerMessage
-		expectChaser bool
-	}, p.conf.Producer.Retry.Max+1)
-
-	for msg := range input {
-		if msg.retries > highWatermark {
-			// new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
-			// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
-			highWatermark = msg.retries
-			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
-			retryState[msg.retries].expectChaser = true
-			p.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
-			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
-			Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", topic, partition, leader.ID())
-			p.unrefBrokerProducer(leader, output)
-			output = nil
-			time.Sleep(p.conf.Producer.Retry.Backoff)
-		} else if highWatermark > 0 {
+	highWatermark int
+	retryState    []partitionRetryState
+}
+
+type partitionRetryState struct {
+	buf          []*ProducerMessage
+	expectChaser bool
+}
+
+func (p *asyncProducer) newPartitionProducer(topic string, partition int32, input <-chan *ProducerMessage) *partitionProducer {
+	pp := &partitionProducer{
+		parent:    p,
+		topic:     topic,
+		partition: partition,
+		input:     input,
+
+		breaker:    breaker.New(3, 1, 10*time.Second),
+		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
+	}
+	go withRecover(pp.dispatch)
+	return pp
+}
+
+func (pp *partitionProducer) dispatch() {
+	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
+	// on the first message
+	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
+	if pp.leader != nil {
+		pp.output = pp.parent.getBrokerProducer(pp.leader)
+	}
+
+	for msg := range pp.input {
+		if msg.retries > pp.highWatermark {
+			// a new, higher, retry level; handle it and then back off
+			pp.newHighWatermark(msg.retries)
+			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
-			if msg.retries < highWatermark {
+			if msg.retries < pp.highWatermark {
 				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a chaser)
 				if msg.flags&chaser == chaser {
-					retryState[msg.retries].expectChaser = false
-					p.inFlight.Done() // this chaser is now handled and will be garbage collected
+					pp.retryState[msg.retries].expectChaser = false
+					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
 				} else {
-					retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
+					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 				}
 				continue
 			} else if msg.flags&chaser == chaser {
 				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
 				// meaning this retry level is done and we can go down (at least) one level and flush that
-				retryState[highWatermark].expectChaser = false
-				Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", topic, partition, highWatermark)
-				for {
-					highWatermark--
-
-					if output == nil {
-						if err := breaker.Run(doUpdate); err != nil {
-							p.returnErrors(retryState[highWatermark].buf, err)
-							goto flushDone
-						}
-						Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
-					}
-
-					for _, msg := range retryState[highWatermark].buf {
-						output <- msg
-					}
-
-				flushDone:
-					retryState[highWatermark].buf = nil
-					if retryState[highWatermark].expectChaser {
-						Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", topic, partition, highWatermark)
-						break
-					} else {
-						if highWatermark == 0 {
-							Logger.Printf("producer/leader/%s/%d state change to [normal]\n", topic, partition)
-							break
-						}
-					}
-
-				}
-				p.inFlight.Done() // this chaser is now handled and will be garbage collected
+				pp.retryState[pp.highWatermark].expectChaser = false
+				pp.flushRetryBuffers()
+				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
 				continue
 			}
 		}
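The retry-level machinery above is the subtlest part of this refactor, so here is a toy model of it (simplified types, not Sarama code): messages carry a retry level, only the highest level seen flows through, lower levels are buffered, and the "chaser" sentinel for the current level triggers an in-order flush:

package main

import "fmt"

type toyMsg struct {
	id      int
	retries int
	chaser  bool
}

func main() {
	in := []toyMsg{
		{id: 1, retries: 1},        // msg 1 failed once and is being retried: new high watermark
		{id: 2, retries: 0},        // fresh messages arriving meanwhile must wait...
		{id: 3, retries: 0},        // ...or they would overtake msg 1
		{retries: 1, chaser: true}, // chaser for level 1: everything in between is back, safe to flush
	}

	hwm := 0
	buf := map[int][]toyMsg{}
	var out []toyMsg

	for _, m := range in {
		switch {
		case m.retries > hwm:
			hwm = m.retries // the real code also emits a chaser and abandons the old broker here
			out = append(out, m)
		case m.retries < hwm && !m.chaser:
			buf[m.retries] = append(buf[m.retries], m)
		case m.chaser && m.retries == hwm:
			for hwm > 0 { // flush buffered levels back down to [normal]
				hwm--
				out = append(out, buf[hwm]...)
				buf[hwm] = nil
			}
		}
	}

	fmt.Println(out) // [{1 1 false} {2 0 false} {3 0 false}]: retried msg 1 still precedes 2 and 3
}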
@@ -398,23 +414,83 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input <-
 		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
 		// without breaking any of our ordering guarantees
 
-		if output == nil {
-			if err := breaker.Run(doUpdate); err != nil {
-				p.returnError(msg, err)
-				time.Sleep(p.conf.Producer.Retry.Backoff)
+		if pp.output == nil {
+			if err := pp.updateLeader(); err != nil {
+				pp.parent.returnError(msg, err)
+				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
 				continue
 			}
-			Logger.Printf("producer/leader/%s/%d selected broker %d\n", topic, partition, leader.ID())
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
-		output <- msg
+		pp.output <- msg
 	}
 
-	if output != nil {
-		p.unrefBrokerProducer(leader, output)
+	if pp.output != nil {
+		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
 	}
 }
 
+func (pp *partitionProducer) newHighWatermark(hwm int) {
+	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
+	pp.highWatermark = hwm
+
+	// send off a chaser so that we know when everything "in between" has made it
+	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
+	pp.retryState[pp.highWatermark].expectChaser = true
+	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
+	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
+
+	// a new HWM means that our current broker selection is out of date
+	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
+	pp.output = nil
+}
+
+func (pp *partitionProducer) flushRetryBuffers() {
+	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
+	for {
+		pp.highWatermark--
+
+		if pp.output == nil {
+			if err := pp.updateLeader(); err != nil {
+				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
+				goto flushDone
+			}
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+		}
+
+		for _, msg := range pp.retryState[pp.highWatermark].buf {
+			pp.output <- msg
+		}
+
+	flushDone:
+		pp.retryState[pp.highWatermark].buf = nil
+		if pp.retryState[pp.highWatermark].expectChaser {
+			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
+			break
+		} else if pp.highWatermark == 0 {
+			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
+			break
+		}
+	}
+}
+
+func (pp *partitionProducer) updateLeader() error {
+	return pp.breaker.Run(func() (err error) {
+		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
+			return err
+		}
+
+		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
+			return err
+		}
+
+		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		return nil
+	})
+}
+
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
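updateLeader, like partitionMessage, wraps its metadata work in a circuit breaker from github.com/eapache/go-resiliency. To the best of my reading of that library, breaker.New(3, 1, 10*time.Second) opens after 3 consecutive failures, fails fast with breaker.ErrBreakerOpen for 10 seconds, and closes again after 1 success in the half-open state. A standalone sketch:

package main

import (
	"errors"
	"fmt"
	"time"

	"github.com/eapache/go-resiliency/breaker"
)

func main() {
	b := breaker.New(3, 1, 10*time.Second)

	for i := 0; i < 5; i++ {
		err := b.Run(func() error {
			return errors.New("metadata refresh failed") // stand-in for a failing RefreshMetadata
		})
		fmt.Println(i, err) // attempts 3 and 4 print breaker.ErrBreakerOpen without running the function
	}
}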
@@ -582,7 +658,7 @@ func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage)
 }
 
 // singleton
-// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
+// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
 func (p *asyncProducer) retryHandler() {
 	var msg *ProducerMessage
@@ -608,9 +684,6 @@ func (p *asyncProducer) retryHandler() {
 	}
 }
 
-///////////////////////////////////////////
-///////////////////////////////////////////
-
 // utility functions
 
 func (p *asyncProducer) shutdown() {
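The InfiniteChannel reference explains why retryHandler exists: flushers must always be able to hand a failed message back without blocking, or they could deadlock against the dispatcher. The essence of the pattern, as a toy sketch (not the retryHandler implementation itself), using the fact that a nil channel disables its select case:

func bridge(input <-chan *ProducerMessage, output chan<- *ProducerMessage) {
	var buf []*ProducerMessage
	for {
		// only offer a send when the buffer is non-empty
		var out chan<- *ProducerMessage
		var next *ProducerMessage
		if len(buf) > 0 {
			out = output
			next = buf[0]
		}

		select {
		case msg, ok := <-input:
			if !ok {
				for _, m := range buf { // drain the backlog on shutdown
					output <- m
				}
				close(output)
				return
			}
			buf = append(buf, msg) // never block the sender
		case out <- next:
			buf = buf[1:]
		}
	}
}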
@@ -633,41 +706,6 @@ func (p *asyncProducer) shutdown() {
 	close(p.successes)
 }
 
-func (p *asyncProducer) assignPartition(breaker *breaker.Breaker, partitioner Partitioner, msg *ProducerMessage) error {
-	var partitions []int32
-
-	err := breaker.Run(func() (err error) {
-		if partitioner.RequiresConsistency() {
-			partitions, err = p.client.Partitions(msg.Topic)
-		} else {
-			partitions, err = p.client.WritablePartitions(msg.Topic)
-		}
-		return
-	})
-
-	if err != nil {
-		return err
-	}
-
-	numPartitions := int32(len(partitions))
-
-	if numPartitions == 0 {
-		return ErrLeaderNotAvailable
-	}
-
-	choice, err := partitioner.Partition(msg, numPartitions)
-
-	if err != nil {
-		return err
-	} else if choice < 0 || choice >= numPartitions {
-		return ErrInvalidPartition
-	}
-
-	msg.Partition = partitions[choice]
-
-	return nil
-}
-
 func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
 
 	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
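None of this changes the public surface. For orientation, a hedged end-to-end sketch of driving the async producer (broker address, topic, and config values are placeholders):

package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	config := sarama.NewConfig()
	config.Producer.Return.Successes = true // deliver acks on the Successes() channel
	config.Producer.Retry.Max = 3           // bounds the retry levels seen by partitionProducer

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatalln(err)
	}

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "my-topic",
		Value: sarama.StringEncoder("hello"),
	}

	select {
	case msg := <-producer.Successes():
		log.Printf("delivered to partition %d", msg.Partition)
	case producerErr := <-producer.Errors():
		log.Println("delivery failed after all retries:", producerErr)
	}

	producer.AsyncClose()
}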