Browse Source

Take config pointers, and treat nil as all-defaults.

Evan Huus 11 years ago
parent
commit
fd19279439
6 changed files with 30 additions and 18 deletions
  1. 6 2
      client.go
  2. 4 4
      client_test.go
  3. 6 2
      consumer.go
  4. 4 4
      consumer_test.go
  5. 6 2
      producer.go
  6. 4 4
      producer_test.go

+ 6 - 2
client.go

@@ -27,7 +27,11 @@ type Client struct {
 // NewClient creates a new Client with the given client ID. It connects to the broker at the given
 // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
-func NewClient(id string, host string, port int32, config ClientConfig) (client *Client, err error) {
+func NewClient(id string, host string, port int32, config *ClientConfig) (client *Client, err error) {
+	if config == nil {
+		config = new(ClientConfig)
+	}
+
 	if config.MetadataRetries < 0 {
 		return nil, ConfigurationError("Invalid MetadataRetries")
 	}
@@ -40,7 +44,7 @@ func NewClient(id string, host string, port int32, config ClientConfig) (client
 
 	client = new(Client)
 	client.id = id
-	client.config = config
+	client.config = *config
 
 	client.brokers = make(map[int32]*Broker)
 	client.leaders = make(map[string]map[int32]int32)

+ 4 - 4
client_test.go

@@ -13,7 +13,7 @@ func TestSimpleClient(t *testing.T) {
 	// Only one response needed, an empty metadata response
 	responses <- []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), ClientConfig{})
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -37,7 +37,7 @@ func TestClientExtraBrokers(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), ClientConfig{})
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -70,7 +70,7 @@ func TestClientMetadata(t *testing.T) {
 	binary.BigEndian.PutUint32(response[19:], uint32(mockExtra.Port()))
 	responses <- response
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), ClientConfig{})
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -131,7 +131,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		0x00, 0x00, 0x00, 0x00,
 		0x00, 0x00, 0x00, 0x00}
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), ClientConfig{MetadataRetries: 1})
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), &ClientConfig{MetadataRetries: 1})
 	if err != nil {
 		t.Fatal(err)
 	}

+ 6 - 2
consumer.go

@@ -35,7 +35,11 @@ type Consumer struct {
 
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
 // part of the named consumer group.
-func NewConsumer(client *Client, topic string, partition int32, group string, config ConsumerConfig) (*Consumer, error) {
+func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
+	if config == nil {
+		config = new(ConsumerConfig)
+	}
+
 	if config.DefaultFetchSize < 0 {
 		return nil, ConfigurationError("Invalid DefaultFetchSize")
 	} else if config.DefaultFetchSize == 0 {
@@ -66,7 +70,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 	c.topic = topic
 	c.partition = partition
 	c.group = group
-	c.config = config
+	c.config = *config
 
 	// We should really be sending an OffsetFetchRequest, but that doesn't seem to
 	// work in kafka yet. Hopefully will in beta 2...

+ 4 - 4
consumer_test.go

@@ -64,12 +64,12 @@ func TestSimpleConsumer(t *testing.T) {
 			0x00, 0x00, 0x00, 0x00}
 	}()
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), ClientConfig{})
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", ConsumerConfig{})
+	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -90,14 +90,14 @@ func TestSimpleConsumer(t *testing.T) {
 }
 
 func ExampleConsumer() {
-	client, err := NewClient("myClient", "localhost", 9092, ClientConfig{})
+	client, err := NewClient("myClient", "localhost", 9092, nil)
 	if err != nil {
 		panic(err)
 	} else {
 		fmt.Println("> connected")
 	}
 
-	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", ConsumerConfig{})
+	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
 	if err != nil {
 		panic(err)
 	} else {

+ 6 - 2
producer.go

@@ -17,7 +17,11 @@ type Producer struct {
 }
 
 // NewProducer creates a new Producer using the given client. The resulting producer will publish messages on the given topic.
-func NewProducer(client *Client, topic string, config ProducerConfig) (*Producer, error) {
+func NewProducer(client *Client, topic string, config *ProducerConfig) (*Producer, error) {
+	if config == nil {
+		config = new(ProducerConfig)
+	}
+
 	if config.RequiredAcks < -1 {
 		return nil, ConfigurationError("Invalid RequiredAcks")
 	}
@@ -33,7 +37,7 @@ func NewProducer(client *Client, topic string, config ProducerConfig) (*Producer
 	p := new(Producer)
 	p.client = client
 	p.topic = topic
-	p.config = config
+	p.config = *config
 
 	return p, nil
 }

+ 4 - 4
producer_test.go

@@ -45,12 +45,12 @@ func TestSimpleProducer(t *testing.T) {
 		}
 	}()
 
-	client, err := NewClient("clientID", "localhost", mockBroker.Port(), ClientConfig{})
+	client, err := NewClient("clientID", "localhost", mockBroker.Port(), nil)
 	if err != nil {
 		t.Fatal(err)
 	}
 
-	producer, err := NewProducer(client, "myTopic", ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
+	producer, err := NewProducer(client, "myTopic", &ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -65,13 +65,13 @@ func TestSimpleProducer(t *testing.T) {
 }
 
 func ExampleProducer() {
-	client, err := NewClient("myClient", "localhost", 9092, ClientConfig{})
+	client, err := NewClient("myClient", "localhost", 9092, nil)
 	if err != nil {
 		panic(err)
 	} else {
 		fmt.Println("> connected")
 	}
-	producer, err := NewProducer(client, "myTopic", ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
+	producer, err := NewProducer(client, "myTopic", &ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
 	if err != nil {
 		panic(err)
 	}