浏览代码

Add NewConsumerConfig()

Willem van Bergen 11 年之前
父节点
当前提交
8c789b7d01
共有 2 个文件被更改,包括 19 次插入10 次删除
  1. 11 7
      consumer.go
  2. 8 3
      consumer_test.go

+ 11 - 7
consumer.go

@@ -72,7 +72,7 @@ type Consumer struct {
 // part of the named consumer group.
 func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
 	if config == nil {
-		config = new(ConsumerConfig)
+		config = NewConsumerConfig()
 	}
 
 	if err := config.Validate(); err != nil {
@@ -320,21 +320,25 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 	return -1, block.Err
 }
 
+func NewConsumerConfig() *ConsumerConfig {
+	return &ConsumerConfig{
+		DefaultFetchSize: 1024,
+		MinFetchSize:     1,
+		MaxWaitTime:      250,
+	}
+}
+
 // Validates a ConsumerConfig instance. This will change zero
 // values into sensible defaults if possible, and it will return a
 // ConfigurationError if the specified value doesn't make sense and
 // cannot be corrected.
 func (config *ConsumerConfig) Validate() error {
-	if config.DefaultFetchSize < 0 {
+	if config.DefaultFetchSize <= 0 {
 		return ConfigurationError("Invalid DefaultFetchSize")
-	} else if config.DefaultFetchSize == 0 {
-		config.DefaultFetchSize = 1024
 	}
 
-	if config.MinFetchSize < 0 {
+	if config.MinFetchSize <= 0 {
 		return ConfigurationError("Invalid MinFetchSize")
-	} else if config.MinFetchSize == 0 {
-		config.MinFetchSize = 1
 	}
 
 	if config.MaxMessageSize < 0 {

+ 8 - 3
consumer_test.go

@@ -28,7 +28,7 @@ func TestSimpleConsumer(t *testing.T) {
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{MaxWaitTime: 100})
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -64,7 +64,10 @@ func TestConsumerRawOffset(t *testing.T) {
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodManual, OffsetValue: 1234, MaxWaitTime: 100})
+	config := NewConsumerConfig()
+	config.OffsetMethod = OffsetMethodManual
+	config.OffsetValue = 1234
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -98,7 +101,9 @@ func TestConsumerLatestOffset(t *testing.T) {
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{OffsetMethod: OffsetMethodNewest, MaxWaitTime: 100})
+	config := NewConsumerConfig()
+	config.OffsetMethod = OffsetMethodNewest
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", config)
 	if err != nil {
 		t.Fatal(err)
 	}