@@ -114,10 +114,10 @@ type Producer struct {
client *Client
config ProducerConfig

- errors chan *ProduceError
- input, successes, retries chan *MessageToSend
+ errors chan *ProducerError
+ input, successes, retries chan *ProducerMessage

- brokers map[*Broker]*brokerWorker
+ brokers map[*Broker]*brokerProducer
brokerLock sync.Mutex
}

@@ -140,11 +140,11 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
p := &Producer{
client: client,
config: *config,
- errors: make(chan *ProduceError),
- input: make(chan *MessageToSend),
- successes: make(chan *MessageToSend),
- retries: make(chan *MessageToSend),
- brokers: make(map[*Broker]*brokerWorker),
+ errors: make(chan *ProducerError),
+ input: make(chan *ProducerMessage),
+ successes: make(chan *ProducerMessage),
+ retries: make(chan *ProducerMessage),
+ brokers: make(map[*Broker]*brokerProducer),
}

// launch our singleton dispatchers
@@ -163,8 +163,8 @@ const (
shutdown // start the shutdown process
)

-// MessageToSend is the collection of elements passed to the Producer in order to send a message.
-type MessageToSend struct {
+// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
+type ProducerMessage struct {
Topic string // The Kafka topic for this message.
Key Encoder // The partitioning key for this message. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
Value Encoder // The actual message to store in Kafka. It must implement the Encoder interface. Pre-existing Encoders include StringEncoder and ByteEncoder.
@@ -179,17 +179,17 @@ type MessageToSend struct {

// Offset is the offset of the message stored on the broker. This is only guaranteed to be defined if
// the message was successfully delivered and RequiredAcks is not NoResponse.
-func (m *MessageToSend) Offset() int64 {
+func (m *ProducerMessage) Offset() int64 {
return m.offset
}

// Partition is the partition that the message was sent to. This is only guaranteed to be defined if
// the message was successfully delivered.
-func (m *MessageToSend) Partition() int32 {
+func (m *ProducerMessage) Partition() int32 {
return m.partition
}

-func (m *MessageToSend) byteSize() int {
+func (m *ProducerMessage) byteSize() int {
size := 26 // the metadata overhead of CRC, flags, etc.
if m.Key != nil {
size += m.Key.Length()
@@ -200,37 +200,41 @@ func (m *MessageToSend) byteSize() int {
return size
}

-// ProduceError is the type of error generated when the producer fails to deliver a message.
-// It contains the original MessageToSend as well as the actual error value.
-type ProduceError struct {
- Msg *MessageToSend
+// ProducerError is the type of error generated when the producer fails to deliver a message.
+// It contains the original ProducerMessage as well as the actual error value.
+type ProducerError struct {
+ Msg *ProducerMessage
Err error
}

-// ProduceErrors is a type that wraps a batch of "ProduceError"s and implements the Error interface.
+func (pe ProducerError) Error() string {
+ return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
+}
+
+// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
-type ProduceErrors []*ProduceError
+type ProducerErrors []*ProducerError

-func (pe ProduceErrors) Error() string {
+func (pe ProducerErrors) Error() string {
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

// Errors is the error output channel back to the user. You MUST read from this channel or the Producer will deadlock.
// It is suggested that you send messages and read errors together in a single select statement.
-func (p *Producer) Errors() <-chan *ProduceError {
+func (p *Producer) Errors() <-chan *ProducerError {
return p.errors
}

// Successes is the success output channel back to the user when AckSuccesses is configured.
// If AckSuccesses is true, you MUST read from this channel or the Producer will deadlock.
// It is suggested that you send and read messages together in a single select statement.
-func (p *Producer) Successes() <-chan *MessageToSend {
+func (p *Producer) Successes() <-chan *ProducerMessage {
return p.successes
}

// Input is the input channel for the user to write messages to that they wish to send.
-func (p *Producer) Input() chan<- *MessageToSend {
+func (p *Producer) Input() chan<- *ProducerMessage {
return p.input
}

@@ -248,7 +252,7 @@ func (p *Producer) Close() error {
})
}

- var errors ProduceErrors
+ var errors ProducerErrors
for event := range p.errors {
errors = append(errors, event)
}
@@ -265,7 +269,7 @@ func (p *Producer) Close() error {
// channels in order to drain the results of any messages in flight.
func (p *Producer) AsyncClose() {
go withRecover(func() {
- p.input <- &MessageToSend{flags: shutdown}
+ p.input <- &ProducerMessage{flags: shutdown}
})
}

@@ -279,7 +283,7 @@ func (p *Producer) AsyncClose() {
// singleton
// dispatches messages by topic
func (p *Producer) topicDispatcher() {
- handlers := make(map[string]chan *MessageToSend)
+ handlers := make(map[string]chan *ProducerMessage)

for msg := range p.input {
if msg == nil {
@@ -301,8 +305,8 @@ func (p *Producer) topicDispatcher() {

handler := handlers[msg.Topic]
if handler == nil {
- p.retries <- &MessageToSend{flags: ref}
- newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
+ p.retries <- &ProducerMessage{flags: ref}
+ newHandler := make(chan *ProducerMessage, p.config.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
handler = newHandler
@@ -316,7 +320,7 @@ func (p *Producer) topicDispatcher() {
close(handler)
}

- p.retries <- &MessageToSend{flags: shutdown}
+ p.retries <- &ProducerMessage{flags: shutdown}

for msg := range p.input {
p.returnError(msg, ErrShuttingDown)
@@ -328,8 +332,8 @@ func (p *Producer) topicDispatcher() {

// one per topic
// partitions messages, then dispatches them by partition
-func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend) {
- handlers := make(map[int32]chan *MessageToSend)
+func (p *Producer) partitionDispatcher(topic string, input chan *ProducerMessage) {
+ handlers := make(map[int32]chan *ProducerMessage)
partitioner := p.config.Partitioner()

for msg := range input {
@@ -343,8 +347,8 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)

handler := handlers[msg.partition]
if handler == nil {
- p.retries <- &MessageToSend{flags: ref}
- newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
+ p.retries <- &ProducerMessage{flags: ref}
+ newHandler := make(chan *ProducerMessage, p.config.ChannelBufferSize)
topic := msg.Topic // block local because go's closure semantics suck
partition := msg.partition // block local because go's closure semantics suck
go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
@@ -358,15 +362,15 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
for _, handler := range handlers {
close(handler)
}
- p.retries <- &MessageToSend{flags: unref}
+ p.retries <- &ProducerMessage{flags: unref}
}

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
-func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
+func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *ProducerMessage) {
var leader *Broker
- var output chan *MessageToSend
+ var output chan *ProducerMessage

breaker := breaker.New(3, 1, 10*time.Second)
doUpdate := func() (err error) {
@@ -378,7 +382,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
return err
}

- output = p.getBrokerWorker(leader)
+ output = p.getBrokerProducer(leader)
return nil
}

@@ -386,7 +390,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
// on the first message
leader, _ = p.client.Leader(topic, partition)
if leader != nil {
- output = p.getBrokerWorker(leader)
+ output = p.getBrokerProducer(leader)
}

// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
@@ -395,7 +399,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
// therefore whether our buffer is complete and safe to flush)
highWatermark := 0
retryState := make([]struct {
- buf []*MessageToSend
+ buf []*ProducerMessage
expectChaser bool
}, p.config.MaxRetries+1)

@@ -406,9 +410,9 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
highWatermark = msg.retries
Logger.Printf("producer/leader state change to [retrying-%d] on %s/%d\n", highWatermark, topic, partition)
retryState[msg.retries].expectChaser = true
- output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
+ output <- &ProducerMessage{Topic: topic, partition: partition, flags: chaser, retries: msg.retries - 1}
Logger.Printf("producer/leader abandoning broker %d on %s/%d\n", leader.ID(), topic, partition)
- p.unrefBrokerWorker(leader)
+ p.unrefBrokerProducer(leader)
output = nil
time.Sleep(p.config.RetryBackoff)
} else if highWatermark > 0 {
@@ -474,14 +478,14 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *M
output <- msg
}

- p.unrefBrokerWorker(leader)
- p.retries <- &MessageToSend{flags: unref}
+ p.unrefBrokerProducer(leader)
+ p.retries <- &ProducerMessage{flags: unref}
}

// one per broker
// groups messages together into appropriately-sized batches for sending to the broker
// based on https://godoc.org/github.com/eapache/channels#BatchingChannel
-func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
+func (p *Producer) messageAggregator(broker *Broker, input chan *ProducerMessage) {
var ticker *time.Ticker
var timer <-chan time.Time
if p.config.FlushFrequency > 0 {
@@ -489,11 +493,11 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend)
timer = ticker.C
}

- var buffer []*MessageToSend
- var doFlush chan []*MessageToSend
+ var buffer []*ProducerMessage
+ var doFlush chan []*ProducerMessage
var bytesAccumulated int

- flusher := make(chan []*MessageToSend)
+ flusher := make(chan []*ProducerMessage)
go withRecover(func() { p.flusher(broker, flusher) })

for {
@@ -541,7 +545,7 @@ shutdown:

// one per broker
// takes a batch at a time from the messageAggregator and sends to the broker
-func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
+func (p *Producer) flusher(broker *Broker, input chan []*ProducerMessage) {
var closing error
currentRetries := make(map[string]map[int32]error)
Logger.Printf("producer/flusher/%d starting up\n", broker.ID())
@@ -553,7 +557,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
}

// group messages by topic/partition
- msgSets := make(map[string]map[int32][]*MessageToSend)
+ msgSets := make(map[string]map[int32][]*ProducerMessage)
for i, msg := range batch {
if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
if msg.flags&chaser == chaser {
@@ -562,14 +566,14 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
broker.ID(), msg.Topic, msg.partition)
currentRetries[msg.Topic][msg.partition] = nil
}
- p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
+ p.retryMessages([]*ProducerMessage{msg}, currentRetries[msg.Topic][msg.partition])
batch[i] = nil // to prevent it being returned/retried twice
continue
}

partitionSet := msgSets[msg.Topic]
if partitionSet == nil {
- partitionSet = make(map[int32][]*MessageToSend)
+ partitionSet = make(map[int32][]*ProducerMessage)
msgSets[msg.Topic] = partitionSet
}

@@ -640,15 +644,15 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
}
}
Logger.Printf("producer/flusher/%d shut down\n", broker.ID())
- p.retries <- &MessageToSend{flags: unref}
+ p.retries <- &ProducerMessage{flags: unref}
}

// singleton
// effectively a "bridge" between the flushers and the topicDispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *Producer) retryHandler() {
- var buf []*MessageToSend
- var msg *MessageToSend
+ var buf []*ProducerMessage
+ var msg *ProducerMessage
refs := 0
shuttingDown := false

@@ -693,7 +697,7 @@ func (p *Producer) retryHandler() {

// utility functions

-func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend) error {
+func (p *Producer) assignPartition(partitioner Partitioner, msg *ProducerMessage) error {
var partitions []int32
var err error

@@ -726,7 +730,7 @@ func (p *Producer) assignPartition(partitioner Partitioner, msg *MessageToSend)
return nil
}

-func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *ProduceRequest {
+func (p *Producer) buildRequest(batch map[string]map[int32][]*ProducerMessage) *ProduceRequest {

req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: int32(p.config.Timeout / time.Millisecond)}
empty := true
@@ -788,13 +792,13 @@ func (p *Producer) buildRequest(batch map[string]map[int32][]*MessageToSend) *Pr
return req
}

-func (p *Producer) returnError(msg *MessageToSend, err error) {
+func (p *Producer) returnError(msg *ProducerMessage, err error) {
msg.flags = 0
msg.retries = 0
- p.errors <- &ProduceError{Msg: msg, Err: err}
+ p.errors <- &ProducerError{Msg: msg, Err: err}
}

-func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
+func (p *Producer) returnErrors(batch []*ProducerMessage, err error) {
for _, msg := range batch {
if msg != nil {
p.returnError(msg, err)
@@ -802,7 +806,7 @@ func (p *Producer) returnErrors(batch []*MessageToSend, err error) {
}
}

-func (p *Producer) returnSuccesses(batch []*MessageToSend) {
+func (p *Producer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if msg != nil {
msg.flags = 0
@@ -811,7 +815,7 @@ func (p *Producer) returnSuccesses(batch []*MessageToSend) {
}
}

-func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
+func (p *Producer) retryMessages(batch []*ProducerMessage, err error) {
for _, msg := range batch {
if msg == nil {
continue
@@ -825,42 +829,42 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
}
}

-type brokerWorker struct {
- input chan *MessageToSend
+type brokerProducer struct {
+ input chan *ProducerMessage
refs int
}

-func (p *Producer) getBrokerWorker(broker *Broker) chan *MessageToSend {
+func (p *Producer) getBrokerProducer(broker *Broker) chan *ProducerMessage {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

- worker := p.brokers[broker]
+ producer := p.brokers[broker]

- if worker == nil {
- p.retries <- &MessageToSend{flags: ref}
- worker = &brokerWorker{
+ if producer == nil {
+ p.retries <- &ProducerMessage{flags: ref}
+ producer = &brokerProducer{
refs: 1,
- input: make(chan *MessageToSend),
+ input: make(chan *ProducerMessage),
}
- p.brokers[broker] = worker
- go withRecover(func() { p.messageAggregator(broker, worker.input) })
+ p.brokers[broker] = producer
+ go withRecover(func() { p.messageAggregator(broker, producer.input) })
} else {
- worker.refs++
+ producer.refs++
}

- return worker.input
+ return producer.input
}

-func (p *Producer) unrefBrokerWorker(broker *Broker) {
+func (p *Producer) unrefBrokerProducer(broker *Broker) {
p.brokerLock.Lock()
defer p.brokerLock.Unlock()

- worker := p.brokers[broker]
+ producer := p.brokers[broker]

- if worker != nil {
- worker.refs--
- if worker.refs == 0 {
- close(worker.input)
+ if producer != nil {
+ producer.refs--
+ if producer.refs == 0 {
+ close(producer.input)
delete(p.brokers, broker)
}
}
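// Usage sketch (not part of the diff above): the doc comments on Errors,
// Successes, and Input suggest sending messages and reading results in a
// single select statement. The loop below is a minimal illustration of that
// advice against the renamed API, assuming it sits in the same package as the
// Producer and that AckSuccesses is enabled in the ProducerConfig; the
// produceLoop name and the pre-built producer/messages values are hypothetical.
func produceLoop(producer *Producer, messages []*ProducerMessage) {
	var enqueued, done int
	for done < len(messages) {
		// Leave input nil once everything is enqueued so the send case can
		// never fire (sends on a nil channel block forever in a select).
		var input chan<- *ProducerMessage
		var next *ProducerMessage
		if enqueued < len(messages) {
			input = producer.Input()
			next = messages[enqueued]
		}
		select {
		case input <- next:
			enqueued++
		case <-producer.Successes(): // one success or error arrives per message
			done++
		case err := <-producer.Errors():
			done++
			Logger.Println(err) // a *ProducerError wrapping the message and the cause
		}
	}
}

// When shutting down instead, Close drains any remaining in-flight failures
// and returns them batched as ProducerErrors.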