Просмотр исходного кода

Add ProducerConfig in line with ClientConfig

Also add ConfigurationError type for use reporting invalid configuration values.
Evan Huus 12 лет назад
Родитель
Сommit
6e2e9ed805
4 измененных файлов с 51 добавлено и 21 удалено
  1. 4 0
      client.go
  2. 8 0
      errors.go
  3. 31 19
      producer.go
  4. 8 2
      producer_test.go

+ 4 - 0
client.go

@@ -28,6 +28,10 @@ type Client struct {
 // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
 func NewClient(id string, host string, port int32, config ClientConfig) (client *Client, err error) {
+	if config.MetadataRetries < 0 {
+		return nil, ConfigurationError("Invalid MetadataRetries")
+	}
+
 	tmp := NewBroker(host, port)
 	err = tmp.Connect()
 	if err != nil {

+ 8 - 0
errors.go

@@ -36,6 +36,14 @@ var InsufficientData = errors.New("kafka: Insufficient data to decode packet, mo
 // This can be a bad CRC or length field, or any other invalid value.
 var DecodingError = errors.New("kafka: Error while decoding packet.")
 
+// ConfigurationError is the type of error returned from NewClient, NewProducer or NewConsumer when the specified
+// configuration is invalid.
+type ConfigurationError string
+
+func (err ConfigurationError) Error() string {
+	return "Invalid Configuration: " + string(err)
+}
+
 // KError is the type of error that can be returned directly by the Kafka broker.
 // See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes
 type KError int16

+ 31 - 19
producer.go

@@ -1,26 +1,45 @@
 package sarama
 
+// ProducerConfig is used to pass multiple configuration options to NewProducer.
+type ProducerConfig struct {
+	Partitioner  Partitioner  // Chooses the partition to send messages to, or randomly if this is nil
+	RequiredAcks RequiredAcks // The level of acknowledgement reliability needed from the broker
+	Timeout      int32        // The maximum time in ms the broker will wait the receipt of the number of RequiredAcks
+}
+
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
 // and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
 // its underlying Client.
 type Producer struct {
-	client      *Client
-	topic       string
-	partitioner Partitioner
+	client *Client
+	topic  string
+	config ProducerConfig
 }
 
-// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic,
-// and partition messages using the given partitioner.
-func NewProducer(client *Client, topic string, partitioner Partitioner) *Producer {
+// NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic.
+func NewProducer(client *Client, topic string, config ProducerConfig) (*Producer, error) {
+	if config.RequiredAcks < -1 {
+		return nil, ConfigurationError("Invalid RequiredAcks")
+	}
+
+	if config.Timeout < 0 {
+		return nil, ConfigurationError("Invalid Timeout")
+	}
+
 	p := new(Producer)
 	p.client = client
 	p.topic = topic
-	p.partitioner = partitioner
-	return p
+	p.config = config
+
+	if p.config.Partitioner == nil {
+		p.config.Partitioner = RandomPartitioner{}
+	}
+
+	return p, nil
 }
 
-// SendMessage sends a message with the given key and value. If key is nil, the partition to send to is selected randomly, otherwise it
-// is selected by the Producer's Partitioner. To send strings as either key or value, see the StringEncoder type.
+// SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
+// To send strings as either key or value, see the StringEncoder type.
 func (p *Producer) SendMessage(key, value Encoder) error {
 	return p.safeSendMessage(key, value, true)
 }
@@ -31,14 +50,7 @@ func (p *Producer) choosePartition(key Encoder) (int32, error) {
 		return -1, err
 	}
 
-	var partitioner Partitioner
-	if key == nil {
-		partitioner = RandomPartitioner{}
-	} else {
-		partitioner = p.partitioner
-	}
-
-	choice := partitioner.Partition(key, len(partitions))
+	choice := p.config.Partitioner.Partition(key, len(partitions))
 
 	if choice >= len(partitions) {
 		return -1, InvalidPartition
@@ -72,7 +84,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 		return err
 	}
 
-	request := &ProduceRequest{RequiredAcks: WAIT_FOR_LOCAL, Timeout: 0}
+	request := &ProduceRequest{RequiredAcks: p.config.RequiredAcks, Timeout: p.config.Timeout}
 	request.AddMessage(p.topic, partition, &Message{Key: keyBytes, Value: valBytes})
 
 	response, err := broker.Produce(p.client.id, request)

+ 8 - 2
producer_test.go

@@ -50,7 +50,10 @@ func TestSimpleProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer := NewProducer(client, "myTopic", &RandomPartitioner{})
+	producer, err := NewProducer(client, "myTopic", ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
+	if err != nil {
+		t.Fatal(err)
+	}
 	for i := 0; i < 10; i++ {
 		err = producer.SendMessage(nil, StringEncoder("ABC THE MESSAGE"))
 		if err != nil {
@@ -68,7 +71,10 @@ func ExampleProducer() {
 	} else {
 		fmt.Println("> connected")
 	}
-	producer := NewProducer(client, "myTopic", RandomPartitioner{})
+	producer, err := NewProducer(client, "myTopic", ProducerConfig{})
+	if err != nil {
+		panic(err)
+	}
 
 	err = producer.SendMessage(nil, StringEncoder("testing 123"))
 	if err != nil {