Browse Source

Address Evan's comments on the previous version. Add checks for Closed() to a few other locations including consumer.go.

Rob Rodgers 12 years ago
parent
commit
15782aa08d
4 changed files with 38 additions and 13 deletions
  1. 24 11
      client.go
  2. 6 0
      consumer.go
  3. 2 2
      errors.go
  4. 6 0
      producer.go

+ 24 - 11
client.go

@@ -21,9 +21,6 @@ type Client struct {
 	id     string
 	config ClientConfig
 
-	// True if this client was closed, for error tracking
-	closed bool
-
 	// the broker addresses given to us through the constructor are not guaranteed to be returned in
 	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
 	// so we store them separately
@@ -80,6 +77,14 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
 // using a client before you close the client.
 func (client *Client) Close() error {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		// Chances are this is being called from a defer() and the error will go unobserved
+		// so we go ahead and log the event in this case.
+		Logger.Printf("Close() called on already closed client")
+		return ClosedClient
+	}
+
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	Logger.Println("Closing Client")
@@ -90,7 +95,6 @@ func (client *Client) Close() error {
 	}
 	client.brokers = nil
 	client.leaders = nil
-	client.closed = true
 
 	if client.extraBroker != nil {
 		go withRecover(func() { client.extraBroker.Close() })
@@ -101,6 +105,11 @@ func (client *Client) Close() error {
 
 // Partitions returns the sorted list of available partition IDs for the given topic.
 func (client *Client) Partitions(topic string) ([]int32, error) {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	partitions := client.cachedPartitions(topic)
 
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
@@ -126,6 +135,11 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 
 // Topics returns the set of available topics as retrieved from the cluster metadata.
 func (client *Client) Topics() ([]string, error) {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -230,20 +244,19 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	go withRecover(func() { myBroker.Close() })
 }
 
-func (client *Client) CheckUsable() error {
-	if client.closed {
-		return ClosedClient
+func (client *Client) Closed() bool {
+	if client.brokers == nil {
+		return true
 	}
-	return nil
+	return false
 }
 
-
 func (client *Client) refreshMetadata(topics []string, retries int) error {
 	// This function is a sort of central point for most functions that create new
 	// resources.  Check to see if we're dealing with a closed Client and error
 	// out immediately if so.
-	if err := client.CheckUsable(); err != nil {
-		return err
+	if client.Closed() {
+		return ClosedClient
 	}
 
 	// Kafka will throw exceptions on an empty topic and not return a proper

+ 6 - 0
consumer.go

@@ -75,6 +75,12 @@ type Consumer struct {
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
 // part of the named consumer group.
 func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
+	// Check that we are not dealing with a closed Client before processing
+	// any other arguments
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	if config == nil {
 		config = NewConsumerConfig()
 	}

+ 2 - 2
errors.go

@@ -9,8 +9,8 @@ import (
 // or otherwise failed to respond.
 var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 
-// ClosedClient is the error returned when the client was previously closed but used subsequently.
-var ClosedClient = errors.New("kafka: Use of a client that was previously closed")
+// ClosedClient is the error returned when a method is called on a client that has been closed.
+var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")
 
 // NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
 var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")

+ 6 - 0
producer.go

@@ -66,6 +66,12 @@ type topicPartition struct {
 
 // NewProducer creates a new Producer using the given client.
 func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
+	// Check that we are not dealing with a closed Client before processing
+	// any other arguments
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	if config == nil {
 		config = NewProducerConfig()
 	}