Burke Libbey 12 лет назад
Родитель
Сommit
03fbf23be5
2 измененных файлов с 28 добавлено и 21 удалено
  1. 15 12
      produce_message.go
  2. 13 9
      producer.go

+ 15 - 12
produce_message.go

@@ -9,22 +9,25 @@ type produceMessage struct {
 
 type produceRequestBuilder []*produceMessage
 
+// If the message is synchronous, we manually send it and wait for a return.
+// Otherwise, we just hand it back to the producer to enqueue using the normal
+// method.
 func (msg *produceMessage) enqueue(p *Producer) error {
-	if msg.sync {
-		var prb produceRequestBuilder = []*produceMessage{msg}
-		bp, err := p.brokerProducerFor(msg.tp)
-		if err != nil {
-			return err
-		}
-		errs := make(chan error, 1)
-		bp.flushRequest(p, prb, func(err error) {
-			errs <- err
-		})
-		return <-errs
-	} else {
+	if !msg.sync {
 		return p.addMessage(msg)
 	}
 
+	var prb produceRequestBuilder = []*produceMessage{msg}
+	bp, err := p.brokerProducerFor(msg.tp)
+	if err != nil {
+		return err
+	}
+	errs := make(chan error, 1)
+	bp.flushRequest(p, prb, func(err error) {
+		errs <- err
+	})
+	return <-errs
+
 }
 
 func (msg *produceMessage) reenqueue(p *Producer) error {

+ 13 - 9
producer.go

@@ -33,13 +33,15 @@ type ProducerConfig struct {
 // scope (this is in addition to calling Close on the underlying client, which
 // is still necessary).
 //
-// If MaxBufferedBytes=0 and MaxBufferTime=0, the Producer is considered to be
-// operating in "synchronous" mode. This means that errors will be returned
-// directly from calls to SendMessage. If either value is greater than zero, the
-// Producer is operating in "asynchronous" mode, and you must read these return
-// values back from the channel returned by Errors(). Note that you actually
-// *must* read these error values: The channel has a fixed capacity, and the
-// producer will block if it's full.
+// The default values for MaxBufferedBytes and MaxBufferTime cause sarama to
+// deliver messages immediately, but to buffer subsequent messages while a
+// previous request is in-flight. This is often the correct behaviour.
+//
+// If synchronous operation is desired, you can use SendMessage. This will cause
+// sarama to block until the broker has returned a value. Normally, you will
+// want to use QueueMessage instead, and read the error back from the Errors()
+// channel. Note that when using QueueMessage, you *must* read the values from
+// the Errors() channel, or sarama will block indefinitely after a few requests.
 type Producer struct {
 	client          *Client
 	config          ProducerConfig
@@ -112,7 +114,6 @@ func (p *Producer) Errors() chan error {
 // You must call this function before a producer object passes out of scope, as
 // it may otherwise leak memory. You must call this before calling Close on the
 // underlying client.
-// TODO: This should lock something.
 func (p *Producer) Close() error {
 	for _, bp := range p.brokerProducers {
 		bp.Close()
@@ -134,7 +135,8 @@ func (p *Producer) Close() error {
 // Errors(), otherwise the producer will stall after some number of errors.
 //
 // If you care about message ordering, you should not call QueueMessage and
-// SendMessage on the same Producer.
+// SendMessage on the same Producer. Either, used alone, preserves ordering,
+// however.
 func (p *Producer) QueueMessage(topic string, key, value Encoder) error {
 	return p.genericSendMessage(topic, key, value, false)
 }
@@ -173,6 +175,7 @@ func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchron
 		return err
 	}
 
+	// produce_message.go
 	msg := &produceMessage{
 		tp:       topicPartition{topic, partition},
 		key:      keyBytes,
@@ -181,6 +184,7 @@ func (p *Producer) genericSendMessage(topic string, key, value Encoder, synchron
 		sync:     synchronous,
 	}
 
+	// produce_message.go
 	return msg.enqueue(p)
 }