|
@@ -239,6 +239,7 @@ func (p *Producer) Close() error {
|
|
|
///////////////////////////////////////////
|
|
///////////////////////////////////////////
|
|
|
|
|
|
|
|
// singleton
|
|
// singleton
|
|
|
|
|
+// dispatches messages by topic
|
|
|
func (p *Producer) topicDispatcher() {
|
|
func (p *Producer) topicDispatcher() {
|
|
|
handlers := make(map[string]chan *MessageToSend)
|
|
handlers := make(map[string]chan *MessageToSend)
|
|
|
|
|
|
|
@@ -287,6 +288,7 @@ func (p *Producer) topicDispatcher() {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// one per topic
|
|
// one per topic
|
|
|
|
|
+// partitions messages, then dispatches them by partition
|
|
|
func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
|
|
func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
|
|
|
handlers := make(map[int32]chan *MessageToSend)
|
|
handlers := make(map[int32]chan *MessageToSend)
|
|
|
partitioner := p.config.Partitioner()
|
|
partitioner := p.config.Partitioner()
|
|
@@ -321,6 +323,8 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// one per partition per topic
|
|
// one per partition per topic
|
|
|
|
|
+// dispatches messages to the appropriate broker
|
|
|
|
|
+// also responsible for maintaining message order during retries
|
|
|
func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
|
|
func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
|
|
|
var leader *Broker
|
|
var leader *Broker
|
|
|
var output chan *MessageToSend
|
|
var output chan *MessageToSend
|
|
@@ -401,6 +405,8 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// one per broker
|
|
// one per broker
|
|
|
|
|
+// groups messages together into appropriately-sized batches for sending to the broker
|
|
|
|
|
+// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
|
|
|
func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
|
|
func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
|
|
|
var ticker *time.Ticker
|
|
var ticker *time.Ticker
|
|
|
var timer <-chan time.Time
|
|
var timer <-chan time.Time
|
|
@@ -459,6 +465,7 @@ shutdown:
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// one per broker
|
|
// one per broker
|
|
|
|
|
+// takes a batch at a time from the messageAggregator and sends to the broker
|
|
|
func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
var closing error
|
|
var closing error
|
|
|
currentRetries := make(map[string]map[int32]error)
|
|
currentRetries := make(map[string]map[int32]error)
|
|
@@ -560,6 +567,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// singleton
|
|
// singleton
|
|
|
|
|
+// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
|
|
|
|
|
+// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
|
|
|
func (p *Producer) retryHandler() {
|
|
func (p *Producer) retryHandler() {
|
|
|
var buf []*MessageToSend
|
|
var buf []*MessageToSend
|
|
|
var msg *MessageToSend
|
|
var msg *MessageToSend
|