@@ -12,14 +12,14 @@ func forceFlushThreshold() int {
	return int(MaxRequestSize - (10 * 1024)) // 10KiB is safety room for misc. overhead, we might want to calculate this more precisely?
}

-// Producer publishes Kafka messages. It routes messages to the correct broker
-// for the provided topic-partition, refreshing metadata as appropriate, and
-// parses responses for errors. You must read from the Errors() channel or the
+// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
+// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
+// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope (this is in addition to calling Close on the underlying client, which
// is still necessary).
-type Producer interface {
+type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer, flushing any messages it may have
	// buffered. The shutdown has completed when both the Errors and Successes channels
@@ -47,7 +47,7 @@ type Producer interface {
	Errors() <-chan *ProducerError
}
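
The renamed interface keeps the old contract: if nothing drains Errors() (and Successes() when Producer.Return.Successes is set), the producer eventually deadlocks, and it must be shut down explicitly via Close() or AsyncClose(). A minimal usage sketch of the renamed API; the import path, broker address, and topic name are assumptions for illustration:

```go
package main

import (
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		log.Fatalln(err)
	}

	// Drain Errors() in the background; without a reader the producer deadlocks.
	go func() {
		for perr := range producer.Errors() {
			log.Println("delivery failed:", perr)
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{
		Topic: "example-topic",
		Value: sarama.StringEncoder("hello"),
	}

	// Close() flushes anything still buffered and waits for shutdown to finish.
	if err := producer.Close(); err != nil {
		log.Fatalln(err)
	}
}
```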

-type producer struct {
+type asyncProducer struct {
	client Client
	conf *Config
	ownClient bool
@@ -59,29 +59,29 @@ type producer struct {
	brokerLock sync.Mutex
}

-// NewProducer creates a new Producer using the given broker addresses and configuration.
-func NewProducer(addrs []string, conf *Config) (Producer, error) {
+// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
+func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

-	p, err := NewProducerFromClient(client)
+	p, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}
-	p.(*producer).ownClient = true
+	p.(*asyncProducer).ownClient = true
	return p, nil
}
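
NewAsyncProducer owns the Client it creates (ownClient = true), so its Close() tears the client down too; with NewAsyncProducerFromClient the caller keeps that responsibility, as the doc comment above notes. A hedged sketch of the shared-client variant, using a hypothetical helper name:

```go
// newProducerOnSharedClient is a hypothetical helper: the caller retains the
// Client (perhaps shared with consumers) and must close it separately, because
// Close() on a producer built this way does not close the underlying client.
func newProducerOnSharedClient(brokers []string) (sarama.Client, sarama.AsyncProducer, error) {
	client, err := sarama.NewClient(brokers, sarama.NewConfig())
	if err != nil {
		return nil, nil, err
	}

	producer, err := sarama.NewAsyncProducerFromClient(client)
	if err != nil {
		client.Close() // we created the client, so clean it up on failure
		return nil, nil, err
	}
	return client, producer, nil
}
```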

-// NewProducerFromClient creates a new Producer using the given client.
-func NewProducerFromClient(client Client) (Producer, error) {
+// NewAsyncProducerFromClient creates a new AsyncProducer using the given client.
+func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

-	p := &producer{
+	p := &asyncProducer{
		client: client,
		conf:   client.Config(),
		errors: make(chan *ProducerError),
@@ -109,28 +109,18 @@ const (

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
-	Topic string // The Kafka topic for this message.
-	Key Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
-	Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
-	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels. Sarama completely ignores this field and is only to be used for pass-through data.
+	Topic string // The Kafka topic for this message.
+	Key Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
+	Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.

-	// these are filled in by the producer as the message is processed
-	offset int64
-	partition int32
-	retries int
-	flags flagSet
-}
+	// These are filled in by the producer as the message is processed
+	Offset int64 // Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if the message was successfully delivered and RequiredAcks is not NoResponse.
+	Partition int32 // Partition is the partition that the message was sent to. This is only guaranteed to be defined if the message was successfully delivered.

-// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
-// the message was successfully delivered and RequiredAcks is not NoResponse.
-func (m *ProducerMessage) Offset() int64 {
-	return m.offset
-}
+	Metadata interface{} // This field is used to hold arbitrary data you wish to include so it will be available when receiving on the Successes and Errors channels. Sarama completely ignores this field and is only to be used for pass-through data.

-// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
-// the message was successfully delivered.
-func (m *ProducerMessage) Partition() int32 {
-	return m.partition
+	retries int
+	flags flagSet
}
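
Offset and Partition are now plain exported fields instead of the accessor methods removed above, so delivery metadata is read straight off the struct. A small sketch of a Successes consumer under the new shape; it assumes Producer.Return.Successes was enabled in the Config, since otherwise nothing is sent on that channel:

```go
// logDeliveries is a hypothetical helper that drains Successes() and reads the
// now-exported fields directly (previously msg.Partition() and msg.Offset()).
func logDeliveries(producer sarama.AsyncProducer) {
	for msg := range producer.Successes() {
		log.Printf("delivered to %s/%d at offset %d", msg.Topic, msg.Partition, msg.Offset)
	}
}
```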
func (m *ProducerMessage) byteSize() int {
@@ -164,19 +154,19 @@ func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

-func (p *producer) Errors() <-chan *ProducerError {
+func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

-func (p *producer) Successes() <-chan *ProducerMessage {
+func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

-func (p *producer) Input() chan<- *ProducerMessage {
+func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

-func (p *producer) Close() error {
+func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
@@ -199,7 +189,7 @@ func (p *producer) Close() error {
	return nil
}

-func (p *producer) AsyncClose() {
+func (p *asyncProducer) AsyncClose() {
	go withRecover(func() {
		p.input <- &ProducerMessage{flags: shutdown}
	})
@@ -214,7 +204,7 @@ func (p *producer) AsyncClose() {

// singleton
// dispatches messages by topic
-func (p *producer) topicDispatcher() {
+func (p *asyncProducer) topicDispatcher() {
	handlers := make(map[string]chan *ProducerMessage)

	for msg := range p.input {
@@ -270,7 +260,7 @@ func (p *producer) topicDispatcher() {

// one per topic
// partitions messages, then dispatches them by partition
-func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
+func (p *asyncProducer) partitionDispatcher(topic string, input chan *ProducerMessage) {
	handlers := make(map[int32]chan *ProducerMessage)
	partitioner := p.conf.Producer.Partitioner()

@@ -283,15 +273,15 @@ func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage
			}
		}

-		handler := handlers[msg.partition]
+		handler := handlers[msg.Partition]
		if handler == nil {
			p.retries <- &ProducerMessage{flags: ref}
			newHandler := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
			topic := msg.Topic // block local because go's closure semantics suck
-			partition := msg.partition // block local because go's closure semantics suck
+			partition := msg.Partition // block local because go's closure semantics suck
			go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
			handler = newHandler
-			handlers[msg.partition] = handler
+			handlers[msg.Partition] = handler
		}

handler <- msg
@@ -306,7 +296,7 @@ func (p *producer) partitionDispatcher(topic string, input chan *ProducerMessage
// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
-func (p *producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
+func (p *asyncProducer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
	var leader *Broker
	var output chan *ProducerMessage

@@ -348,7 +338,7 @@ func (p *producer) leaderDispatcher(topic string, partition int32, input chan *P
			highWatermark = msg.retries
			Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
			retryState[msg.retries].expectChaser = true
-			output <- &ProducerMessage{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
+			output <- &ProducerMessage{Topic: topic, Partition: partition, flags: chaser, retries: msg.retries - 1}
			Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
			p.unrefBrokerProducer(leader)
			output = nil
@@ -423,7 +413,7 @@ func (p *producer) leaderDispatcher(topic string, partition int32, input chan *P
// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
+func (p *asyncProducer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
	var ticker *time.Ticker
	var timer <-chan time.Time
	if p.conf.Producer.Flush.Frequency > 0 {
@@ -483,7 +473,7 @@ shutdown:

// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
-func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
+func (p *asyncProducer) flusher(broker *Broker, input chan []*ProducerMessage) {
	var closing error
	currentRetries := make(map[string]map[int32]error)
	Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
@@ -497,14 +487,14 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
		// group messages by topic/partition
		msgSets := make(map[string]map[int32][]*ProducerMessage)
		for i, msg := range batch {
-			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
+			if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.Partition] != nil {
				if msg.flags&chaser == chaser {
					// we can start processing this topic/partition again
					Logger.Printf("producer/flusher/%d state change to [normal] on %s/%d\n",
-						broker.ID(), msg.Topic, msg.partition)
-					currentRetries[msg.Topic][msg.partition] = nil
+						broker.ID(), msg.Topic, msg.Partition)
+					currentRetries[msg.Topic][msg.Partition] = nil
				}
-				p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.partition])
+				p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.Partition])
				batch[i] = nil // to prevent it being returned/retried twice
				continue
			}
@@ -515,7 +505,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
				msgSets[msg.Topic] = partitionSet
			}

-			partitionSet[msg.partition] = append(partitionSet[msg.partition], msg)
+			partitionSet[msg.Partition] = append(partitionSet[msg.Partition], msg)
		}

		request := p.buildRequest(msgSets)
@@ -563,7 +553,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
			// All the messages for this topic-partition were delivered successfully!
			if p.conf.Producer.Return.Successes {
				for i := range msgs {
-					msgs[i].offset = block.Offset + int64(i)
+					msgs[i].Offset = block.Offset + int64(i)
				}
				p.returnSuccesses(msgs)
			}
@@ -589,7 +579,7 @@ func (p *producer) flusher(broker *Broker, input chan []*ProducerMessage) {
// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
-func (p *producer) retryHandler() {
+func (p *asyncProducer) retryHandler() {
	var buf []*ProducerMessage
	var msg *ProducerMessage
	refs := 0
@@ -636,7 +626,7 @@ func (p *producer) retryHandler() {

// utility functions

-func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
+func (p *asyncProducer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
	var partitions []int32
	var err error

@@ -664,12 +654,12 @@ func (p *producer) assignPartition(partitioner Partitioner, msg *ProducerMessage
		return ErrInvalidPartition
	}

-	msg.partition = partitions[choice]
+	msg.Partition = partitions[choice]

	return nil
}
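
assignPartition is what fills the exported Partition field: it asks the partitioner constructed from Config.Producer.Partitioner to choose among the topic's available partitions. A minimal sketch of steering that choice, assuming the stock NewHashPartitioner constructor so that equal keys map to the same partition:

```go
// keyedConfig is a hypothetical helper returning a Config whose partitioner
// hashes the message Key; assignPartition then gives messages with the same
// key the same Partition value.
func keyedConfig() *sarama.Config {
	config := sarama.NewConfig()
	config.Producer.Partitioner = sarama.NewHashPartitioner
	return config
}
```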

-func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {
+func (p *asyncProducer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

	req := &ProduceRequest{RequiredAcks: p.conf.Producer.RequiredAcks, Timeout: int32(p.conf.Producer.Timeout / time.Millisecond)}
	empty := true
@@ -731,7 +721,7 @@ func (p *producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *
	return req
}

-func (p *producer) returnError(msg *ProducerMessage, err error) {
+func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.flags = 0
	msg.retries = 0
	pErr := &ProducerError{Msg: msg, Err: err}
@@ -742,7 +732,7 @@ func (p *producer) returnError(msg *ProducerMessage, err error) {
	}
}

-func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
+func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		if msg != nil {
			p.returnError(msg, err)
@@ -750,7 +740,7 @@ func (p *producer) returnErrors(batch []*ProducerMessage, err error) {
	}
}

-func (p *producer) returnSuccesses(batch []*ProducerMessage) {
+func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if msg != nil {
			msg.flags = 0
@@ -759,7 +749,7 @@ func (p *producer) returnSuccesses(batch []*ProducerMessage) {
	}
}

-func (p *producer) retryMessages(batch []*ProducerMessage, err error) {
+func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		if msg == nil {
			continue
@@ -778,7 +768,7 @@ type brokerProducer struct {
	refs int
}

-func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
+func (p *asyncProducer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

@@ -799,7 +789,7 @@ func (p *producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
	return producer.input
}

-func (p *producer) unrefBrokerProducer(broker *Broker) {
+func (p *asyncProducer) unrefBrokerProducer(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()