Burke Libbey 12 gadi atpakaļ
vecāks
revīzija
0bc1777419
2 mainītis faili ar 33 papildinājumiem un 46 dzēšanām
  1. 23 5
      produce_message.go
  2. 10 41
      producer.go

+ 23 - 5
produce_message.go

@@ -4,18 +4,36 @@ type produceMessage struct {
 	tp         topicPartition
 	key, value []byte
 	failures   uint32
+	sync       bool
 }
 
 type produceRequestBuilder []*produceMessage
 
-func (msg *produceMessage) reenqueue(p *Producer) (ok bool) {
+func (msg *produceMessage) enqueue(p *Producer) error {
+	if msg.sync {
+		var prb produceRequestBuilder = []*produceMessage{msg}
+		bp, err := p.brokerProducerFor(msg.tp)
+		if err != nil {
+			return err
+		}
+		errs := make(chan error, 1)
+		bp.flushRequest(p, prb, func(err error) {
+			errs <- err
+		})
+		return <-errs
+	} else {
+		p.addMessage(msg)
+		return nil
+	}
+
+}
+
+func (msg *produceMessage) reenqueue(p *Producer) error {
 	if msg.failures < p.config.MaxDeliveryRetries {
 		msg.failures++
-		p.addMessage(msg)
-		return true
-	} else {
-		return false
+		return msg.enqueue(p)
 	}
+	return nil
 }
 
 func (msg *produceMessage) hasTopicPartition(topic string, partition int32) bool {

+ 10 - 41
producer.go

@@ -135,33 +135,8 @@ func (p *Producer) Close() error {
 //
 // If you care about message ordering, you should not call QueueMessage and
 // SendMessage on the same Producer.
-func (p *Producer) QueueMessage(topic string, key, value Encoder) (err error) {
-	var keyBytes, valBytes []byte
-
-	if key != nil {
-		if keyBytes, err = key.Encode(); err != nil {
-			return err
-		}
-	}
-	if value != nil {
-		if valBytes, err = value.Encode(); err != nil {
-			return err
-		}
-	}
-
-	partition, err := p.choosePartition(topic, key)
-	if err != nil {
-		return err
-	}
-
-	msg := &produceMessage{
-		tp:       topicPartition{topic, partition},
-		key:      keyBytes,
-		value:    valBytes,
-		failures: 0,
-	}
-
-	return p.addMessage(msg)
+func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
+	return p.genericSendMessage(topic, key, value, false)
 }
 
 // SendMessage sends a message with the given key and value to the given topic.
@@ -176,6 +151,10 @@ func (p *Producer) QueueMessage(topic string, key, value Encoder) (err error) {
 // If you care about message ordering, you should not call QueueMessage and
 // SendMessage on the same Producer.
 func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
+	return p.genericSendMessage(topic, key, value, true)
+}
+
+func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchronous bool) (err error) {
 	var keyBytes, valBytes []byte
 
 	if key != nil {
@@ -199,20 +178,10 @@ func (p *Producer) SendMessage(topic string, key, value Encoder) (err error) {
 		key:      keyBytes,
 		value:    valBytes,
 		failures: 0,
+		sync:     synchronous,
 	}
 
-	bp, err := p.brokerProducerFor(msg.tp)
-	if err != nil {
-		return err
-	}
-
-	// TODO: don't pass through QueueMessage pipeline if failed.
-	var prb produceRequestBuilder = []*produceMessage{msg}
-	errs := make(chan error, 1)
-	bp.flushRequest(p, prb, func(err error) {
-		errs <- err
-	})
-	return <-errs
+	return msg.enqueue(p)
 }
 
 func (p *Producer) addMessage(msg *produceMessage) error {
@@ -377,7 +346,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 
 		overlimit := 0
 		prb.reverseEach(func(msg *produceMessage) {
-			if ok := msg.reenqueue(p); !ok {
+			if err := msg.reenqueue(p); err != nil {
 				overlimit++
 			}
 		})
@@ -415,7 +384,7 @@ func (bp *brokerProducer) flushRequest(p *Producer, prb produceRequestBuilder, e
 				overlimit := 0
 				prb.reverseEach(func(msg *produceMessage) {
 					if msg.hasTopicPartition(topic, partition) {
-						if ok := msg.reenqueue(p); !ok {
+						if err := msg.reenqueue(p); err != nil {
 							overlimit++
 						}
 					}