|
|
@@ -19,7 +19,34 @@ func forceFlushThreshold() int {
|
|
|
// leaks: it will not be garbage-collected automatically when it passes out of
|
|
|
// scope (this is in addition to calling Close on the underlying client, which
|
|
|
// is still necessary).
|
|
|
-type Producer struct {
|
|
|
+type Producer interface {
|
|
|
+
|
|
|
+ // AsyncClose triggers a shutdown of the producer, flushing any messages it may have
|
|
|
+ // buffered. The shutdown has completed when both the Errors and Successes channels
|
|
|
+ // have been closed. When calling AsyncClose, you *must* continue to read from those
|
|
|
+ // channels in order to drain the results of any messages in flight.
|
|
|
+ AsyncClose()
|
|
|
+
|
|
|
+ // Close shuts down the producer and flushes any messages it may have buffered.
|
|
|
+ // You must call this function before a producer object passes out of scope, as
|
|
|
+ // it may otherwise leak memory. You must call this before calling Close on the
|
|
|
+ // underlying client.
|
|
|
+ Close() error
|
|
|
+
|
|
|
+ // Input is the input channel for the user to write messages to that they wish to send.
|
|
|
+ Input() chan<- *ProducerMessage
|
|
|
+
|
|
|
+ // Successes is the success output channel back to the user when AckSuccesses is confured.
|
|
|
+ // If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
|
|
|
+ // It is suggested that you send and read messages together in a single select statement.
|
|
|
+ Successes() <-chan *ProducerMessage
|
|
|
+
|
|
|
+ // Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
|
|
|
+ // It is suggested that you send messages and read errors together in a single select statement.
|
|
|
+ Errors() <-chan *ProducerError
|
|
|
+}
|
|
|
+
|
|
|
+type producer struct {
|
|
|
client *Client
|
|
|
conf *Config
|
|
|
ownClient bool
|
|
|
@@ -32,7 +59,7 @@ type Producer struct {
|
|
|
}
|
|
|
|
|
|
// NewProducer creates a new Producer using the given broker addresses and configuration.
|
|
|
-func NewProducer(addrs []string, conf *Config) (*Producer, error) {
|
|
|
+func NewProducer(addrs []string, conf *Config) (Producer, error) {
|
|
|
client, err := NewClient(addrs, conf)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
@@ -42,18 +69,18 @@ func NewProducer(addrs []string, conf *Config) (*Producer, error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- p.ownClient = true
|
|
|
+ p.(*producer).ownClient = true
|
|
|
return p, nil
|
|
|
}
|
|
|
|
|
|
// NewProducerFromClient creates a new Producer using the given client.
|
|
|
-func NewProducerFromClient(client *Client) (*Producer, error) {
|
|
|
+func NewProducerFromClient(client *Client) (Producer, error) {
|
|
|
// Check that we are not dealing with a closed Client before processing any other arguments
|
|
|
if client.Closed() {
|
|
|
return nil, ErrClosedClient
|
|
|
}
|
|
|
|
|
|
- p := &Producer{
|
|
|
+ p := &producer{
|
|
|
client: client,
|
|
|
conf: client.conf,
|
|
|
errors: make(chan *ProducerError),
|
|
|
@@ -136,29 +163,19 @@ func (pe ProducerErrors) Error() string {
|
|
|
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
|
|
|
}
|
|
|
|
|
|
-// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
|
|
|
-// It is suggested that you send messages and read errors together in a single select statement.
|
|
|
-func (p *Producer) Errors() <-chan *ProducerError {
|
|
|
+func (p *producer) Errors() <-chan *ProducerError {
|
|
|
return p.errors
|
|
|
}
|
|
|
|
|
|
-// Successes is the success output channel back to the user when AckSuccesses is confured.
|
|
|
-// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
|
|
|
-// It is suggested that you send and read messages together in a single select statement.
|
|
|
-func (p *Producer) Successes() <-chan *ProducerMessage {
|
|
|
+func (p *producer) Successes() <-chan *ProducerMessage {
|
|
|
return p.successes
|
|
|
}
|
|
|
|
|
|
-// Input is the input channel for the user to write messages to that they wish to send.
|
|
|
-func (p *Producer) Input() chan<- *ProducerMessage {
|
|
|
+func (p *producer) Input() chan<- *ProducerMessage {
|
|
|
return p.input
|
|
|
}
|
|
|
|
|
|
-// Close shuts down the producer and flushes any messages it may have buffered.
|
|
|
-// You must call this function before a producer object passes out of scope, as
|
|
|
-// it may otherwise leak memory. You must call this before calling Close on the
|
|
|
-// underlying client.
|
|
|
-func (p *Producer) Close() error {
|
|
|
+func (p *producer) Close() error {
|
|
|
p.AsyncClose()
|
|
|
|
|
|
if p.conf.Producer.AckSuccesses {
|
|
|
@@ -179,11 +196,7 @@ func (p *Producer) Close() error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
|
|
|
-// buffered. The shutdown has completed when both the Errors and Successes channels
|
|
|
-// have been closed. When calling AsyncClose, you *must* continue to read from those
|
|
|
-// channels in order to drain the results of any messages in flight.
|
|
|
-func (p *Producer) AsyncClose() {
|
|
|
+func (p *producer) AsyncClose() {
|
|
|
go withRecover(func() {
|
|
|
p.input <- &ProducerMessage{flags: shutdown}
|
|
|
})
|
|
|
@@ -198,7 +211,7 @@ func (p *Producer) AsyncClose() {
|
|
|
|
|
|
// singleton
|
|
|
// dispatches messages by topic
|
|
|
-func (p *Producer) topicDispatcher() {
|
|
|
+func (p *producer) topicDispatcher() {
|
|
|
handlers := make(map[string]chan *ProducerMessage)
|
|
|
|
|
|
for msg := range p.input {
|
|
|
@@ -254,7 +267,7 @@ func (p *Producer) topicDispatcher() {
|
|
|
|
|
|
// one per topic
|
|
|
// partitions messages, then dispatches them by partition
|
|
|
-func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
|
|
|
+func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
|
|
|
handlers := make(map[int32]chan *ProducerMessage)
|
|
|
partitioner := p.conf.Producer.Partitioner()
|
|
|
|
|
|
@@ -290,7 +303,7 @@ func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage
|
|
|
// one per partition per topic
|
|
|
// dispatches messages to the appropriate broker
|
|
|
// also responsible for maintaining message order during retries
|
|
|
-func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
|
|
|
+func (p *producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
|
|
|
var leader *Broker
|
|
|
var output chan *ProducerMessage
|
|
|
|
|
|
@@ -407,7 +420,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *P
|
|
|
// one per broker
|
|
|
// groups messages together into appropriately-sized batches for sending to the broker
|
|
|
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
|
|
|
-func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
|
|
|
+func (p *producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
|
|
|
var ticker *time.Ticker
|
|
|
var timer <-chan time.Time
|
|
|
if p.conf.Producer.Flush.Frequency > 0 {
|
|
|
@@ -467,7 +480,7 @@ shutdown:
|
|
|
|
|
|
// one per broker
|
|
|
// takes a batch at a time from the messageAggregator and sends to the broker
|
|
|
-func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
+func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
var closing error
|
|
|
currentRetries := make(map[string]map[int32]error)
|
|
|
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
|
|
|
@@ -573,7 +586,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
|
|
|
// singleton
|
|
|
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
|
|
|
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
|
|
|
-func (p *Producer) retryHandler() {
|
|
|
+func (p *producer) retryHandler() {
|
|
|
var buf []*ProducerMessage
|
|
|
var msg *ProducerMessage
|
|
|
refs := 0
|
|
|
@@ -620,7 +633,7 @@ func (p *Producer) retryHandler() {
|
|
|
|
|
|
// utility functions
|
|
|
|
|
|
-func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
|
|
|
+func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
|
|
|
var partitions []int32
|
|
|
var err error
|
|
|
|
|
|
@@ -653,7 +666,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
|
|
|
+func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
|
|
|
|
|
|
req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
|
|
|
empty := true
|
|
|
@@ -715,13 +728,13 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
|
|
|
return req
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) returnError(msg *ProducerMessage, err error) {
|
|
|
+func (p *producer) returnError(msg *ProducerMessage, err error) {
|
|
|
msg.flags = 0
|
|
|
msg.retries = 0
|
|
|
p.errors <- &ProducerError{Msg: msg, Err: err}
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) returnErrors(batch []*ProducerMessage, err error) {
|
|
|
+func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
|
|
|
for _, msg := range batch {
|
|
|
if msg != nil {
|
|
|
p.returnError(msg, err)
|
|
|
@@ -729,7 +742,7 @@ func (p *Producer) returnErrors(batch []*ProducerMessage, err error) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
+func (p *producer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
for _, msg := range batch {
|
|
|
if msg != nil {
|
|
|
msg.flags = 0
|
|
|
@@ -738,7 +751,7 @@ func (p *Producer) returnSuccesses(batch []*ProducerMessage) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
+func (p *producer) retryMessages(batch []*ProducerMessage, err error) {
|
|
|
for _, msg := range batch {
|
|
|
if msg == nil {
|
|
|
continue
|
|
|
@@ -757,7 +770,7 @@ type brokerProducer struct {
|
|
|
refs int
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
+func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|
|
|
@@ -778,7 +791,7 @@ func (p *Producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
|
|
|
return producer.input
|
|
|
}
|
|
|
|
|
|
-func (p *Producer) unrefBrokerProducer(broker *Broker) {
|
|
|
+func (p *producer) unrefBrokerProducer(broker *Broker) {
|
|
|
p.brokerLock.Lock()
|
|
|
defer p.brokerLock.Unlock()
|
|
|
|