|
|
@@ -230,24 +230,31 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
|
|
|
|
|
|
go func() {
|
|
|
timer := time.NewTimer(maxBufferTime)
|
|
|
+ var shutdownRequired bool
|
|
|
wg.Done()
|
|
|
for {
|
|
|
select {
|
|
|
case <-bp.flushNow:
|
|
|
+ if shutdownRequired = bp.flush(p); shutdownRequired {
|
|
|
+ goto shutdown
|
|
|
+ }
|
|
|
bp.flush(p)
|
|
|
case <-timer.C:
|
|
|
- bp.flushIfAnyMessages(p)
|
|
|
+ if shutdownRequired = bp.flushIfAnyMessages(p); shutdownRequired {
|
|
|
+ goto shutdown
|
|
|
+ }
|
|
|
case <-bp.stopper:
|
|
|
- delete(p.brokerProducers, bp.broker)
|
|
|
- bp.flushIfAnyMessages(p)
|
|
|
- p.client.disconnectBroker(bp.broker)
|
|
|
- close(bp.flushNow)
|
|
|
- close(bp.hasMessages)
|
|
|
- close(bp.done)
|
|
|
- return
|
|
|
+ goto shutdown
|
|
|
}
|
|
|
timer.Reset(maxBufferTime)
|
|
|
}
|
|
|
+ shutdown:
|
|
|
+ delete(p.brokerProducers, bp.broker)
|
|
|
+ bp.flushIfAnyMessages(p)
|
|
|
+ p.client.disconnectBroker(bp.broker)
|
|
|
+ close(bp.flushNow)
|
|
|
+ close(bp.hasMessages)
|
|
|
+ close(bp.done)
|
|
|
}()
|
|
|
wg.Wait() // don't return until the G has started
|
|
|
|
|
|
@@ -283,19 +290,20 @@ func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-func (bp *brokerProducer) flushIfAnyMessages(p *Producer) {
|
|
|
+func (bp *brokerProducer) flushIfAnyMessages(p *Producer) (shutdownRequired bool) {
|
|
|
select {
|
|
|
case <-bp.hasMessages:
|
|
|
select {
|
|
|
case bp.hasMessages <- true:
|
|
|
default:
|
|
|
}
|
|
|
- bp.flush(p)
|
|
|
+ return bp.flush(p)
|
|
|
default:
|
|
|
}
|
|
|
+ return false
|
|
|
}
|
|
|
|
|
|
-func (bp *brokerProducer) flush(p *Producer) {
|
|
|
+func (bp *brokerProducer) flush(p *Producer) (shutdownRequired bool) {
|
|
|
var prb produceRequestBuilder
|
|
|
|
|
|
// only deliver messages for topic-partitions that are not currently being delivered.
|
|
|
@@ -314,16 +322,17 @@ func (bp *brokerProducer) flush(p *Producer) {
|
|
|
bp.bufferedBytes -= prb.byteSize()
|
|
|
bp.mapM.Unlock()
|
|
|
|
|
|
- bp.flushRequest(p, prb, func(err error) {
|
|
|
+ return bp.flushRequest(p, prb, func(err error) {
|
|
|
if err != nil {
|
|
|
Logger.Println(err)
|
|
|
}
|
|
|
p.errors <- err
|
|
|
})
|
|
|
}
|
|
|
+ return false
|
|
|
}
|
|
|
|
|
|
-func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) {
|
|
|
+func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, errorCb func(error)) (shutdownRequired bool) {
|
|
|
// produce_message.go
|
|
|
req := prb.toRequest(&p.config)
|
|
|
response, err := bp.broker.Produce(p.client.id, req)
|
|
|
@@ -336,10 +345,8 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
// messages that weren't invalid? Really, this is a "shit's broke real good"
|
|
|
// scenario, so logging it and moving on is probably acceptable.
|
|
|
errorCb(err)
|
|
|
- return
|
|
|
+ return false
|
|
|
default:
|
|
|
- bp.Close()
|
|
|
-
|
|
|
overlimit := 0
|
|
|
prb.reverseEach(func(msg *produceMessage) {
|
|
|
if err := msg.reenqueue(p); err != nil {
|
|
|
@@ -349,14 +356,14 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
if overlimit > 0 {
|
|
|
errorCb(DroppedMessagesError{overlimit, nil})
|
|
|
}
|
|
|
- return
|
|
|
+ return true
|
|
|
}
|
|
|
|
|
|
// When does this ever actually happen, and why don't we explode when it does?
|
|
|
// This seems bad.
|
|
|
if response == nil {
|
|
|
errorCb(nil)
|
|
|
- return
|
|
|
+ return false
|
|
|
}
|
|
|
|
|
|
for topic, d := range response.Blocks {
|
|
|
@@ -390,6 +397,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
+ return false
|
|
|
}
|
|
|
|
|
|
func (bp *brokerProducer) Close() error {
|