Browse Source

Add NewProducerConfig

Willem van Bergen 11 years ago
parent
commit
468aaaffb7
2 changed files with 43 additions and 40 deletions
  1. 15 9
      producer.go
  2. 28 31
      producer_test.go

+ 15 - 9
producer.go

@@ -70,7 +70,7 @@ type topicPartition struct {
 // NewProducer creates a new Producer using the given client.
 func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	if config == nil {
-		config = new(ProducerConfig)
+		config = NewProducerConfig()
 	}
 
 	if err := config.Validate(); err != nil {
@@ -448,10 +448,16 @@ func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
 	return partitions[choice], nil
 }
 
-// Validates a ProducerConfig instance. This will change zero
-// values into sensible defaults if possible, and it will return a
-// ConfigurationError if the specified value doesn't make sense and
-// cannot be corrected.
+// Creates a new ProducerConfig instance with sensible defaults.
+func NewProducerConfig() *ProducerConfig {
+	return &ProducerConfig{
+		Partitioner:  NewRandomPartitioner(),
+		RequiredAcks: WaitForLocal,
+	}
+}
+
+// Validates a ProducerConfig instance. t will return a
+// ConfigurationError if the specified value doesn't make sense.
 func (config *ProducerConfig) Validate() error {
 	if config.RequiredAcks < -1 {
 		return ConfigurationError("Invalid RequiredAcks")
@@ -461,10 +467,6 @@ func (config *ProducerConfig) Validate() error {
 		return ConfigurationError("Invalid Timeout")
 	}
 
-	if config.Partitioner == nil {
-		config.Partitioner = NewRandomPartitioner()
-	}
-
 	if config.MaxBufferedBytes == 0 {
 		return ConfigurationError("Invalid MaxBufferedBytes")
 	}
@@ -473,5 +475,9 @@ func (config *ProducerConfig) Validate() error {
 		return ConfigurationError("Invalid MaxBufferTime")
 	}
 
+	if config.Partitioner == nil {
+		return ConfigurationError("No partitioner set")
+	}
+
 	return nil
 }

+ 28 - 31
producer_test.go

@@ -8,6 +8,13 @@ import (
 
 const TestMessage = "ABC THE MESSAGE"
 
+func defaultProducerConfig() *ProducerConfig {
+	config := NewProducerConfig()
+	config.MaxBufferTime = 1000000                                // don't flush based on time
+	config.MaxBufferedBytes = uint32((len(TestMessage) * 10) - 1) // flush after 10 messages
+	return config
+}
+
 func TestSimpleProducer(t *testing.T) {
 
 	mb1 := NewMockBroker(t, 1)
@@ -29,12 +36,10 @@ func TestSimpleProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	// flush only on 10th and final message
@@ -68,12 +73,10 @@ func TestSimpleSyncProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
@@ -104,12 +107,13 @@ func TestMultipleFlushes(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 5th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 5) - 1),
-	})
+	config := defaultProducerConfig()
+	// So that we flush after the 2nd message.
+	config.MaxBufferedBytes = uint32((len(TestMessage) * 5) - 1)
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
@@ -149,12 +153,10 @@ func TestMultipleProducer(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	// flush only on 10th and final message
@@ -239,12 +241,7 @@ func TestFailureRetry(t *testing.T) {
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush after the 2nd message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 2) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -313,7 +310,7 @@ func ExampleProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, &ProducerConfig{RequiredAcks: WaitForLocal, MaxBufferedBytes: 1024, MaxBufferTime: 1000})
+	producer, err := NewProducer(client, nil)
 	if err != nil {
 		panic(err)
 	}