@@ -5,39 +5,47 @@ import (
 	"time"
 )
 
-// MultiProducerConfig is used to pass multiple configuration options to NewProducer.
 type MultiProducerConfig struct {
-	Partitioner  Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
-	RequiredAcks RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
-	Timeout      int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
-	Compression  CompressionCodec // The type of compression to use on messages (defaults to no compression).
-	MaxBufferBytes uint32
-	MaxBufferTime  uint32
+	Partitioner        Partitioner
+	RequiredAcks       RequiredAcks
+	Timeout            int32
+	Compression        CompressionCodec
+	MaxBufferBytes     uint32
+	MaxBufferTime      uint32
+	MaxDeliveryRetries uint32
+}
+
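+// MultiProducer publishes Kafka messages. It buffers them per broker, flushing
+// each buffer when it exceeds MaxBufferBytes or when MaxBufferTime elapses, and
+// retries failed deliveries up to MaxDeliveryRetries times.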
+type MultiProducer struct {
+	client          *Client
+	config          MultiProducerConfig
+	brokerProducers map[*Broker]*brokerProducer
+	m               sync.RWMutex
+	errors          chan error
+	deliveryLocks   map[topicPartition]chan bool
+	dm              sync.RWMutex
 }
 
 type brokerProducer struct {
-	sync.Mutex
-	broker        *Broker
-	request       *ProduceRequest
+	messages      map[string]map[int32][]*produceMessage
+	mapM          sync.Mutex
 	bufferedBytes uint32
 	flushNow      chan bool
+	broker        *Broker
 	stopper       chan bool
 }
 
-// MultiProducer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
-// and parses responses for errors. You must call Close() on a MultiProducer to avoid leaks, it may not be garbage-collected automatically when
-// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
-type MultiProducer struct {
-	m               sync.RWMutex
-	client          *Client
-	config          MultiProducerConfig
-	brokerProducers map[*Broker]*brokerProducer
-	errors          chan error
-	deliveryMapM    sync.RWMutex
-	deliveryMutexes map[string]map[int32]chan bool
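+// produceMessage is a single key/value pair destined for a particular topic and
+// partition, along with a count of how many delivery attempts have failed so far.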
+type produceMessage struct {
+	topic      string
+	partition  int32
+	key, value []byte
+	failures   uint32
+}
+
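+// topicPartition identifies a single topic-partition pair; it is used as the key
+// for the per-partition delivery locks.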
+type topicPartition struct {
+	topic     string
+	partition int32
 }
 
-// NewMultiProducer creates a new MultiProducer using the given client. The resulting object will buffer/flush Produce messages to Kafka.
 func NewMultiProducer(client *Client, config *MultiProducerConfig) (*MultiProducer, error) {
 	if config == nil {
 		config = new(MultiProducerConfig)
@@ -59,44 +67,54 @@ func NewMultiProducer(client *Client, config *MultiProducerConfig) (*MultiProduc
 		config.MaxBufferBytes = 1
 	}
 
-	p := new(MultiProducer)
-	p.client = client
-	p.config = *config
-	p.errors = make(chan error, 16)
-	p.brokerProducers = make(map[*Broker]*brokerProducer)
-	p.deliveryMutexes = make(map[string]map[int32]chan bool)
+	return &MultiProducer{
+		client:          client,
+		config:          *config,
+		errors:          make(chan error, 16),
+		brokerProducers: make(map[*Broker]*brokerProducer),
+		deliveryLocks:   make(map[topicPartition]chan bool),
+	}, nil
+}
 
-	return p, nil
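+// Errors returns the channel on which delivery errors are reported when the
+// producer is running asynchronously. It must not be used (and will panic) when
+// the producer is configured for synchronous operation.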
+func (p *MultiProducer) Errors() chan error {
+	if p.isSynchronous() {
+		panic("use of Errors() is not permitted in synchronous mode.")
+	} else {
+		return p.errors
+	}
 }
 
-// Close shuts down the MultiProducer and flushes any messages it may have buffered. You must call this function before
-// a MultiProducer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
-// on the underlying client.
 func (p *MultiProducer) Close() error {
-	p.m.Lock()
-	defer p.m.Unlock()
+	return nil
+}
 
-	p.deliveryMapM.Lock()
-	for _, d := range p.deliveryMutexes {
-		for _, ch := range d {
-			close(ch)
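+// SendMessage produces a message with the given key and value on the given topic.
+// The partition is chosen by the configured Partitioner. Encoding and partitioning
+// errors are returned immediately; once the message is buffered, delivery errors are
+// reported on Errors() (asynchronous mode) or returned from this call (synchronous mode).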
+func (p *MultiProducer) SendMessage(topic string, key, value Encoder) (err error) {
+	var keyBytes, valBytes []byte
+
+	if key != nil {
+		if keyBytes, err = key.Encode(); err != nil {
+			return err
+		}
+	}
+	if value != nil {
+		if valBytes, err = value.Encode(); err != nil {
+			return err
 		}
 	}
-	p.deliveryMapM.Unlock()
 
-	for _, bp := range p.brokerProducers {
-		bp.Close()
+	partition, err := p.choosePartition(topic, key)
+	if err != nil {
+		return err
 	}
 
-	return nil
-}
+	msg := &produceMessage{
+		topic:     topic,
+		partition: partition,
+		key:       keyBytes,
+		value:     valBytes,
+		failures:  0,
+	}
 
-// SendMessage sends a message with the given topic, key, and value. The partition to send to is selected by the
-// MultiProducer's Partitioner. To send strings as either key or value, see the StringEncoder type.
-// If operating in synchronous mode (MaxBufferTime=MaxBufferBytes=0), the error will be returned. If either value is > 0, nil will
-// always be returned and you must listen on the channel returned by Errors() to asynchronously receive error replies.
-func (p *MultiProducer) SendMessage(topic string, key, value Encoder) error {
-	return p.safeSendMessage(topic, key, value, true)
+	return p.addMessage(msg, false)
 }
 
 func (p *MultiProducer) choosePartition(topic string, key Encoder) (int32, error) {
@@ -116,30 +134,68 @@ func (p *MultiProducer) choosePartition(topic string, key Encoder) (int32, error
 	return partitions[choice], nil
 }
 
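+// addMessage hands a message to the brokerProducer for the partition's current
+// leader. In synchronous mode it then blocks until a result arrives on the errors
+// channel.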
+func (p *MultiProducer) addMessage(msg *produceMessage, isRetry bool) error {
+	broker, err := p.client.Leader(msg.topic, msg.partition)
+	if err != nil {
+		return err
+	}
+
+	bp := p.brokerProducerFor(broker)
+	bp.addMessage(msg, p.config.MaxBufferBytes, isRetry)
+
+	if p.isSynchronous() {
+		return <-p.errors
+	}
+	return nil
+}
+
+func (p *MultiProducer) isSynchronous() bool {
+	return p.config.MaxBufferBytes == 0 && p.config.MaxBufferTime == 0
+}
+
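+// brokerProducerFor returns the brokerProducer responsible for the given broker,
+// creating and registering one if it does not exist yet.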
+func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
+	p.m.RLock()
+	bp, ok := p.brokerProducers[broker]
+	p.m.RUnlock()
+	if !ok {
+		p.m.Lock()
+		bp, ok = p.brokerProducers[broker]
+		if !ok {
+			bp = p.newBrokerProducer(broker)
+			p.brokerProducers[broker] = bp
+		}
+		p.m.Unlock()
+	}
+	return bp
+}
+
 func (p *MultiProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 	bp := &brokerProducer{
-		broker:   broker,
+		messages: make(map[string]map[int32][]*produceMessage),
 		flushNow: make(chan bool),
+		broker:   broker,
 		stopper:  make(chan bool),
 	}
 
 	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
 
-	initNow := make(chan bool)
+	var wg sync.WaitGroup
+	wg.Add(1)
+
 	go func() {
 		timer := time.NewTimer(maxBufferTime)
-		close(initNow)
+		wg.Done()
		for {
			select {
			case <-bp.flushNow:
-				p.flush(bp)
+				bp.flush(p)
			case <-timer.C:
-				p.flush(bp)
+				bp.flush(p)
			case <-bp.stopper:
				p.m.Lock()
				delete(p.brokerProducers, bp.broker)
				p.m.Unlock()
-				p.flush(bp)
+				bp.flush(p)
				p.client.disconnectBroker(bp.broker)
				close(bp.flushNow)
				return
@@ -147,99 +203,72 @@ func (p *MultiProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 			timer.Reset(maxBufferTime)
 		}
 	}()
-	<-initNow
+	wg.Wait()
 
 	return bp
 }
 
-func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
-	p.m.RLock()
-	bp, ok := p.brokerProducers[broker]
-	p.m.RUnlock()
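+// addMessage buffers a message for this broker. Retried messages are prepended to
+// preserve delivery order; fresh messages are appended. Once the buffered bytes
+// exceed maxBufferBytes, a flush is requested.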
+func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32, isRetry bool) {
+	bp.mapM.Lock()
+	forTopic, ok := bp.messages[msg.topic]
 	if !ok {
-		p.m.Lock()
-		bp, ok = p.brokerProducers[broker]
-		if !ok {
-			bp = p.newBrokerProducer(broker)
-			p.brokerProducers[broker] = bp
-		}
-		p.m.Unlock()
+		forTopic = make(map[int32][]*produceMessage)
+		bp.messages[msg.topic] = forTopic
 	}
-	return bp
-}
-
-func (p *MultiProducer) isSynchronous() bool {
-	return p.config.MaxBufferTime == 0 && p.config.MaxBufferBytes < 2
-}
-
-// Shouldn't be used if operating in synchronous mode.
-func (p *MultiProducer) Errors() <-chan error {
-	if p.isSynchronous() {
-		panic("you can't use Errors() when operating in synchronous mode")
+	if isRetry {
+		// Prepend: Deliver first.
+		forTopic[msg.partition] = append([]*produceMessage{msg}, forTopic[msg.partition]...)
 	} else {
-		return p.errors
+		// Append
+		forTopic[msg.partition] = append(forTopic[msg.partition], msg)
+	}
+	bp.bufferedBytes += uint32(len(msg.key) + len(msg.value))
+	bp.mapM.Unlock()
+	if bp.bufferedBytes > maxBufferBytes {
+		bp.tryFlush()
 	}
 }
 
-func (bp *brokerProducer) addMessage(topic string, partition int32, message *Message, maxBytes uint32) {
-	bp.request.AddMessage(topic, partition, message)
-	bp.bufferedBytes += uint32(len(message.Key) + len(message.Value))
-	if bp.bufferedBytes > maxBytes {
-		select {
-		case bp.flushNow <- true:
-		default:
-		}
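+// tryFlush asks the flush goroutine to flush without blocking; if a flush is
+// already in progress the request is dropped.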
+func (bp *brokerProducer) tryFlush() {
+	select {
+	case bp.flushNow <- true:
+	default:
 	}
 }
 
-func (p *MultiProducer) newProduceRequest() *ProduceRequest {
-	return &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
-}
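+// flush pulls every buffered message whose topic-partition delivery lock can be
+// acquired and sends the batch to the broker on a separate goroutine.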
+func (bp *brokerProducer) flush(p *MultiProducer) {
+	// try to acquire delivery locks for each topic-partition involved.
 
-func (p *MultiProducer) addMessageForBroker(broker *Broker, topic string, partition int32, keyBytes, valBytes []byte) error {
-	bp := p.brokerProducerFor(broker)
+	var messagesToSend []*produceMessage
 
-	bp.Lock()
-	if bp.request == nil {
-		bp.request = p.newProduceRequest()
-	}
-	msg := &Message{Codec: p.config.Compression, Key: keyBytes, Value: valBytes}
-	bp.addMessage(topic, partition, msg, p.config.MaxBufferBytes)
-	bp.Unlock()
+	bp.mapM.Lock()
+	for topic, m := range bp.messages {
+		for partition, messages := range m {
+			if p.tryAcquireDeliveryLock(topic, partition) {
 
-	if p.isSynchronous() {
-		return <-p.errors
-	} else {
-		return nil
-	}
-}
+				messagesToSend = append(messagesToSend, messages...)
+				m[partition] = nil
 
-func (p *MultiProducer) safeSendMessage(topic string, key, value Encoder, retry bool) error {
-	partition, err := p.choosePartition(topic, key)
-	if err != nil {
-		return err
+			}
+		}
 	}
+	bp.mapM.Unlock()
 
-	var keyBytes []byte
-	var valBytes []byte
+	go bp.flushMessages(p, messagesToSend)
+}
 
-	if key != nil {
-		keyBytes, err = key.Encode()
-		if err != nil {
-			return err
-		}
-	}
-	valBytes, err = value.Encode()
-	if err != nil {
-		return err
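+// flushMessages packs the given messages into a single ProduceRequest and sends it.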
+func (bp *brokerProducer) flushMessages(p *MultiProducer, messages []*produceMessage) {
+	if len(messages) == 0 {
+		return
 	}
 
-	broker, err := p.client.Leader(topic, partition)
-	if err != nil {
-		return err
+	req := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
+	for _, pmsg := range messages {
+		msg := &Message{Codec: p.config.Compression, Key: pmsg.key, Value: pmsg.value}
+		req.AddMessage(pmsg.topic, pmsg.partition, msg)
 	}
 
-	return p.addMessageForBroker(broker, topic, partition, keyBytes, valBytes)
+	bp.flushRequest(p, req, messages)
 }
 
 func (bp *brokerProducer) Close() error {
@@ -247,148 +276,135 @@ func (bp *brokerProducer) Close() error {
 	return nil
 }
 
-func (p *MultiProducer) flush(bp *brokerProducer) {
-	bp.Lock()
-	req := bp.request
-	bp.request = nil
-	bp.bufferedBytes = 0
-	bp.Unlock()
-	if req != nil {
-		p.flushRequest(bp, true, req)
-	}
-}
-
-func (p *MultiProducer) lockDelivery(topic string, partition int32) {
-	p.deliveryMutexChan(topic, partition) <- true
-}
-
-func (p *MultiProducer) unlockDelivery(topic string, partition int32) {
-	<-p.deliveryMutexChan(topic, partition)
-}
-
-func (p *MultiProducer) deliveryMutexChan(topic string, partition int32) chan bool {
-	p.deliveryMapM.RLock()
-	submap, ok := p.deliveryMutexes[topic]
-	p.deliveryMapM.RUnlock()
-	if !ok {
-		p.deliveryMapM.Lock()
-		submap, ok = p.deliveryMutexes[topic]
-		if !ok {
-			submap = make(map[int32]chan bool)
-			p.deliveryMutexes[topic] = submap
-		}
-		p.deliveryMapM.Unlock()
-	}
-	p.deliveryMapM.RLock()
-	chn, ok := submap[partition]
-	p.deliveryMapM.RUnlock()
-	if !ok {
-		p.deliveryMapM.Lock()
-		chn = make(chan bool, 1)
-		submap[partition] = chn
-		p.deliveryMapM.Unlock()
-	}
-	return chn
-}
-
-// flushRequest must push one and exactly one message onto p.errors when given only one topic-partition.
-func (p *MultiProducer) flushRequest(bp *brokerProducer, retry bool, request *ProduceRequest) {
-
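+// flushRequest sends the request to the broker, requeues messages that failed with
+// a retriable error (up to MaxDeliveryRetries), and releases the delivery locks for
+// the topic-partitions it handled.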
+func (bp *brokerProducer) flushRequest(p *MultiProducer, request *ProduceRequest, messages []*produceMessage) {
 	response, err := bp.broker.Produce(p.client.id, request)
 
-	if retry {
-		// If this is the first attempt to deliver, lock the delivery mutex for each topic-partition involved.
-		// These will be released when delivery succeeds. This locking preserves ordering in the face of broker failure.
-
-		for topic, d := range request.msgSets {
-			for partition := range d {
-				// TODO: Is it possible to deadlock here? Do we need some sort of manager?
-				// At a first glance it seems like yes, but I think the way partitions map onto brokers,
-				// the answer may actually be no. Need to think harder about this anyhow.
-				p.lockDelivery(topic, partition)
-			}
-		}
-
-	}
-
 	switch err {
 	case nil:
 		break
 	case EncodingError:
+		// No sense in retrying; it'll just fail again. But what about all the other
+		// messages that weren't invalid? Really, this is a "shit's broke real good"
+		// scenario, so angrily logging it and moving on is probably acceptable.
 		p.errors <- err
-		return
+		goto releaseAllLocks
 	default:
-		if !retry {
-			p.errors <- err
-			return
-		}
+		// TODO: Now we have to sift through the messages and determine which should be retried.
 
+		p.client.disconnectBroker(bp.broker)
 		bp.Close()
 
-		for topic, d := range request.msgSets {
-			for partition, msgSet := range d {
-
-				otherBroker, err := p.client.Leader(topic, partition)
-				if err != nil {
-					p.errors <- err
-					return
-				}
-				otherBp := p.brokerProducerFor(otherBroker)
-
-				retryReq := p.newProduceRequest()
-				for _, msgBlock := range msgSet.Messages {
-					retryReq.AddMessage(topic, partition, msgBlock.Msg)
-				}
-				p.flushRequest(otherBp, false, retryReq)
-
+		// i.e. for msg := range reverse(messages)
+		for i := len(messages) - 1; i >= 0; i-- {
+			msg := messages[i]
+			if msg.failures < p.config.MaxDeliveryRetries {
+				msg.failures++
+				// Passing isRetry=true causes the message to be delivered before other queued messages.
+				// This is also why we have to iterate backwards through the failed messages --
+				// to preserve ordering, we have to prepend the items starting from the last one.
+				p.addMessage(msg, true)
+			} else {
+				// log about message failing too many times?
 			}
 		}
+		goto releaseAllLocks
 	}

+	// When does this ever actually happen, and why don't we explode when it does?
+	// This seems bad.
 	if response == nil {
 		p.errors <- nil
-		return
+		goto releaseAllLocks
 	}
 
 	for topic, d := range response.Blocks {
 		for partition, block := range d {
 			if block == nil {
-				p.errors <- IncompleteResponse
-				continue
+				// IncompleteResponse. Here we just drop all the messages; we don't know whether
+				// they were successfully sent or not. Non-ideal, but how often does it happen?
+				// Log angrily.
+				p.releaseDeliveryLock(topic, partition)
+				continue
 			}
-
 			switch block.Err {
 			case NoError:
-				p.errors <- nil
-				// This is where we know we've succeeded. Hence, we can unlock the mutex for the relevant topic-partition.
-
-				p.unlockDelivery(topic, partition)
-
+				// All the messages for this topic-partition were delivered successfully!
+				// Unlock delivery for this topic-partition and discard the produceMessage objects.
 			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
-				if retry {
-
-					msgSet := request.msgSets[topic][partition]
-
-					otherBroker, err := p.client.Leader(topic, partition)
-					if err != nil {
-						p.errors <- err
-						continue
+				// TODO: should we refresh metadata for this topic?
+
+				// i.e. for msg := range reverse(messages)
+				for i := len(messages) - 1; i >= 0; i-- {
+					msg := messages[i]
+					if msg.topic == topic && msg.partition == partition {
+						if msg.failures < p.config.MaxDeliveryRetries {
+							msg.failures++
+							// Passing isRetry=true causes the message to be delivered before other queued messages.
+							// This is also why we have to iterate backwards through the failed messages --
+							// to preserve ordering, we have to prepend the items starting from the last one.
+							p.addMessage(msg, true)
+						} else {
+							// dropping message; log angrily maybe.
+						}
 					}
-				otherBp := p.brokerProducerFor(otherBroker)
-
-					retryReq := p.newProduceRequest()
-					for _, msgBlock := range msgSet.Messages {
-						retryReq.AddMessage(topic, partition, msgBlock.Msg)
-					}
-					p.flushRequest(otherBp, false, retryReq)
-
-				} else {
-					p.errors <- block.Err
 				}
 			default:
-				p.errors <- block.Err
+				// Non-retriable error. Drop the messages and log angrily.
 			}
 			p.releaseDeliveryLock(topic, partition)
 		}
 	}

+	return
+
+releaseAllLocks:
+	// This is slow, but only happens on rare error conditions.
+
+	tps := make(map[string]map[int32]bool)
+	for _, msg := range messages {
+		forTopic, ok := tps[msg.topic]
+		if !ok {
+			forTopic = make(map[int32]bool)
+			tps[msg.topic] = forTopic
+		}
+		forTopic[msg.partition] = true
+	}
+
+	for topic, d := range tps {
+		for partition := range d {
+			p.releaseDeliveryLock(topic, partition)
+		}
+	}
+}
+
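+// tryAcquireDeliveryLock attempts to take the delivery lock for a topic-partition
+// without blocking. It returns false if a delivery for that topic-partition is
+// already in flight.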
+func (p *MultiProducer) tryAcquireDeliveryLock(topic string, partition int32) bool {
+	tp := topicPartition{topic, partition}
+	p.dm.RLock()
+	ch, ok := p.deliveryLocks[tp]
+	p.dm.RUnlock()
+	if !ok {
+		p.dm.Lock()
+		ch, ok = p.deliveryLocks[tp]
+		if !ok {
+			ch = make(chan bool, 1)
+			p.deliveryLocks[tp] = ch
+		}
+		p.dm.Unlock()
+	}
+
+	select {
+	case ch <- true:
+		return true
+	default:
+		return false
+	}
+}
+
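+// releaseDeliveryLock releases the delivery lock for a topic-partition. It panics
+// if the lock was not held.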
+func (p *MultiProducer) releaseDeliveryLock(topic string, partition int32) {
+	p.dm.RLock()
+	ch := p.deliveryLocks[topicPartition{topic, partition}]
+	p.dm.RUnlock()
+	select {
+	case <-ch:
+	default:
+		panic("Serious logic bug: releaseDeliveryLock called without acquiring lock first.")
+	}
 }