Browse Source

Merge pull request #94 from Shopify/export_validate_config

Export Validate method on config instances
Willem van Bergen 11 years ago
parent
commit
804430ed00
6 changed files with 156 additions and 105 deletions
  1. 28 11
      client.go
  2. 4 5
      client_test.go
  3. 40 25
      consumer.go
  4. 13 8
      consumer_test.go
  5. 37 19
      producer.go
  6. 34 37
      producer_test.go

+ 28 - 11
client.go

@@ -40,19 +40,11 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	Logger.Println("Initializing new client")
 
 	if config == nil {
-		config = new(ClientConfig)
+		config = NewClientConfig()
 	}
 
-	if config.MetadataRetries <= 0 {
-		return nil, ConfigurationError("Invalid MetadataRetries. Try 10")
-	}
-
-	if config.WaitForElection <= time.Duration(0) {
-		return nil, ConfigurationError("Invalid WaitForElection. Try 250*time.Millisecond")
-	}
-
-	if config.ConcurrencyPerBroker < 0 {
-		return nil, ConfigurationError("Invalid ConcurrencyPerBroker")
+	if err := config.Validate(); err != nil {
+		return nil, err
 	}
 
 	if len(addrs) < 1 {
@@ -374,3 +366,28 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	}
 	return ret, nil
 }
+
+// Creates a new ClientConfig instance with sensible defaults
+func NewClientConfig() *ClientConfig {
+	return &ClientConfig{
+		MetadataRetries: 3,
+		WaitForElection: 250 * time.Millisecond,
+	}
+}
+
+// Validates a ClientConfig instance. This will return a
+// ConfigurationError if the specified value doesn't make sense..
+func (config *ClientConfig) Validate() error {
+	if config.MetadataRetries <= 0 {
+		return ConfigurationError("Invalid MetadataRetries. Try 10")
+	}
+
+	if config.WaitForElection <= time.Duration(0) {
+		return ConfigurationError("Invalid WaitForElection. Try 250*time.Millisecond")
+	}
+
+	if config.ConcurrencyPerBroker < 0 {
+		return ConfigurationError("Invalid ConcurrencyPerBroker")
+	}
+	return nil
+}

+ 4 - 5
client_test.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"testing"
-	"time"
 )
 
 func TestSimpleClient(t *testing.T) {
@@ -11,7 +10,7 @@ func TestSimpleClient(t *testing.T) {
 
 	mb.Returns(new(MetadataResponse))
 
-	client, err := NewClient("client_id", []string{mb.Addr()}, &ClientConfig{MetadataRetries: 10, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -28,7 +27,7 @@ func TestClientExtraBrokers(t *testing.T) {
 	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 10, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -47,7 +46,7 @@ func TestClientMetadata(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, int32(mb5.BrokerID()))
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 10, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -89,7 +88,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mdr2.AddTopicPartition("my_topic", 0xb, int32(mb5.BrokerID()))
 	mb5.Returns(mdr2)
 
-	client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 40 - 25
consumer.go

@@ -72,33 +72,11 @@ type Consumer struct {
 // part of the named consumer group.
 func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
 	if config == nil {
-		config = new(ConsumerConfig)
+		config = NewConsumerConfig()
 	}
 
-	if config.DefaultFetchSize < 0 {
-		return nil, ConfigurationError("Invalid DefaultFetchSize")
-	} else if config.DefaultFetchSize == 0 {
-		config.DefaultFetchSize = 1024
-	}
-
-	if config.MinFetchSize < 0 {
-		return nil, ConfigurationError("Invalid MinFetchSize")
-	} else if config.MinFetchSize == 0 {
-		config.MinFetchSize = 1
-	}
-
-	if config.MaxMessageSize < 0 {
-		return nil, ConfigurationError("Invalid MaxMessageSize")
-	}
-
-	if config.MaxWaitTime <= 0 {
-		return nil, ConfigurationError("Invalid MaxWaitTime")
-	} else if config.MaxWaitTime < 100 {
-		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
-	}
-
-	if config.EventBufferSize < 0 {
-		return nil, ConfigurationError("Invalid EventBufferSize")
+	if err := config.Validate(); err != nil {
+		return nil, err
 	}
 
 	if topic == "" {
@@ -341,3 +319,40 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 
 	return -1, block.Err
 }
+
+// Creates a ConsumerConfig instance with sane defaults.
+func NewConsumerConfig() *ConsumerConfig {
+	return &ConsumerConfig{
+		DefaultFetchSize: 1024,
+		MinFetchSize:     1,
+		MaxWaitTime:      250,
+	}
+}
+
+// Validates a ConsumerConfig instance. It will return a
+// ConfigurationError if the specified value doesn't make sense.
+func (config *ConsumerConfig) Validate() error {
+	if config.DefaultFetchSize <= 0 {
+		return ConfigurationError("Invalid DefaultFetchSize")
+	}
+
+	if config.MinFetchSize <= 0 {
+		return ConfigurationError("Invalid MinFetchSize")
+	}
+
+	if config.MaxMessageSize < 0 {
+		return ConfigurationError("Invalid MaxMessageSize")
+	}
+
+	if config.MaxWaitTime <= 0 {
+		return ConfigurationError("Invalid MaxWaitTime")
+	} else if config.MaxWaitTime < 100 {
+		Logger.Println("ConsumerConfig.MaxWaitTime is very low, which can cause high CPU and network usage. See documentation for details.")
+	}
+
+	if config.EventBufferSize < 0 {
+		return ConfigurationError("Invalid EventBufferSize")
+	}
+
+	return nil
+}

+ 13 - 8
consumer_test.go

@@ -21,14 +21,14 @@ func TestSimpleConsumer(t *testing.T) {
 		mb2.Returns(fr)
 	}
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{MaxWaitTime: 100})
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -58,13 +58,16 @@ func TestConsumerRawOffset(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, 2)
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodManual, OffsetValue: 1234, MaxWaitTime: 100})
+	config := NewConsumerConfig()
+	config.OffsetMethod = OffsetMethodManual
+	config.OffsetValue = 1234
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -92,13 +95,15 @@ func TestConsumerLatestOffset(t *testing.T) {
 	or.AddTopicPartition("my_topic", 0, 0x010101)
 	mb2.Returns(or)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodNewest, MaxWaitTime: 100})
+	config := NewConsumerConfig()
+	config.OffsetMethod = OffsetMethodNewest
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -113,7 +118,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 }
 
 func ExampleConsumer() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
@@ -121,7 +126,7 @@ func ExampleConsumer() {
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{MaxWaitTime: 200})
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", NewConsumerConfig())
 	if err != nil {
 		panic(err)
 	} else {

+ 37 - 19
producer.go

@@ -70,27 +70,11 @@ type topicPartition struct {
 // NewProducer creates a new Producer using the given client.
 func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 	if config == nil {
-		config = new(ProducerConfig)
+		config = NewProducerConfig()
 	}
 
-	if config.RequiredAcks < -1 {
-		return nil, ConfigurationError("Invalid RequiredAcks")
-	}
-
-	if config.Timeout < 0 {
-		return nil, ConfigurationError("Invalid Timeout")
-	}
-
-	if config.Partitioner == nil {
-		config.Partitioner = NewRandomPartitioner()
-	}
-
-	if config.MaxBufferedBytes == 0 {
-		return nil, ConfigurationError("Invalid MaxBufferedBytes")
-	}
-
-	if config.MaxBufferTime == 0 {
-		return nil, ConfigurationError("Invalid MaxBufferTime")
+	if err := config.Validate(); err != nil {
+		return nil, err
 	}
 
 	return &Producer{
@@ -463,3 +447,37 @@ func (p *Producer) choosePartition(topic string, key Encoder) (int32, error) {
 
 	return partitions[choice], nil
 }
+
+// Creates a new ProducerConfig instance with sensible defaults.
+func NewProducerConfig() *ProducerConfig {
+	return &ProducerConfig{
+		Partitioner:  NewRandomPartitioner(),
+		RequiredAcks: WaitForLocal,
+	}
+}
+
+// Validates a ProducerConfig instance. It will return a
+// ConfigurationError if the specified value doesn't make sense.
+func (config *ProducerConfig) Validate() error {
+	if config.RequiredAcks < -1 {
+		return ConfigurationError("Invalid RequiredAcks")
+	}
+
+	if config.Timeout < 0 {
+		return ConfigurationError("Invalid Timeout")
+	}
+
+	if config.MaxBufferedBytes == 0 {
+		return ConfigurationError("Invalid MaxBufferedBytes")
+	}
+
+	if config.MaxBufferTime == 0 {
+		return ConfigurationError("Invalid MaxBufferTime")
+	}
+
+	if config.Partitioner == nil {
+		return ConfigurationError("No partitioner set")
+	}
+
+	return nil
+}

+ 34 - 37
producer_test.go

@@ -8,6 +8,13 @@ import (
 
 const TestMessage = "ABC THE MESSAGE"
 
+func defaultProducerConfig() *ProducerConfig {
+	config := NewProducerConfig()
+	config.MaxBufferTime = 1000000                                // don't flush based on time
+	config.MaxBufferedBytes = uint32((len(TestMessage) * 10) - 1) // flush after 10 messages
+	return config
+}
+
 func TestSimpleProducer(t *testing.T) {
 
 	mb1 := NewMockBroker(t, 1)
@@ -24,17 +31,15 @@ func TestSimpleProducer(t *testing.T) {
 	pr.AddTopicPartition("my_topic", 0, NoError)
 	mb2.Returns(pr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	// flush only on 10th and final message
@@ -63,17 +68,15 @@ func TestSimpleSyncProducer(t *testing.T) {
 		mb2.Returns(pr)
 	}
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	for i := 0; i < 10; i++ {
@@ -99,17 +102,18 @@ func TestMultipleFlushes(t *testing.T) {
 	mb2.Returns(pr)
 	mb2.Returns(pr) // yes, twice.
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 5th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 5) - 1),
-	})
+	config := defaultProducerConfig()
+	// So that we flush after the 2nd message.
+	config.MaxBufferedBytes = uint32((len(TestMessage) * 5) - 1)
+	producer, err := NewProducer(client, config)
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	returns := []int{0, 0, 0, 0, 1, 0, 0, 0, 0, 1}
@@ -144,17 +148,15 @@ func TestMultipleProducer(t *testing.T) {
 	pr2.AddTopicPartition("topic_c", 0, NoError)
 	mb3.Returns(pr2)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush once, after the 10th message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 10) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
+	if err != nil {
+		t.Fatal(err)
+	}
 	defer producer.Close()
 
 	// flush only on 10th and final message
@@ -233,18 +235,13 @@ func TestFailureRetry(t *testing.T) {
 	/* 	AddTopicPartition("topic_c", 0, 1, NoError). */
 	/* 	AddTopicPartition("topic_b", 0, 1, NoError) */
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, &ProducerConfig{
-		RequiredAcks:  WaitForLocal,
-		MaxBufferTime: 1000000, // "never"
-		// So that we flush after the 2nd message.
-		MaxBufferedBytes: uint32((len(TestMessage) * 2) - 1),
-	})
+	producer, err := NewProducer(client, defaultProducerConfig())
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -305,7 +302,7 @@ func assertNoMessages(t *testing.T, ch chan error) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
 	if err != nil {
 		panic(err)
 	} else {
@@ -313,7 +310,7 @@ func ExampleProducer() {
 	}
 	defer client.Close()
 
-	producer, err := NewProducer(client, &ProducerConfig{RequiredAcks: WaitForLocal, MaxBufferedBytes: 1024, MaxBufferTime: 1000})
+	producer, err := NewProducer(client, nil)
 	if err != nil {
 		panic(err)
 	}