Bladeren bron

Close method consistency

 * add a no-op Close method for producer, so when we need it the API behaviour
   doesn't change (adding a method is safe, but requiring that method to be
   called to avoid leaks isn't).
 * make all Close methods return an error (always nil) so they implement the
   built-in Closer interface in go.
Evan Huus 12 jaren geleden
bovenliggende
commit
043cdae0b7
3 gewijzigde bestanden met toevoegingen van 15 en 4 verwijderingen
  1. 2 1
      client.go
  2. 2 1
      consumer.go
  3. 11 2
      producer.go

+ 2 - 1
client.go

@@ -78,7 +78,7 @@ func NewClient(id string, host string, port int32, config *ClientConfig) (client
 // Close shuts down all broker connections managed by this client. It is required to call this function before
 // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
 // using a client before you close the client.
-func (client *Client) Close() {
+func (client *Client) Close() error {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -87,6 +87,7 @@ func (client *Client) Close() {
 	}
 	client.brokers = nil
 	client.leaders = nil
+	return nil
 }
 
 // functions for use by producers and consumers

+ 2 - 1
consumer.go

@@ -99,9 +99,10 @@ func (c *Consumer) Messages() <-chan *MessageBlock {
 // Close stops the consumer from fetching messages. It is required to call this function before
 // a consumer object passes out of scope, as it will otherwise leak memory. You must call this before
 // calling Close on the underlying client.
-func (c *Consumer) Close() {
+func (c *Consumer) Close() error {
 	close(c.stopper)
 	<-c.done
+	return nil
 }
 
 // helper function for safely sending an error on the errors channel

+ 11 - 2
producer.go

@@ -8,8 +8,8 @@ type ProducerConfig struct {
 }
 
 // Producer publishes Kafka messages on a given topic. It routes messages to the correct broker, refreshing metadata as appropriate,
-// and parses responses for errors. A Producer itself does not need to be closed (thus no Close method) but you still need to close
-// its underlying Client.
+// and parses responses for errors. You must call Close() on a producer to avoid leaks, it may not be garbage-collected automatically when
+// it passes out of scope (this is in addition to calling Close on the underlying client, which is still necessary).
 type Producer struct {
 	client *Client
 	topic  string
@@ -42,6 +42,15 @@ func NewProducer(client *Client, topic string, config *ProducerConfig) (*Produce
 	return p, nil
 }
 
+// Close shuts down the producer and flushes any messages it may have buffered. You must call this function before
+// a producer object passes out of scope, as it may otherwise leak memory. You must call this before calling Close
+// on the underlying client.
+func (p *Producer) Close() error {
+	// no-op for now, adding for consistency and so the API doesn't change when we add buffering
+	// (which will require a goroutine, which will require a close method in order to flush the buffer).
+	return nil
+}
+
 // SendMessage sends a message with the given key and value. The partition to send to is selected by the Producer's Partitioner.
 // To send strings as either key or value, see the StringEncoder type.
 func (p *Producer) SendMessage(key, value Encoder) error {