
Got tests passing

Burke Libbey · 12 years ago · commit 9c4f6aee8a
1 changed file with 27 additions and 13 deletions

multiproducer.go (+27, -13)

@@ -26,12 +26,13 @@ type MultiProducer struct {
 }
 
 type brokerProducer struct {
-	messages      map[string]map[int32][]*produceMessage
 	mapM          sync.Mutex
+	messages      map[string]map[int32][]*produceMessage
 	bufferedBytes uint32
 	flushNow      chan bool
 	broker        *Broker
 	stopper       chan bool
+	hasMessages   chan bool
 }
 
 type produceMessage struct {
@@ -68,10 +69,11 @@ func NewMultiProducer(client *Client, config *MultiProducerConfig) (*MultiProduc
 	}
 
 	return &MultiProducer{
-		client:        client,
-		config:        *config,
-		errors:        make(chan error, 16),
-		deliveryLocks: make(map[topicPartition]chan bool),
+		client:          client,
+		config:          *config,
+		errors:          make(chan error, 16),
+		deliveryLocks:   make(map[topicPartition]chan bool),
+		brokerProducers: make(map[*Broker]*brokerProducer),
 	}, nil
 }
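
The added `brokerProducers: make(...)` initializer is the substantive fix in this hunk: `brokerProducerFor` later writes into this map, and writing to a nil map panics at runtime (reads are fine and just return the zero value). A standalone sketch of the failure mode, with illustrative names:

	package main

	import "fmt"

	func main() {
		var producers map[string]*int // nil: declared but never allocated

		fmt.Println(producers["a"]) // reading a nil map is fine: zero value (nil)

		// producers["a"] = new(int) // writing would panic: "assignment to entry in nil map"

		producers = make(map[string]*int) // what the constructor now does for brokerProducers
		producers["a"] = new(int)         // safe once the map is allocated
		fmt.Println(*producers["a"])      // 0
	}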
 
@@ -96,7 +98,7 @@ func (p *MultiProducer) SendMessage(topic string, key, value Encoder) (err error
 		}
 	}
 	if value != nil {
-		if valBytes, err = key.Encode(); err != nil {
+		if valBytes, err = value.Encode(); err != nil {
 			return err
 		}
 	}
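
The `-`/`+` pair here is a copy-paste bug fix: the value branch was calling `key.Encode()`, so every produced payload silently carried the key bytes instead of the value bytes. A minimal sketch of how that surfaces, using an illustrative string-backed implementation of the `Encoder` shape the diff relies on:

	package main

	import "fmt"

	// Encoder mirrors the interface used above: Encode() ([]byte, error).
	type Encoder interface{ Encode() ([]byte, error) }

	// StringEncoder is a minimal Encoder over a plain string.
	type StringEncoder string

	func (s StringEncoder) Encode() ([]byte, error) { return []byte(s), nil }

	func main() {
		key, value := StringEncoder("user-42"), StringEncoder("clicked")

		buggy, _ := key.Encode() // the old line encoded the KEY into valBytes
		fixed, _ := value.Encode()
		fmt.Printf("buggy value bytes: %q\n", buggy) // "user-42": key bytes leaked into the payload
		fmt.Printf("fixed value bytes: %q\n", fixed) // "clicked"
	}
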
@@ -150,7 +152,7 @@ func (p *MultiProducer) addMessage(msg *produceMessage, isRetry bool) error {
 }
 
 func (p *MultiProducer) isSynchronous() bool {
-	return p.config.MaxBufferBytes == 0 && p.config.MaxBufferTime == 0
+	return p.config.MaxBufferBytes < 2 && p.config.MaxBufferTime == 0
 }
 
 func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
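
The relaxed predicate just above now treats a MaxBufferBytes of 0 or 1 as synchronous, not just 0; presumably a one-byte limit can never usefully batch anything, so it is folded into the no-buffering case. A restatement of the predicate with that reading spelled out (field types assumed from the surrounding config usage):

	package main

	import "fmt"

	// restated predicate: with a byte limit below 2 and no time limit, the
	// producer cannot meaningfully batch, so it degenerates to synchronous sends
	func isSynchronous(maxBufferBytes, maxBufferTimeMS uint32) bool {
		return maxBufferBytes < 2 && maxBufferTimeMS == 0
	}

	func main() {
		fmt.Println(isSynchronous(0, 0)) // true, as before the change
		fmt.Println(isSynchronous(1, 0)) // true, newly covered by "< 2"
		fmt.Println(isSynchronous(2, 0)) // false: real buffering
	}
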
@@ -159,7 +161,7 @@ func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
 	p.m.RUnlock()
 	if !ok {
 		p.m.Lock()
-		bp, ok := p.brokerProducers[broker]
+		bp, ok = p.brokerProducers[broker]
 		if !ok {
 			bp = p.newBrokerProducer(broker)
 			p.brokerProducers[broker] = bp
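
Switching `:=` to `=` fixes a shadowing bug in the double-checked locking: the `:=` inside the `if !ok` block declared fresh `bp` and `ok` locals, so whenever the slow path ran, the function went on to return the outer, still-nil `bp`. A standalone sketch of the pattern and the pitfall, with illustrative types:

	package main

	import (
		"fmt"
		"sync"
	)

	type registry struct {
		m sync.RWMutex
		v map[string]*int
	}

	// get has the same check-then-create shape as brokerProducerFor:
	// optimistic read lock, then re-check under the write lock.
	func (r *registry) get(k string) *int {
		r.m.RLock()
		p, ok := r.v[k]
		r.m.RUnlock()
		if !ok {
			r.m.Lock()
			// The bug: "p, ok := r.v[k]" here would declare NEW variables
			// scoped to this block, leaving the outer p nil on return.
			p, ok = r.v[k] // assign, don't redeclare
			if !ok {
				p = new(int)
				r.v[k] = p
			}
			r.m.Unlock()
		}
		return p
	}

	func main() {
		r := &registry{v: make(map[string]*int)}
		fmt.Println(*r.get("a")) // 0, not a nil-pointer dereference
	}
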
@@ -171,10 +173,11 @@ func (p *MultiProducer) brokerProducerFor(broker *Broker) *brokerProducer {
 
 func (p *MultiProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 	bp := &brokerProducer{
-		messages: make(map[string]map[int32][]*produceMessage),
-		flushNow: make(chan bool),
-		broker:   broker,
-		stopper:  make(chan bool),
+		messages:    make(map[string]map[int32][]*produceMessage),
+		flushNow:    make(chan bool, 1),
+		broker:      broker,
+		stopper:     make(chan bool),
+		hasMessages: make(chan bool, 1),
 	}
 
 	maxBufferTime := time.Duration(p.config.MaxBufferTime) * time.Millisecond
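
Both channels now get a buffer of 1, which is what makes the non-blocking `select { case ch <- true: default: }` sends elsewhere in the diff behave as latches: a signal is never lost just because no receiver is ready yet, and redundant signals coalesce into a single pending one. A minimal sketch of the idiom:

	package main

	import "fmt"

	// set raises a latch without ever blocking: with a capacity-1 channel the
	// first send fills the slot and later sends fall through to default.
	func set(latch chan bool) {
		select {
		case latch <- true:
		default: // already raised; coalesce
		}
	}

	func main() {
		latch := make(chan bool, 1)
		set(latch)              // raises the latch
		set(latch)              // coalesces instead of blocking
		<-latch                 // observes and clears the single pending signal
		fmt.Println(len(latch)) // 0: the latch is clear again
	}
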
@@ -198,6 +201,7 @@ func (p *MultiProducer) newBrokerProducer(broker *Broker) *brokerProducer {
 				bp.flush(p)
 				p.client.disconnectBroker(bp.broker)
 				close(bp.flushNow)
+				close(bp.hasMessages)
 				return
 			}
 			timer.Reset(maxBufferTime)
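
Closing `hasMessages` on shutdown matters because `flush` (further down) now blocks on a receive from it: a receive from a closed channel returns immediately with the zero value, so `close` doubles as a broadcast that releases any goroutine still parked there. A sketch of that unblocking behaviour:

	package main

	import "fmt"

	func main() {
		hasMessages := make(chan bool, 1)
		unblocked := make(chan struct{})

		go func() {
			v := <-hasMessages                  // parked: nothing was ever signalled
			fmt.Println("receive returned:", v) // false, the zero value
			close(unblocked)
		}()

		close(hasMessages) // shutdown: a close wakes every blocked receiver at once
		<-unblocked
	}
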
@@ -223,15 +227,22 @@ func (bp *brokerProducer) addMessage(msg *produceMessage, maxBufferBytes uint32,
 		forTopic[msg.partition] = append(forTopic[msg.partition], msg)
 	}
 	bp.bufferedBytes += uint32(len(msg.key) + len(msg.value))
+
+	select {
+	case bp.hasMessages <- true:
+	default:
+	}
+
 	bp.mapM.Unlock()
 	if bp.bufferedBytes > maxBufferBytes {
+		// TODO: decrement this later on
 		bp.tryFlush()
 	}
 }
 
 func (bp *brokerProducer) tryFlush() {
 	select {
-	case <-bp.flushNow:
+	case bp.flushNow <- true:
 	default:
 	}
 }
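
This one-line fix flips the direction of the channel operation: the old `case <-bp.flushNow:` consumed a pending flush signal instead of raising one, so `tryFlush` could never actually trigger a flush. With the capacity-1 channel set up in the constructor, the corrected non-blocking send queues exactly one flush and coalesces the rest:

	package main

	import "fmt"

	func main() {
		flushNow := make(chan bool, 1)

		// buggy shape: "case <-flushNow:" RECEIVES, silently swallowing any
		// pending flush signal rather than requesting one.

		// fixed shape: a non-blocking SEND queues at most one pending flush.
		select {
		case flushNow <- true:
		default: // a flush is already queued; coalesce
		}

		fmt.Println("pending flush signals:", len(flushNow)) // 1
	}
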
@@ -241,6 +252,8 @@ func (bp *brokerProducer) flush(p *MultiProducer) {
 
 	var messagesToSend []*produceMessage
 
+	<-bp.hasMessages // wait for a message if the BP currently has none.
+
 	bp.mapM.Lock()
 	for topic, m := range bp.messages {
 		for partition, messages := range m {
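
The new blocking receive makes `flush` park until `addMessage` has signalled at least one buffered message, so a timer tick against an idle producer no longer assembles an empty produce request. A reduced sketch of that wait-then-drain shape, with illustrative types:

	package main

	import (
		"fmt"
		"sync"
	)

	type buffer struct {
		mapM        sync.Mutex
		queued      []string
		hasMessages chan bool // capacity 1, raised by the producing side
	}

	// drain mirrors flush: park until at least one message exists, then
	// move the whole queue out under the lock.
	func (b *buffer) drain() []string {
		<-b.hasMessages // wait for a message if the buffer currently has none
		b.mapM.Lock()
		out := b.queued
		b.queued = nil
		b.mapM.Unlock()
		return out
	}

	func main() {
		b := &buffer{hasMessages: make(chan bool, 1)}
		b.mapM.Lock()
		b.queued = append(b.queued, "m1")
		b.mapM.Unlock()
		b.hasMessages <- true
		fmt.Println(b.drain()) // [m1]
	}
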
@@ -328,6 +341,7 @@ func (bp *brokerProducer) flushRequest(p *MultiProducer, request *ProduceRequest
 			case NoError:
 				// All the messages for this topic-partition were delivered successfully!
 				// Unlock delivery for this topic-partition and discard the produceMessage objects.
+				p.errors <- nil
 			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
 				// TODO: should we refresh metadata for this topic?
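
Sending `nil` on `p.errors` for the `NoError` case gives a synchronous caller a positive acknowledgement rather than only ever hearing about failures. The consuming side is not shown in this hunk, but the shape is presumably something like this illustrative sketch:

	package main

	import "fmt"

	// awaitDelivery blocks for one result from the producer's errors channel;
	// a nil is the success acknowledgement now sent for the NoError case.
	func awaitDelivery(errors <-chan error) error {
		return <-errors
	}

	func main() {
		errors := make(chan error, 16) // same shape as MultiProducer.errors
		errors <- nil                  // what flushRequest emits on success
		fmt.Println("delivered:", awaitDelivery(errors) == nil) // true
	}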