|
|
@@ -12,14 +12,14 @@ func forceFlushThreshold() int {
|
|
|
return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
|
|
|
}
|
|
|
|
|
|
-// Producer publishes Kafka messages. It routes messages to the correct broker
|
|
|
-// for the provided topic-partition, refreshing metadata as appropriate, and
|
|
|
-// parses responses for errors. You must read from the Errors() channel or the
|
|
|
+// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
|
|
|
+// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
|
|
|
+// and parses responses for errors. You must read from the Errors() channel or the
|
|
|
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
|
|
|
// leaks: it will not be garbage-collected automatically when it passes out of
|
|
|
// scope (this is in addition to calling Close on the underlying client, which
|
|
|
// is still necessary).
|
|
|
-type Producer interface {
|
|
|
+type AsyncProducer interface {
|
|
|
|
|
|
// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
|
|
|
// buffered. The shutdown has completed when both the Errors and Successes channels
|
|
|
@@ -47,7 +47,7 @@ type Producer interface {
|
|
|
Errors() <-chan *ProducerError
|
|
|
}
|
|
|
|
|
|
-type producer struct {
|
|
|
+type asyncProducer struct {
|
|
|
client Client
|
|
|
conf *Config
|
|
|
ownClient bool
|
|
|
@@ -59,29 +59,29 @@ type producer struct {
|
|
|
brokerLock sync.Mutex
|
|
|
}
|
|
|
|
|
|
-// NewProducer creates a new Producer using the given broker addresses and configuration.
|
|
|
-func NewProducer(addrs []string, conf *Config) (Producer, error) {
|
|
|
+// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
|
|
|
+func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
|
|
|
client, err := NewClient(addrs, conf)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- p, err := NewProducerFromClient(client)
|
|
|
+ p, err := NewAsyncProducerFromClient(client)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- p.(*producer).ownClient = true
|
|
|
+ p.(*asyncProducer).ownClient = true
|
|
|
return p, nil
|
|
|
}
|
|
|
|
|
|
-// NewProducerFromClient creates a new Producer using the given client.
|
|
|
-func NewProducerFromClient(client Client) (Producer, error) {
|
|
|
+// NewAsyncProducerFromClient creates a new Producer using the given client.
|
|
|
+func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
|
|
|
// Check that we are not dealing with a closed Client before processing any other arguments
|
|
|
if client.Closed() {
|
|
|
return nil, ErrClosedClient
|
|
|
}
|
|
|
|
|
|
- p := &producer{
|
|
|
+ p := &asyncProducer{
|
|
|
client: client,
|
|
|
conf: client.Config(),
|
|
|
errors: make(chan *ProducerError),
|
|
|
@@ -164,19 +164,19 @@ func (pe ProducerErrors) Error() string {
|
|
|
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
|
|
|
}
|
|
|
|
|
|
-func (p *producer) Errors() <-chan *ProducerError {
|
|
|
+func (p *asyncProducer) Errors() <-chan *ProducerError {
|
|
|
return p.errors
|
|
|
}
|
|
|
|
|
|
-func (p *producer) Successes() <-chan *ProducerMessage {
|
|
|
+func (p *asyncProducer) Successes() <-chan *ProducerMessage {
|
|
|
return p.successes
|
|
|
}
|
|
|
|
|
|
-func (p *producer) Input() chan<- *ProducerMessage {
|
|
|
+func (p *asyncProducer) Input() chan<- *ProducerMessage {
|
|
|
return p.input
|
|
|
}
|
|
|
|
|
|
-func (p *producer) Close() error {
|
|
|
+func (p *asyncProducer) Close() error {
|
|
|
p.AsyncClose()
|
|
|
|
|
|
if p.conf.Producer.Return.Successes {
|
|
|
@@ -199,7 +199,7 @@ func (p *producer) Close() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (p *producer) AsyncClose() {
|
|
|
+func (p *asyncProducer) AsyncClose() {
|
|
|
go withRecover(func() {
|
|
|
p.input <- &ProducerMessage{flags: shutdown}
|
|
|
})
|
|
|
@@ -214,7 +214,7 @@ func (p *producer) AsyncClose() {
|
|
|
|
|
|
// singleton
|
|
|
// dispatches messages by topic
|
|
|
-func (p *producer) topicDispatcher() {
|
|
|
+func (p *asyncProducer) topicDispatcher() {
|
|
|
handlers := make(map[string]chan *ProducerMessage)
|
|
|
|
|
|
for msg := range p.input {
|
|
|
@@ -270,7 +270,7 @@ func (p *producer) topicDispatcher() {
|
|
|
|
|
|
// one per topic
|
|
|
// partitions messages, then dispatches them by partition
|
|
|
-func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
|
|
|
handlers := make(map[int32]chan *ProducerMessage)
|
|
|
partitioner := p.conf.Producer.Partitioner()
|
|
|
|
|
|
@@ -306,7 +306,7 @@ func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage
|
|
|
// one per partition per topic
|
|
|
// dispatches messages to the appropriate broker
|
|
|
// also responsible for maintaining message order during retries
|
|
|
-func (p *producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
|
|
|
var leader *Broker
|
|
|
var output chan *ProducerMessage
|
|
|
|
|
|
@@ -423,7 +423,7 @@ func (p *producer) leaderDispatcher(topic string, partition int32, input chan *P
|
|
|
// one per broker
|
|
|
// groups messages together into appropriately-sized batches for sending to the broker
|
|
|
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
|
|
|
-func (p *producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
|
|
|
+func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
|
|
|
var ticker *time.Ticker
|
|
|
var timer <-chan time.Time
|
|
|
if p.conf.Producer.Flush.Frequency > 0 {
|
|
|
@@ -483,7 +483,7 @@ shutdown:
|
|
|
|
|
|
// one per broker
|
|
|
// takes a batch at a time from the messageAggregator and sends to the broker
|
|
|
-func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
+func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
var closing error
|
|
|
currentRetries := make(map[string]map[int32]error)
|
|
|
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
|
|
|
@@ -589,7 +589,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
// singleton
|
|
|
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
|
|
|
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
|
|
|
-func (p *producer) retryHandler() {
|
|
|
+func (p *asyncProducer) retryHandler() {
|
|
|
var buf []*ProducerMessage
|
|
|
var msg *ProducerMessage
|
|
|
refs := 0
|
|
|
@@ -636,7 +636,7 @@ func (p *producer) retryHandler() {
|
|
|
|
|
|
// utility functions
|
|
|
|
|
|
-func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
|
|
|
+func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
|
|
|
var partitions []int32
|
|
|
var err error
|
|
|
|
|
|
@@ -669,7 +669,7 @@ func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
|
|
|
+func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
|
|
|
|
|
|
req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
|
|
|
empty := true
|
|
|
@@ -731,7 +731,7 @@ func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
|
|
|
return req
|
|
|
}
|
|
|
|
|
|
-func (p *producer) returnError(msg *ProducerMessage, err error) {
|
|
|
+func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
|
|
|
msg.flags = 0
|
|
|
msg.retries = 0
|
|
|
pErr := &ProducerError{Msg: msg, Err: err}
|
|
|
@@ -742,7 +742,7 @@ func (p *producer) returnError(msg *ProducerMessage, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
|
|
|
+func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
|
|
|
for _, msg := range batch {
|
|
|
if msg != nil {
|
|
|
p.returnError(msg, err)
|
|
|
@@ -750,7 +750,7 @@ func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *producer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
+func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
for _, msg := range batch {
|
|
|
if msg != nil {
|
|
|
msg.flags = 0
|
|
|
@@ -759,7 +759,7 @@ func (p *producer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *producer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
+func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
for _, msg := range batch {
|
|
|
if msg == nil {
|
|
|
continue
|
|
|
@@ -778,7 +778,7 @@ type brokerProducer struct {
|
|
|
refs int
|
|
|
}
|
|
|
|
|
|
-func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
+func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
@@ -799,7 +799,7 @@ func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
return producer.input
|
|
|
}
|
|
|
|
|
|
-func (p *producer) unrefBrokerProducer(broker *Broker) {
|
|
|
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker) {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|