|
|
@@ -54,7 +54,8 @@ type asyncProducer struct {
|
|
|
errors chan *ProducerError
|
|
|
input, successes, retries chan *ProducerMessage
|
|
|
|
|
|
- brokers map[*Broker]*brokerProducer
|
|
|
+ brokers map[*Broker]chan *ProducerMessage
|
|
|
+ brokerRefs map[chan *ProducerMessage]int
|
|
|
brokerLock sync.Mutex
|
|
|
}
|
|
|
|
|
|
@@ -82,13 +83,14 @@ func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
|
|
|
}
|
|
|
|
|
|
p := &asyncProducer{
|
|
|
- client: client,
|
|
|
- conf: client.Config(),
|
|
|
- errors: make(chan *ProducerError),
|
|
|
- input: make(chan *ProducerMessage),
|
|
|
- successes: make(chan *ProducerMessage),
|
|
|
- retries: make(chan *ProducerMessage),
|
|
|
- brokers: make(map[*Broker]*brokerProducer),
|
|
|
+ client: client,
|
|
|
+ conf: client.Config(),
|
|
|
+ errors: make(chan *ProducerError),
|
|
|
+ input: make(chan *ProducerMessage),
|
|
|
+ successes: make(chan *ProducerMessage),
|
|
|
+ retries: make(chan *ProducerMessage),
|
|
|
+ brokers: make(map[*Broker]chan *ProducerMessage),
|
|
|
+ brokerRefs: make(map[chan *ProducerMessage]int),
|
|
|
}
|
|
|
|
|
|
// launch our singleton dispatchers
|
|
|
@@ -340,7 +342,7 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
|
|
|
retryState[msg.retries].expectChaser = true
|
|
|
output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
|
|
|
Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
- p.unrefBrokerProducer(leader)
|
|
|
+ p.unrefBrokerProducer(leader, output)
|
|
|
output = nil
|
|
|
time.Sleep(p.conf.Producer.Retry.Backoff)
|
|
|
} else if highWatermark > 0 {
|
|
|
@@ -406,7 +408,9 @@ func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input ch
|
|
|
output <- msg
|
|
|
}
|
|
|
|
|
|
- p.unrefBrokerProducer(leader)
|
|
|
+ if output != nil {
|
|
|
+ p.unrefBrokerProducer(leader, output)
|
|
|
+ }
|
|
|
p.retries <- &ProducerMessage{flags: unref}
|
|
|
}
|
|
|
|
|
|
@@ -529,9 +533,10 @@ func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
continue
|
|
|
default:
|
|
|
Logger.Printf("producer/flusher/%d state change to [closing] because %s\n", broker.ID(), err)
|
|
|
- closing = err
|
|
|
- _ = broker.Close()
|
|
|
+ p.abandonBrokerConnection(broker)
|
|
|
p.retryMessages(batch, err)
|
|
|
+ _ = broker.Close()
|
|
|
+ closing = err
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
@@ -769,43 +774,43 @@ func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-type brokerProducer struct {
|
|
|
- input chan *ProducerMessage
|
|
|
- refs int
|
|
|
-}
|
|
|
-
|
|
|
func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
- producer := p.brokers[broker]
|
|
|
+ bp := p.brokers[broker]
|
|
|
|
|
|
- if producer == nil {
|
|
|
+ if bp == nil {
|
|
|
p.retries <- &ProducerMessage{flags: ref}
|
|
|
- producer = &brokerProducer{
|
|
|
- refs: 1,
|
|
|
- input: make(chan *ProducerMessage),
|
|
|
- }
|
|
|
- p.brokers[broker] = producer
|
|
|
- go withRecover(func() { p.messageAggregator(broker, producer.input) })
|
|
|
- } else {
|
|
|
- producer.refs++
|
|
|
+ bp = make(chan *ProducerMessage)
|
|
|
+ p.brokers[broker] = bp
|
|
|
+ p.brokerRefs[bp] = 0
|
|
|
+ go withRecover(func() { p.messageAggregator(broker, bp) })
|
|
|
}
|
|
|
|
|
|
- return producer.input
|
|
|
+ p.brokerRefs[bp]++
|
|
|
+
|
|
|
+ return bp
|
|
|
}
|
|
|
|
|
|
-func (p *asyncProducer) unrefBrokerProducer(broker *Broker) {
|
|
|
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp chan *ProducerMessage) {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
- producer := p.brokers[broker]
|
|
|
+ p.brokerRefs[bp]--
|
|
|
+ if p.brokerRefs[bp] == 0 {
|
|
|
+ close(bp)
|
|
|
+ delete(p.brokerRefs, bp)
|
|
|
|
|
|
- if producer != nil {
|
|
|
- producer.refs--
|
|
|
- if producer.refs == 0 {
|
|
|
- close(producer.input)
|
|
|
+ if p.brokers[broker] == bp {
|
|
|
delete(p.brokers, broker)
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
|
|
|
+ p.brokerLock.Lock()
|
|
|
+ defer p.brokerLock.Unlock()
|
|
|
+
|
|
|
+ delete(p.brokers, broker)
|
|
|
+}
|