浏览代码

Merge pull request #12 from Shopify/producer_close

Closing Cleanups
Evan Huus 12 年之前
父节点
当前提交
da88bf8be7
共有 6 个文件被更改,包括 27 次插入18 次删除
  1. 2 1
      client.go
  2. 2 4
      client_test.go
  3. 2 1
      consumer.go
  4. 4 6
      consumer_test.go
  5. 11 2
      producer.go
  6. 6 4
      producer_test.go

+ 2 - 1
client.go

@@ -78,7 +78,7 @@ func NewClient(id string, host string, port int32, config *ClientConfig) (client
 // Close shuts down all broker connections managed by this client. It is required to call this function before
 // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
 // using a client before you close the client.
-func (client *Client) Close() {
+func (client *Client) Close() error {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -87,6 +87,7 @@ func (client *Client) Close() {
 	}
 	client.brokers = nil
 	client.leaders = nil
+	return nil
 }
 
 // functions for use by producers and consumers

+ 2 - 4
client_test.go

@@ -74,6 +74,7 @@ func TestClientMetadata(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer client.Close()
 
 	parts, err := client.partitions("myTopic")
 	if err != nil {
@@ -88,8 +89,6 @@ func TestClientMetadata(t *testing.T) {
 	} else if tst.ID() != 5 {
 		t.Error("Leader for myTopic had incorrect ID.")
 	}
-
-	client.Close()
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
@@ -135,6 +134,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer client.Close()
 
 	parts, err := client.partitions("myTopic")
 	if err != nil {
@@ -151,6 +151,4 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	}
 
 	client.disconnectBroker(tst)
-
-	client.Close()
 }

+ 2 - 1
consumer.go

@@ -99,9 +99,10 @@ func (c *Consumer) Messages() <-chan *MessageBlock {
 // Close stops the consumer from fetching messages. It is required to call this function before
 // a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
 // calling Close on the underlying client.
-func (c *Consumer) Close() {
+func (c *Consumer) Close() error {
 	close(c.stopper)
 	<-c.done
+	return nil
 }
 
 // helper function for safely sending an error on the errors channel

+ 4 - 6
consumer_test.go

@@ -68,11 +68,13 @@ func TestSimpleConsumer(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer client.Close()
 
 	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer consumer.Close()
 
 	for i := 0; i < 10; i++ {
 		select {
@@ -84,9 +86,6 @@ func TestSimpleConsumer(t *testing.T) {
 			t.Error(err)
 		}
 	}
-
-	consumer.Close()
-	client.Close()
 }
 
 func ExampleConsumer() {
@@ -96,6 +95,7 @@ func ExampleConsumer() {
 	} else {
 		fmt.Println("> connected")
 	}
+	defer client.Close()
 
 	consumer, err := NewConsumer(client, "myTopic", 0, "myConsumerGroup", nil)
 	if err != nil {
@@ -103,6 +103,7 @@ func ExampleConsumer() {
 	} else {
 		fmt.Println("> consumer ready")
 	}
+	defer consumer.Close()
 
 consumerLoop:
 	for {
@@ -116,7 +117,4 @@ consumerLoop:
 			break consumerLoop
 		}
 	}
-
-	consumer.Close()
-	client.Close()
 }

+ 11 - 2
producer.go

@@ -8,8 +8,8 @@ type ProducerConfig struct {
 }
 
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
-// and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
-// its underlying Client.
+// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
+// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type Producer struct {
 	client *Client
 	topic  string
@@ -42,6 +42,15 @@ func NewProducer(client *Client, topic string, config *ProducerConfig) (*Produce
 	return p, nil
 }
 
+// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
+// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
+// on the underlying client.
+func (p *Producer) Close() error {
+	// no-op for now, adding for consistency and so the API doesn't change when we add buffering
+	// (which will require a goroutine, which will require a close method in order to flush the buffer).
+	return nil
+}
+
 // SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
 // To send strings as either key or value, see the StringEncoder type.
 func (p *Producer) SendMessage(key, value Encoder) error {

+ 6 - 4
producer_test.go

@@ -49,19 +49,20 @@ func TestSimpleProducer(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer client.Close()
 
 	producer, err := NewProducer(client, "myTopic", &ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
 	if err != nil {
 		t.Fatal(err)
 	}
+	defer producer.Close()
+
 	for i := 0; i < 10; i++ {
 		err = producer.SendMessage(nil, StringEncoder("ABC THE MESSAGE"))
 		if err != nil {
 			t.Error(err)
 		}
 	}
-
-	client.Close()
 }
 
 func ExampleProducer() {
@@ -71,10 +72,13 @@ func ExampleProducer() {
 	} else {
 		fmt.Println("> connected")
 	}
+	defer client.Close()
+
 	producer, err := NewProducer(client, "myTopic", &ProducerConfig{RequiredAcks: WAIT_FOR_LOCAL})
 	if err != nil {
 		panic(err)
 	}
+	defer producer.Close()
 
 	err = producer.SendMessage(nil, StringEncoder("testing 123"))
 	if err != nil {
@@ -82,6 +86,4 @@ func ExampleProducer() {
 	} else {
 		fmt.Println("> message sent")
 	}
-
-	client.Close()
 }