Browse Source

bugfixes uncovered by perf test

Burke Libbey 12 years ago
parent
commit
3a5ef42df9
1 changed files with 11 additions and 4 deletions
  1. 11 4
      producer.go

+ 11 - 4
producer.go

@@ -58,6 +58,7 @@ type brokerProducer struct {
 	flushNow      chan bool
 	flushNow      chan bool
 	broker        *Broker
 	broker        *Broker
 	stopper       chan bool
 	stopper       chan bool
+	done          chan bool
 	hasMessages   chan bool
 	hasMessages   chan bool
 }
 }
 
 
@@ -233,6 +234,7 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 		flushNow:    make(chan bool, 1),
 		flushNow:    make(chan bool, 1),
 		broker:      broker,
 		broker:      broker,
 		stopper:     make(chan bool),
 		stopper:     make(chan bool),
+		done:        make(chan bool),
 		hasMessages: make(chan bool, 1),
 		hasMessages: make(chan bool, 1),
 	}
 	}
 
 
@@ -251,13 +253,17 @@ func (p *Producer) newBrokerProducer(broker *Broker) *brokerProducer {
 			case <-timer.C:
 			case <-timer.C:
 				bp.flush(p)
 				bp.flush(p)
 			case <-bp.stopper:
 			case <-bp.stopper:
-				p.m.Lock()
 				delete(p.brokerProducers, bp.broker)
 				delete(p.brokerProducers, bp.broker)
-				p.m.Unlock()
-				bp.flush(p)
+				select {
+				case <-bp.hasMessages:
+					bp.hasMessages <- true
+					bp.flush(p)
+				default:
+				}
 				p.client.disconnectBroker(bp.broker)
 				p.client.disconnectBroker(bp.broker)
 				close(bp.flushNow)
 				close(bp.flushNow)
 				close(bp.hasMessages)
 				close(bp.hasMessages)
+				close(bp.done)
 				return
 				return
 			}
 			}
 			timer.Reset(maxBufferTime)
 			timer.Reset(maxBufferTime)
@@ -322,7 +328,7 @@ func (bp *brokerProducer) flush(p *Producer) {
 	}
 	}
 	bp.mapM.Unlock()
 	bp.mapM.Unlock()
 
 
-	go bp.flushMessages(p, messagesToSend)
+	bp.flushMessages(p, messagesToSend)
 }
 }
 
 
 func (bp *brokerProducer) flushMessages(p *Producer, messages []*produceMessage) {
 func (bp *brokerProducer) flushMessages(p *Producer, messages []*produceMessage) {
@@ -341,6 +347,7 @@ func (bp *brokerProducer) flushMessages(p *Producer, messages []*produceMessage)
 
 
 func (bp *brokerProducer) Close() error {
 func (bp *brokerProducer) Close() error {
 	close(bp.stopper)
 	close(bp.stopper)
+	<-bp.done
 	return nil
 	return nil
 }
 }