|
|
@@ -117,7 +117,7 @@ type Producer struct {
|
|
|
errors chan *ProducerError
|
|
|
input, successes, retries chan *ProducerMessage
|
|
|
|
|
|
- brokers map[*Broker]*brokerWorker
|
|
|
+ brokers map[*Broker]*brokerProducer
|
|
|
brokerLock sync.Mutex
|
|
|
}
|
|
|
|
|
|
@@ -144,7 +144,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
|
|
|
input: make(chan *ProducerMessage),
|
|
|
successes: make(chan *ProducerMessage),
|
|
|
retries: make(chan *ProducerMessage),
|
|
|
- brokers: make(map[*Broker]*brokerWorker),
|
|
|
+ brokers: make(map[*Broker]*brokerProducer),
|
|
|
}
|
|
|
|
|
|
// launch our singleton dispatchers
|
|
|
@@ -382,7 +382,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- output = p.getBrokerWorker(leader)
|
|
|
+ output = p.getBrokerProducer(leader)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
@@ -390,7 +390,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
|
|
|
// on the first message
|
|
|
leader, _ = p.client.Leader(topic, partition)
|
|
|
if leader != nil {
|
|
|
- output = p.getBrokerWorker(leader)
|
|
|
+ output = p.getBrokerProducer(leader)
|
|
|
}
|
|
|
|
|
|
// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
|
|
|
@@ -412,7 +412,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
|
|
|
retryState[msg.retries].expectChaser = true
|
|
|
output <- &ProducerMessage{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
|
|
|
Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
|
|
|
- p.unrefBrokerWorker(leader)
|
|
|
+ p.unrefBrokerProducer(leader)
|
|
|
output = nil
|
|
|
time.Sleep(p.config.RetryBackoff)
|
|
|
} else if highWatermark > 0 {
|
|
|
@@ -478,7 +478,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
|
|
|
output <- msg
|
|
|
}
|
|
|
|
|
|
- p.unrefBrokerWorker(leader)
|
|
|
+ p.unrefBrokerProducer(leader)
|
|
|
p.retries <- &ProducerMessage{flags: unref}
|
|
|
}
|
|
|
|
|
|
@@ -829,42 +829,42 @@ func (p *Producer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-type brokerWorker struct {
|
|
|
+type brokerProducer struct {
|
|
|
input chan *ProducerMessage
|
|
|
refs int
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) getBrokerWorker(broker *Broker) chan *ProducerMessage {
|
|
|
+func (p *Producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
- worker := p.brokers[broker]
|
|
|
+ producer := p.brokers[broker]
|
|
|
|
|
|
- if worker == nil {
|
|
|
+ if producer == nil {
|
|
|
p.retries <- &ProducerMessage{flags: ref}
|
|
|
- worker = &brokerWorker{
|
|
|
+ producer = &brokerProducer{
|
|
|
refs: 1,
|
|
|
input: make(chan *ProducerMessage),
|
|
|
}
|
|
|
- p.brokers[broker] = worker
|
|
|
- go withRecover(func() { p.messageAggregator(broker, worker.input) })
|
|
|
+ p.brokers[broker] = producer
|
|
|
+ go withRecover(func() { p.messageAggregator(broker, producer.input) })
|
|
|
} else {
|
|
|
- worker.refs++
|
|
|
+ producer.refs++
|
|
|
}
|
|
|
|
|
|
- return worker.input
|
|
|
+ return producer.input
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) unrefBrokerWorker(broker *Broker) {
|
|
|
+func (p *Producer) unrefBrokerProducer(broker *Broker) {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
- worker := p.brokers[broker]
|
|
|
+ producer := p.brokers[broker]
|
|
|
|
|
|
- if worker != nil {
|
|
|
- worker.refs--
|
|
|
- if worker.refs == 0 {
|
|
|
- close(worker.input)
|
|
|
+ if producer != nil {
|
|
|
+ producer.refs--
|
|
|
+ if producer.refs == 0 {
|
|
|
+ close(producer.input)
|
|
|
delete(p.brokers, broker)
|
|
|
}
|
|
|
}
|