Browse Source

Extract Synchronous to its own option, making zeromq-style buffering the default

Burke Libbey 12 years ago
parent
commit
1f3c45ff26
1 changed files with 24 additions and 6 deletions
  1. 24 6
      producer.go

+ 24 - 6
producer.go

@@ -6,11 +6,24 @@ import (
 )
 
 // ProducerConfig is used to pass multiple configuration options to NewProducer.
+//
+// If Synchronous=true, messages are not buffered, and are delivered one at a
+// time, with errors being returned from SendMessage.
+//
+// If Synchronous=false and MaxBufferTime=MaxBufferBytes=0, messages will be
+// immediately and constantly, but if multiple messages are received while a
+// roundtrip to kafka is in progress, they will both be combined into the next
+// request. In this mode, errors are not returned from SendMessage, but over the
+// Errors() channel.
+//
+// With Synchronous=false and MaxBufferTime and/or MaxBufferBytes set to values
+// > 0, sarama will buffer messages before sending, to reduce traffic.
 type ProducerConfig struct {
 	Partitioner        Partitioner      // Chooses the partition to send messages to, or randomly if this is nil.
 	RequiredAcks       RequiredAcks     // The level of acknowledgement reliability needed from the broker (defaults to no acknowledgement).
 	Timeout            int32            // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks.
 	Compression        CompressionCodec // The type of compression to use on messages (defaults to no compression).
+	Synchronous        bool             // If true, errors are returned from SendMessage and no buffering happens. MaxBuffer* parameters must not be set.
 	MaxBufferBytes     uint32           // The maximum number of bytes to buffer per-broker before sending to Kafka.
 	MaxBufferTime      uint32           // The maximum number of milliseconds to buffer messages before sending to a broker.
 	MaxDeliveryRetries uint32           // The number of times to retry a failed message. You should always specify at least 1.
@@ -82,6 +95,15 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 		config.Partitioner = NewRandomPartitioner()
 	}
 
+	if config.Synchronous {
+		if config.MaxBufferBytes != 0 {
+			return nil, ConfigurationError("Synchronous and MaxBufferBytes conflict")
+		}
+		if config.MaxBufferTime != 0 {
+			return nil, ConfigurationError("Synchronous and MaxBufferTime conflict")
+		}
+	}
+
 	if config.MaxBufferBytes == 0 {
 		config.MaxBufferBytes = 1
 	}
@@ -99,7 +121,7 @@ func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 // while parsing ProduceResponses from kafka. Should never be called in
 // synchronous mode.
 func (p *Producer) Errors() chan error {
-	if p.isSynchronous() {
+	if p.config.Synchronous {
 		panic("use of Errors() is not permitted in synchronous mode.")
 	} else {
 		return p.errors
@@ -183,16 +205,12 @@ func (p *Producer) addMessage(msg *produceMessage, isRetry bool) error {
 	bp := p.brokerProducerFor(broker)
 	bp.addMessage(msg, p.config.MaxBufferBytes, isRetry)
 
-	if p.isSynchronous() {
+	if p.config.Synchronous {
 		return <-p.errors
 	}
 	return nil
 }
 
-func (p *Producer) isSynchronous() bool {
-	return p.config.MaxBufferBytes < 2 && p.config.MaxBufferTime == 0
-}
-
 func (p *Producer) brokerProducerFor(broker *Broker) *brokerProducer {
 	p.m.RLock()
 	bp, ok := p.brokerProducers[broker]