@@ -6,6 +6,7 @@ import (
 	"time"
 
 	"github.com/eapache/go-resiliency/breaker"
+	"github.com/eapache/queue"
 )
 
 func forceFlushThreshold() int {
@@ -35,7 +36,7 @@ type AsyncProducer interface {
 	// Input is the input channel for the user to write messages to that they wish to send.
 	Input() chan<- *ProducerMessage
 
-	// Successes is the success output channel back to the user when AckSuccesses is confured.
+	// Successes is the success output channel back to the user when AckSuccesses is enabled.
 	// If Return.Successes is true, you MUST read from this channel or the Producer will deadlock.
 	// It is suggested that you send and read messages together in a single select statement.
 	Successes() <-chan *ProducerMessage
@@ -53,6 +54,7 @@ type asyncProducer struct {
 
 	errors                    chan *ProducerError
 	input, successes, retries chan *ProducerMessage
+	inFlight                  sync.WaitGroup
 
 	brokers    map[*Broker]chan *ProducerMessage
 	brokerRefs map[chan *ProducerMessage]int
@@ -94,7 +96,7 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
 	}
 
 	// launch our singleton dispatchers
-	go withRecover(p.topicDispatcher)
+	go withRecover(p.dispatcher)
 	go withRecover(p.retryHandler)
 
 	return p, nil
@@ -104,8 +106,6 @@ type flagSet int8
 
 const (
 	chaser   flagSet = 1 << iota // message is last in a group that failed
-	ref                          // add a reference to a singleton channel
-	unref                        // remove a reference from a singleton channel
 	shutdown                     // start the shutdown process
 )
 
@@ -136,6 +136,11 @@ func (m *ProducerMessage) byteSize() int {
 	return size
 }
 
+func (m *ProducerMessage) clear() {
+	m.flags = 0
+	m.retries = 0
+}
+
 // ProducerError is the type of error generated when the producer fails to deliver a message.
 // It contains the original ProducerMessage as well as the actual error value.
 type ProducerError struct {
@@ -192,22 +197,14 @@ func (p *asyncProducer) Close() error {
 }
 
 func (p *asyncProducer) AsyncClose() {
-	go withRecover(func() {
-		p.input <- &ProducerMessage{flags: shutdown}
-	})
+	go withRecover(p.shutdown)
 }
 
-///////////////////////////////////////////
-// In normal processing, a message flows through the following functions from top to bottom,
-// starting at topicDispatcher (which reads from Producer.input) and ending in flusher
-// (which sends the message to the broker). In cases where a message must be retried, it goes
-// through retryHandler before being returned to the top of the flow.
-///////////////////////////////////////////
-
 // singleton
 // dispatches messages by topic
-func (p *asyncProducer) topicDispatcher() {
+func (p *asyncProducer) dispatcher() {
 	handlers := make(map[string]chan *ProducerMessage)
+	shuttingDown := false
 
 	for msg := range p.input {
 		if msg == nil {
@@ -216,8 +213,22 @@ func (p *asyncProducer) topicDispatcher() {
 		}
 
 		if msg.flags&shutdown != 0 {
-			Logger.Println("Producer shutting down.")
-			break
+			shuttingDown = true
+			p.inFlight.Done()
+			continue
+		} else if msg.retries == 0 {
+			if shuttingDown {
+				// we can't just call returnError here because that decrements the wait group,
+				// which hasn't been incremented yet for this message, and shouldn't be
+				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
+				if p.conf.Producer.Return.Errors {
+					p.errors <- pErr
+				} else {
+					Logger.Println(pErr)
+				}
+				continue
+			}
+			p.inFlight.Add(1)
 		}
 
 		if (p.conf.Producer.Compression == CompressionNone && msg.Value != nil && msg.Value.Length() > p.conf.Producer.MaxMessageBytes) ||
@@ -229,11 +240,8 @@ func (p *asyncProducer) topicDispatcher() {
 
 		handler := handlers[msg.Topic]
 		if handler == nil {
-			p.retries <- &ProducerMessage{flags: ref}
-			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
-			topic := msg.Topic // block local because go's closure semantics suck
-			go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
-			handler = newHandler
+			handler = make(chan *ProducerMessage, p.conf.ChannelBufferSize)
+			p.newTopicProducer(msg.Topic, handler)
 			handlers[msg.Topic] = handler
 		}
 
@@ -243,155 +251,162 @@ func (p *asyncProducer) topicDispatcher() {
 	for _, handler := range handlers {
 		close(handler)
 	}
-
-	p.retries <- &ProducerMessage{flags: shutdown}
-
-	for msg := range p.input {
-		p.returnError(msg, ErrShuttingDown)
-	}
-
-	if p.ownClient {
-		err := p.client.Close()
-		if err != nil {
-			Logger.Println("producer/shutdown failed to close the embedded client:", err)
-		}
-	}
-	close(p.errors)
-	close(p.successes)
 }
 
 // one per topic
 // partitions messages, then dispatches them by partition
-func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
-	handlers := make(map[int32]chan *ProducerMessage)
-	partitioner := p.conf.Producer.Partitioner(topic)
-	breaker := breaker.New(3, 1, 10*time.Second)
+type topicProducer struct {
+	parent *asyncProducer
+	topic  string
+	input  <-chan *ProducerMessage
+
+	breaker     *breaker.Breaker
+	handlers    map[int32]chan *ProducerMessage
+	partitioner Partitioner
+}
+
+func (p *asyncProducer) newTopicProducer(topic string, input <-chan *ProducerMessage) *topicProducer {
+	tp := &topicProducer{
+		parent:      p,
+		topic:       topic,
+		input:       input,
+		breaker:     breaker.New(3, 1, 10*time.Second),
+		handlers:    make(map[int32]chan *ProducerMessage),
+		partitioner: p.conf.Producer.Partitioner(topic),
+	}
+	go withRecover(tp.dispatch)
+	return tp
+}
 
-	for msg := range input {
+func (tp *topicProducer) dispatch() {
+	for msg := range tp.input {
 		if msg.retries == 0 {
-			err := breaker.Run(func() error {
-				return p.assignPartition(partitioner, msg)
-			})
-			if err != nil {
-				p.returnError(msg, err)
+			if err := tp.partitionMessage(msg); err != nil {
+				tp.parent.returnError(msg, err)
 				continue
 			}
 		}
 
-		handler := handlers[msg.Partition]
+		handler := tp.handlers[msg.Partition]
 		if handler == nil {
-			p.retries <- &ProducerMessage{flags: ref}
-			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
-			topic := msg.Topic         // block local because go's closure semantics suck
-			partition := msg.Partition // block local because go's closure semantics suck
-			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
-			handler = newHandler
-			handlers[msg.Partition] = handler
+			handler = make(chan *ProducerMessage, tp.parent.conf.ChannelBufferSize)
+			tp.parent.newPartitionProducer(msg.Topic, msg.Partition, handler)
+			tp.handlers[msg.Partition] = handler
 		}
 
 		handler <- msg
 	}
 
-	for _, handler := range handlers {
+	for _, handler := range tp.handlers {
 		close(handler)
 	}
-	p.retries <- &ProducerMessage{flags: unref}
 }
 
-// one per partition per topic
-// dispatches messages to the appropriate broker
-// also responsible for maintaining message order during retries
-func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
-	var leader *Broker
-	var output chan *ProducerMessage
+func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
+	var partitions []int32
 
-	breaker := breaker.New(3, 1, 10*time.Second)
-	doUpdate := func() (err error) {
-		if err = p.client.RefreshMetadata(topic); err != nil {
-			return err
+	err := tp.breaker.Run(func() (err error) {
+		if tp.partitioner.RequiresConsistency() {
+			partitions, err = tp.parent.client.Partitions(msg.Topic)
+		} else {
+			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
 		}
+		return
+	})
 
-		if leader, err = p.client.Leader(topic, partition); err != nil {
-			return err
-		}
+	if err != nil {
+		return err
+	}
 
-		output = p.getBrokerProducer(leader)
-		return nil
+	numPartitions := int32(len(partitions))
+
+	if numPartitions == 0 {
+		return ErrLeaderNotAvailable
 	}
 
-	// try to prefetch the leader; if this doesn't work, we'll do a proper breaker-protected refresh-and-fetch
-	// on the first message
-	leader, _ = p.client.Leader(topic, partition)
-	if leader != nil {
-		output = p.getBrokerProducer(leader)
+	choice, err := tp.partitioner.Partition(msg, numPartitions)
+
+	if err != nil {
+		return err
+	} else if choice < 0 || choice >= numPartitions {
+		return ErrInvalidPartition
 	}
 
+	msg.Partition = partitions[choice]
+
+	return nil
+}
+
+// one per partition per topic
+// dispatches messages to the appropriate broker
+// also responsible for maintaining message order during retries
+type partitionProducer struct {
+	parent    *asyncProducer
+	topic     string
+	partition int32
+	input     <-chan *ProducerMessage
+
+	leader  *Broker
+	breaker *breaker.Breaker
+	output  chan *ProducerMessage
+
 	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
 	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
 	// retryState[msg.retries].expectChaser simply tracks whether we've seen a chaser message for a given level (and
 	// therefore whether our buffer is complete and safe to flush)
-	highWatermark := 0
-	retryState := make([]struct {
-		buf          []*ProducerMessage
-		expectChaser bool
-	}, p.conf.Producer.Retry.Max+1)
-
-	for msg := range input {
-		if msg.retries > highWatermark {
-			// new, higher, retry level; send off a chaser so that we know when everything "in between" has made it
-			// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
-			highWatermark = msg.retries
-			Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
-			retryState[msg.retries].expectChaser = true
-			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
-			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
-			p.unrefBrokerProducer(leader, output)
-			output = nil
-			time.Sleep(p.conf.Producer.Retry.Backoff)
-		} else if highWatermark > 0 {
+	highWatermark int
+	retryState    []partitionRetryState
+}
+
+type partitionRetryState struct {
+	buf          []*ProducerMessage
+	expectChaser bool
+}
+
+func (p *asyncProducer) newPartitionProducer(topic string, partition int32, input <-chan *ProducerMessage) *partitionProducer {
+	pp := &partitionProducer{
+		parent:    p,
+		topic:     topic,
+		partition: partition,
+		input:     input,
+
+		breaker:    breaker.New(3, 1, 10*time.Second),
+		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
+	}
+	go withRecover(pp.dispatch)
+	return pp
+}
+
+func (pp *partitionProducer) dispatch() {
+	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
+	// on the first message
+	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
+	if pp.leader != nil {
+		pp.output = pp.parent.getBrokerProducer(pp.leader)
+	}
+
+	for msg := range pp.input {
+		if msg.retries > pp.highWatermark {
+			// a new, higher, retry level; handle it and then back off
+			pp.newHighWatermark(msg.retries)
+			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
+		} else if pp.highWatermark > 0 {
 			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
-			if msg.retries < highWatermark {
+			if msg.retries < pp.highWatermark {
 				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a chaser)
 				if msg.flags&chaser == chaser {
-					retryState[msg.retries].expectChaser = false
+					pp.retryState[msg.retries].expectChaser = false
+					pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
 				} else {
-					retryState[msg.retries].buf = append(retryState[msg.retries].buf, msg)
+					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
 				}
 				continue
 			} else if msg.flags&chaser == chaser {
 				// this message is of the current retry level (msg.retries == highWatermark) and the chaser flag is set,
 				// meaning this retry level is done and we can go down (at least) one level and flush that
-				retryState[highWatermark].expectChaser = false
-				Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
-				for {
-					highWatermark--
-					Logger.Printf("producer/leader state change to [flushing-%d] on %s/%d\n", highWatermark, topic, partition)
-
-					if output == nil {
-						if err := breaker.Run(doUpdate); err != nil {
-							p.returnErrors(retryState[highWatermark].buf, err)
-							goto flushDone
-						}
-						Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
-					}
-
-					for _, msg := range retryState[highWatermark].buf {
-						output <- msg
-					}
-
-				flushDone:
-					retryState[highWatermark].buf = nil
-					if retryState[highWatermark].expectChaser {
-						Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
-						break
-					} else {
-						Logger.Printf("producer/leader state change to [normal-%d] on %s/%d\n", highWatermark, topic, partition)
-						if highWatermark == 0 {
-							break
-						}
-					}
-
-				}
+				pp.retryState[pp.highWatermark].expectChaser = false
+				pp.flushRetryBuffers()
+				pp.parent.inFlight.Done() // this chaser is now handled and will be garbage collected
 				continue
 			}
 		}
@@ -399,46 +414,101 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
 		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
 		// without breaking any of our ordering guarantees
 
-		if output == nil {
-			if err := breaker.Run(doUpdate); err != nil {
-				p.returnError(msg, err)
-				time.Sleep(p.conf.Producer.Retry.Backoff)
+		if pp.output == nil {
+			if err := pp.updateLeader(); err != nil {
+				pp.parent.returnError(msg, err)
+				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
 				continue
 			}
-			Logger.Printf("producer/leader selected broker %d on %s/%d\n", leader.ID(), topic, partition)
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
 		}
 
-		output <- msg
+		pp.output <- msg
+	}
+
+	if pp.output != nil {
+		pp.parent.unrefBrokerProducer(pp.leader, pp.output)
 	}
+}
+
+func (pp *partitionProducer) newHighWatermark(hwm int) {
+	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
+	pp.highWatermark = hwm
+
+	// send off a chaser so that we know when everything "in between" has made it
+	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
+	pp.retryState[pp.highWatermark].expectChaser = true
+	pp.parent.inFlight.Add(1) // we're generating a chaser message; track it so we don't shut down while it's still inflight
+	pp.output <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: chaser, retries: pp.highWatermark - 1}
+
+	// a new HWM means that our current broker selection is out of date
+	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+	pp.parent.unrefBrokerProducer(pp.leader, pp.output)
+	pp.output = nil
+}
+
+func (pp *partitionProducer) flushRetryBuffers() {
+	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
+	for {
+		pp.highWatermark--
 
-	if output != nil {
-		p.unrefBrokerProducer(leader, output)
+		if pp.output == nil {
+			if err := pp.updateLeader(); err != nil {
+				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
+				goto flushDone
+			}
+			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
+		}
+
+		for _, msg := range pp.retryState[pp.highWatermark].buf {
+			pp.output <- msg
+		}
+
+	flushDone:
+		pp.retryState[pp.highWatermark].buf = nil
+		if pp.retryState[pp.highWatermark].expectChaser {
+			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
+			break
+		} else if pp.highWatermark == 0 {
+			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
+			break
+		}
 	}
-	p.retries <- &ProducerMessage{flags: unref}
+}
+
+func (pp *partitionProducer) updateLeader() error {
+	return pp.breaker.Run(func() (err error) {
+		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
+			return err
+		}
+
+		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
+			return err
+		}
+
+		pp.output = pp.parent.getBrokerProducer(pp.leader)
+		return nil
+	})
 }
 
 // one per broker
 // groups messages together into appropriately-sized batches for sending to the broker
 // based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
-	var ticker *time.Ticker
-	var timer <-chan time.Time
-	if p.conf.Producer.Flush.Frequency > 0 {
-		ticker = time.NewTicker(p.conf.Producer.Flush.Frequency)
-		timer = ticker.C
-	}
-
-	var buffer []*ProducerMessage
-	var doFlush chan []*ProducerMessage
-	var bytesAccumulated int
-	var defaultFlush bool
+func (p *asyncProducer) messageAggregator(broker *Broker, input <-chan *ProducerMessage) {
+	var (
+		timer            <-chan time.Time
+		buffer           []*ProducerMessage
+		flushTriggered   chan []*ProducerMessage
+		bytesAccumulated int
+		defaultFlush     bool
+	)
 
 	if p.conf.Producer.Flush.Frequency == 0 && p.conf.Producer.Flush.Bytes == 0 && p.conf.Producer.Flush.Messages == 0 {
 		defaultFlush = true
 	}
 
-	flusher := make(chan []*ProducerMessage)
-	go withRecover(func() { p.flusher(broker, flusher) })
+	output := make(chan []*ProducerMessage)
+	go withRecover(func() { p.flusher(broker, output) })
 
 	for {
 		select {
@@ -450,10 +520,11 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 			if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
 				(p.conf.Producer.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.conf.Producer.MaxMessageBytes) ||
 				(p.conf.Producer.Flush.MaxMessages > 0 && len(buffer) >= p.conf.Producer.Flush.MaxMessages) {
-				Logger.Println("producer/aggregator maximum request accumulated, forcing blocking flush")
-				flusher <- buffer
+				Logger.Printf("producer/aggregator/%d maximum request accumulated, forcing blocking flush\n", broker.ID())
+				output <- buffer
+				timer = nil
 				buffer = nil
-				doFlush = nil
+				flushTriggered = nil
 				bytesAccumulated = 0
 			}
 
@@ -464,30 +535,30 @@ func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMe
 				msg.flags&chaser == chaser ||
 				(p.conf.Producer.Flush.Messages > 0 && len(buffer) >= p.conf.Producer.Flush.Messages) ||
 				(p.conf.Producer.Flush.Bytes > 0 && bytesAccumulated >= p.conf.Producer.Flush.Bytes) {
-				doFlush = flusher
+				flushTriggered = output
+			} else if p.conf.Producer.Flush.Frequency > 0 && timer == nil {
+				timer = time.After(p.conf.Producer.Flush.Frequency)
 			}
 		case <-timer:
-			doFlush = flusher
-		case doFlush <- buffer:
+			flushTriggered = output
+		case flushTriggered <- buffer:
+			timer = nil
 			buffer = nil
-			doFlush = nil
+			flushTriggered = nil
 			bytesAccumulated = 0
 		}
 	}
 
 shutdown:
-	if ticker != nil {
-		ticker.Stop()
-	}
 	if len(buffer) > 0 {
-		flusher <- buffer
+		output <- buffer
 	}
-	close(flusher)
+	close(output)
 }
 
 // one per broker
 // takes a batch at a time from the messageAggregator and sends to the broker
-func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
+func (p *asyncProducer) flusher(broker *Broker, input <-chan []*ProducerMessage) {
 	var closing error
 	currentRetries := make(map[string]map[int32]error)
 	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
@@ -538,17 +609,15 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 		default:
 			Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
 			p.abandonBrokerConnection(broker)
-			p.retryMessages(batch, err)
 			_ = broker.Close()
 			closing = err
+			p.retryMessages(batch, err)
 			continue
 		}
 
 		if response == nil {
 			// this only happens when RequiredAcks is NoResponse, so we have to assume success
-			if p.conf.Producer.Return.Successes {
-				p.returnSuccesses(batch)
-			}
+			p.returnSuccesses(batch)
 			continue
 		}
 
@@ -566,12 +635,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 				switch block.Err {
 				case ErrNoError:
 					// All the messages for this topic-partition were delivered successfully!
-					if p.conf.Producer.Return.Successes {
-						for i := range msgs {
-							msgs[i].Offset = block.Offset + int64(i)
-						}
-						p.returnSuccesses(msgs)
+					for i := range msgs {
+						msgs[i].Offset = block.Offset + int64(i)
 					}
+					p.returnSuccesses(msgs)
 				case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable,
 					ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
 					Logger.Printf("producer/flusher/%d state change to [retrying] on %s/%d because %v\n",
@@ -588,90 +655,55 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
 		}
 	}
 	Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
-	p.retries <- &ProducerMessage{flags: unref}
 }
 
 // singleton
-// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
+// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
 // based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
 func (p *asyncProducer) retryHandler() {
-	var buf []*ProducerMessage
 	var msg *ProducerMessage
-	refs := 0
-	shuttingDown := false
+	buf := queue.New()
 
 	for {
-		if len(buf) == 0 {
+		if buf.Length() == 0 {
 			msg = <-p.retries
 		} else {
 			select {
 			case msg = <-p.retries:
-			case p.input <- buf[0]:
-				buf = buf[1:]
+			case p.input <- buf.Peek().(*ProducerMessage):
+				buf.Remove()
 				continue
 			}
 		}
 
-		if msg.flags&ref != 0 {
-			refs++
-		} else if msg.flags&unref != 0 {
-			refs--
-			if refs == 0 && shuttingDown {
-				break
-			}
-		} else if msg.flags&shutdown != 0 {
-			shuttingDown = true
-			if refs == 0 {
-				break
-			}
-		} else {
-			buf = append(buf, msg)
+		if msg == nil {
+			return
 		}
-	}
 
-	close(p.retries)
-	for i := range buf {
-		p.input <- buf[i]
+		buf.Add(msg)
 	}
-	close(p.input)
 }
 
-///////////////////////////////////////////
-///////////////////////////////////////////
-
 // utility functions
 
-func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
-	var partitions []int32
-	var err error
-
-	if partitioner.RequiresConsistency() {
-		partitions, err = p.client.Partitions(msg.Topic)
-	} else {
-		partitions, err = p.client.WritablePartitions(msg.Topic)
-	}
-
-	if err != nil {
-		return err
-	}
-
-	numPartitions := int32(len(partitions))
+func (p *asyncProducer) shutdown() {
+	Logger.Println("Producer shutting down.")
+	p.inFlight.Add(1)
+	p.input <- &ProducerMessage{flags: shutdown}
 
-	if numPartitions == 0 {
-		return ErrLeaderNotAvailable
-	}
+	p.inFlight.Wait()
 
-	choice, err := partitioner.Partition(msg, numPartitions)
-
-	if err != nil {
-		return err
-	} else if choice < 0 || choice >= numPartitions {
-		return ErrInvalidPartition
+	if p.ownClient {
+		err := p.client.Close()
+		if err != nil {
+			Logger.Println("producer/shutdown failed to close the embedded client:", err)
+		}
 	}
 
-	msg.Partition = partitions[choice]
-
-	return nil
+	close(p.input)
+	close(p.retries)
+	close(p.errors)
+	close(p.successes)
 }
 
 func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
@@ -737,14 +769,14 @@ func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessa
 }
 
 func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
-	msg.flags = 0
-	msg.retries = 0
+	msg.clear()
 	pErr := &ProducerError{Msg: msg, Err: err}
 	if p.conf.Producer.Return.Errors {
 		p.errors <- pErr
 	} else {
 		Logger.Println(pErr)
 	}
+	p.inFlight.Done()
 }
 
 func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
@@ -757,10 +789,14 @@ func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
 
 func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
 	for _, msg := range batch {
-		if msg != nil {
-			msg.flags = 0
+		if msg == nil {
+			continue
+		}
+		if p.conf.Producer.Return.Successes {
+			msg.clear()
 			p.successes <- msg
 		}
+		p.inFlight.Done()
 	}
 }
 
@@ -785,7 +821,6 @@ func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage
 	bp := p.brokers[broker]
 
 	if bp == nil {
-		p.retries <- &ProducerMessage{flags: ref}
 		bp = make(chan *ProducerMessage)
 		p.brokers[broker] = bp
 		p.brokerRefs[bp] = 0