Browse Source

Add NewClientConfig

Willem van Bergen 11 years ago
parent
commit
6910a5531c
4 changed files with 27 additions and 21 deletions
  1. 12 5
      client.go
  2. 4 5
      client_test.go
  3. 5 5
      consumer_test.go
  4. 6 6
      producer_test.go

+ 12 - 5
client.go

@@ -40,7 +40,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 	Logger.Println("Initializing new client")
 
 	if config == nil {
-		config = new(ClientConfig)
+		config = NewClientConfig()
 	}
 
 	if err := config.Validate(); err != nil {
@@ -367,10 +367,17 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	return ret, nil
 }
 
-// Validates a ClientConfig instance. This will change zero
-// values into sensible defaults if possible, and it will return a
-// ConfigurationError if the specified value doesn't make sense and
-// cannot be corrected.
+// Creates a new ClientConfig instance with sensible defaults
+func NewClientConfig() *ClientConfig {
+	return &ClientConfig{
+		MetadataRetries:      3,
+		WaitForElection:      250 * time.Millisecond,
+		ConcurrencyPerBroker: 0,
+	}
+}
+
+// Validates a ClientConfig instance. This will return a
+// ConfigurationError if the specified value doesn't make sense..
 func (config *ClientConfig) Validate() error {
 	if config.MetadataRetries <= 0 {
 		return ConfigurationError("Invalid MetadataRetries. Try 10")

+ 4 - 5
client_test.go

@@ -2,7 +2,6 @@ package sarama
 
 import (
 	"testing"
-	"time"
 )
 
 func TestSimpleClient(t *testing.T) {
@@ -11,7 +10,7 @@ func TestSimpleClient(t *testing.T) {
 
 	mb.Returns(new(MetadataResponse))
 
-	client, err := NewClient("client_id", []string{mb.Addr()}, &ClientConfig{MetadataRetries: 10, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -28,7 +27,7 @@ func TestClientExtraBrokers(t *testing.T) {
 	mdr.AddBroker(mb2.Addr(), int32(mb2.BrokerID()))
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 10, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -47,7 +46,7 @@ func TestClientMetadata(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, int32(mb5.BrokerID()))
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 10, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -89,7 +88,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mdr2.AddTopicPartition("my_topic", 0xb, int32(mb5.BrokerID()))
 	mb5.Returns(mdr2)
 
-	client, err := NewClient("clientID", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}

+ 5 - 5
consumer_test.go

@@ -21,7 +21,7 @@ func TestSimpleConsumer(t *testing.T) {
 		mb2.Returns(fr)
 	}
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 
 	if err != nil {
 		t.Fatal(err)
@@ -58,7 +58,7 @@ func TestConsumerRawOffset(t *testing.T) {
 	mdr.AddTopicPartition("my_topic", 0, 2)
 	mb1.Returns(mdr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -95,7 +95,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 	or.AddTopicPartition("my_topic", 0, 0x010101)
 	mb2.Returns(or)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -118,7 +118,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 }
 
 func ExampleConsumer() {
-	client, err := NewClient("my_client", []string{"localhost:9092"}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("my_client", []string{"localhost:9092"}, nil)
 	if err != nil {
 		panic(err)
 	} else {
@@ -126,7 +126,7 @@ func ExampleConsumer() {
 	}
 	defer client.Close()
 
-	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", &ConsumerConfig{MaxWaitTime: 200})
+	consumer, err := NewConsumer(client, "my_topic", 0, "my_consumer_group", NewConsumerConfig())
 	if err != nil {
 		panic(err)
 	} else {

+ 6 - 6
producer_test.go

@@ -31,7 +31,7 @@ func TestSimpleProducer(t *testing.T) {
 	pr.AddTopicPartition("my_topic", 0, NoError)
 	mb2.Returns(pr)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -68,7 +68,7 @@ func TestSimpleSyncProducer(t *testing.T) {
 		mb2.Returns(pr)
 	}
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -102,7 +102,7 @@ func TestMultipleFlushes(t *testing.T) {
 	mb2.Returns(pr)
 	mb2.Returns(pr) // yes, twice.
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -148,7 +148,7 @@ func TestMultipleProducer(t *testing.T) {
 	pr2.AddTopicPartition("topic_c", 0, NoError)
 	mb3.Returns(pr2)
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -235,7 +235,7 @@ func TestFailureRetry(t *testing.T) {
 	/* 	AddTopicPartition("topic_c", 0, 1, NoError). */
 	/* 	AddTopicPartition("topic_b", 0, 1, NoError) */
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -302,7 +302,7 @@ func assertNoMessages(t *testing.T, ch chan error) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("client_id", []string{"localhost:9092"}, &ClientConfig{MetadataRetries: 1, WaitForElection: 250 * time.Millisecond})
+	client, err := NewClient("client_id", []string{"localhost:9092"}, NewClientConfig())
 	if err != nil {
 		panic(err)
 	} else {