|
|
@@ -203,7 +203,7 @@ func (p *asyncProducer) AsyncClose() {
|
|
|
// singleton
|
|
|
// dispatches messages by topic
|
|
|
func (p *asyncProducer) dispatcher() {
|
|
|
- handlers := make(map[string]chan *ProducerMessage)
|
|
|
+ handlers := make(map[string]chan<- *ProducerMessage)
|
|
|
shuttingDown := false
|
|
|
|
|
|
for msg := range p.input {
|
|
|
@@ -240,8 +240,7 @@ func (p *asyncProducer) dispatcher() {
|
|
|
|
|
|
handler := handlers[msg.Topic]
|
|
|
if handler == nil {
|
|
|
- handler = make(chan *ProducerMessage, p.conf.ChannelBufferSize)
|
|
|
- p.newTopicProducer(msg.Topic, handler)
|
|
|
+ handler = p.newTopicProducer(msg.Topic)
|
|
|
handlers[msg.Topic] = handler
|
|
|
}
|
|
|
|
|
|
@@ -261,21 +260,22 @@ type topicProducer struct {
|
|
|
input <-chan *ProducerMessage
|
|
|
|
|
|
breaker *breaker.Breaker
|
|
|
- handlers map[int32]chan *ProducerMessage
|
|
|
+ handlers map[int32]chan<- *ProducerMessage
|
|
|
partitioner Partitioner
|
|
|
}
|
|
|
|
|
|
-func (p *asyncProducer) newTopicProducer(topic string, input <-chan *ProducerMessage) *topicProducer {
|
|
|
+func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
|
|
|
+ input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
|
|
|
tp := &topicProducer{
|
|
|
parent: p,
|
|
|
topic: topic,
|
|
|
input: input,
|
|
|
breaker: breaker.New(3, 1, 10*time.Second),
|
|
|
- handlers: make(map[int32]chan *ProducerMessage),
|
|
|
+ handlers: make(map[int32]chan<- *ProducerMessage),
|
|
|
partitioner: p.conf.Producer.Partitioner(topic),
|
|
|
}
|
|
|
go withRecover(tp.dispatch)
|
|
|
- return tp
|
|
|
+ return input
|
|
|
}
|
|
|
|
|
|
func (tp *topicProducer) dispatch() {
|
|
|
@@ -289,8 +289,7 @@ func (tp *topicProducer) dispatch() {
|
|
|
|
|
|
handler := tp.handlers[msg.Partition]
|
|
|
if handler == nil {
|
|
|
- handler = make(chan *ProducerMessage, tp.parent.conf.ChannelBufferSize)
|
|
|
- tp.parent.newPartitionProducer(msg.Topic, msg.Partition, handler)
|
|
|
+ handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
|
|
|
tp.handlers[msg.Partition] = handler
|
|
|
}
|
|
|
|
|
|
@@ -363,7 +362,8 @@ type partitionRetryState struct {
|
|
|
expectChaser bool
|
|
|
}
|
|
|
|
|
|
-func (p *asyncProducer) newPartitionProducer(topic string, partition int32, input <-chan *ProducerMessage) *partitionProducer {
|
|
|
+func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
|
|
|
+ input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
|
|
|
pp := &partitionProducer{
|
|
|
parent: p,
|
|
|
topic: topic,
|
|
|
@@ -374,7 +374,7 @@ func (p *asyncProducer) newPartitionProducer(topic string, partition int32, inpu
|
|
|
retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
|
|
|
}
|
|
|
go withRecover(pp.dispatch)
|
|
|
- return pp
|
|
|
+ return input
|
|
|
}
|
|
|
|
|
|
func (pp *partitionProducer) dispatch() {
|