|
@@ -266,7 +266,10 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32)
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
|
|
func (bp *brokerProducer) flushIfOverCapacity(maxBufferBytes uint32) {
|
|
|
- if bp.bufferedBytes > maxBufferBytes {
|
|
|
|
|
|
|
+ bp.mapM.Lock()
|
|
|
|
|
+ over := bp.bufferedBytes > maxBufferBytes
|
|
|
|
|
+ bp.mapM.Unlock()
|
|
|
|
|
+ if over {
|
|
|
select {
|
|
select {
|
|
|
case bp.flushNow <- true:
|
|
case bp.flushNow <- true:
|
|
|
default:
|
|
default:
|