@@ -33,6 +33,8 @@ type MultiProducer struct {
 	config MultiProducerConfig
 	brokerProducers map[*Broker]*brokerProducer
 	errors chan error
+	deliveryMapM sync.RWMutex
+	deliveryMutexes map[string]map[int32]chan bool
 }
 
 // NewMultiProducer creates a new MultiProducer using the given client. The resulting object will buffer/flush Produce messages to Kafka.
@@ -62,6 +64,7 @@ func NewMultiProducer(client *Client, config *MultiProducerConfig) (*MultiProduc
 	p.config = *config
 	p.errors = make(chan error, 16)
 	p.brokerProducers = make(map[*Broker]*brokerProducer)
+	p.deliveryMutexes = make(map[string]map[int32]chan bool)
 
 	return p, nil
 }
@@ -73,6 +76,14 @@ func (p *MultiProducer) Close() error {
 	p.m.Lock()
 	defer p.m.Unlock()
 
+	p.deliveryMapM.Lock()
+	for _, d := range p.deliveryMutexes {
+		for _, ch := range d {
+			close(ch)
+		}
+	}
+	p.deliveryMapM.Unlock()
+
 	for _, bp := range p.brokerProducers {
 		bp.Close()
 	}
@@ -247,11 +258,65 @@ func (p *MultiProducer) flush(bp *brokerProducer) {
 	}
 }
 
+func (p *MultiProducer) lockDelivery(topic string, partition int32) {
+	p.deliveryMutexChan(topic, partition) <- true
+}
+
+func (p *MultiProducer) unlockDelivery(topic string, partition int32) {
+	<-p.deliveryMutexChan(topic, partition)
+}
+
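+// deliveryMutexChan returns the channel that acts as the delivery mutex for the given
+// topic-partition, creating it lazily on first use. The channel is buffered with capacity
+// one: lockDelivery acquires the mutex by sending into it, unlockDelivery releases it by receiving.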
+func (p *MultiProducer) deliveryMutexChan(topic string, partition int32) chan bool {
+	p.deliveryMapM.RLock()
+	submap, ok := p.deliveryMutexes[topic]
+	p.deliveryMapM.RUnlock()
+	if !ok {
+		p.deliveryMapM.Lock()
+		submap, ok = p.deliveryMutexes[topic]
+		if !ok {
+			submap = make(map[int32]chan bool)
+			p.deliveryMutexes[topic] = submap
+		}
+		p.deliveryMapM.Unlock()
+	}
+	p.deliveryMapM.RLock()
+	chn, ok := submap[partition]
+	p.deliveryMapM.RUnlock()
+	if !ok {
+		p.deliveryMapM.Lock()
+		chn, ok = submap[partition]
+		if !ok {
+			chn = make(chan bool, 1)
+			submap[partition] = chn
+		}
+		p.deliveryMapM.Unlock()
+	}
+	return chn
+}
+
 // flushRequest must push one and exactly one message onto p.errors when given only one topic-partition.
 func (p *MultiProducer) flushRequest(bp *brokerProducer, retry bool, request *ProduceRequest) {
 
 	response, err := bp.broker.Produce(p.client.id, request)
 
+	if retry {
+		// If this is the first attempt to deliver, lock the delivery mutex for each topic-partition involved.
+		// These will be released when delivery succeeds. This locking preserves ordering in the face of broker failure.
+
+		for topic, d := range request.msgSets {
+			for partition := range d {
+				// TODO: Is it possible to deadlock here? Do we need some sort of manager?
+				// At first glance it seems like yes, but I think that, given the way partitions map onto brokers,
+				// the answer may actually be no. Need to think harder about this anyhow.
+				p.lockDelivery(topic, partition)
+			}
+		}
+
+	}
+
 	switch err {
 	case nil:
 		break
@@ -301,6 +366,9 @@ func (p *MultiProducer) flushRequest(bp *brokerProducer, retry bool, request *Pr
 			switch block.Err {
 			case NoError:
 				p.errors <- nil
+				// This is where we know we've succeeded. Hence, we can unlock the mutex for the relevant topic-partition.
+
+				p.unlockDelivery(topic, partition)
 
 			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
 				if retry {