Browse Source

Merge pull request #117 from Shopify/rsrsps

Rebased version of #54 (closed checks)
Willem van Bergen 11 years ago
parent
commit
7a259fb770
4 changed files with 44 additions and 0 deletions
  1. 29 0
      client.go
  2. 6 0
      consumer.go
  3. 3 0
      errors.go
  4. 6 0
      producer.go

+ 29 - 0
client.go

@@ -77,6 +77,14 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
 // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
 // using a client before you close the client.
 // using a client before you close the client.
 func (client *Client) Close() error {
 func (client *Client) Close() error {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		// Chances are this is being called from a defer() and the error will go unobserved
+		// so we go ahead and log the event in this case.
+		Logger.Printf("Close() called on already closed client")
+		return ClosedClient
+	}
+
 	client.lock.Lock()
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	defer client.lock.Unlock()
 	Logger.Println("Closing Client")
 	Logger.Println("Closing Client")
@@ -97,6 +105,11 @@ func (client *Client) Close() error {
 
 
 // Partitions returns the sorted list of available partition IDs for the given topic.
 // Partitions returns the sorted list of available partition IDs for the given topic.
 func (client *Client) Partitions(topic string) ([]int32, error) {
 func (client *Client) Partitions(topic string) ([]int32, error) {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	partitions := client.cachedPartitions(topic)
 	partitions := client.cachedPartitions(topic)
 
 
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
@@ -122,6 +135,11 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 
 
 // Topics returns the set of available topics as retrieved from the cluster metadata.
 // Topics returns the set of available topics as retrieved from the cluster metadata.
 func (client *Client) Topics() ([]string, error) {
 func (client *Client) Topics() ([]string, error) {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	client.lock.RLock()
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 	defer client.lock.RUnlock()
 
 
@@ -226,7 +244,18 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	go withRecover(func() { myBroker.Close() })
 	go withRecover(func() { myBroker.Close() })
 }
 }
 
 
+func (client *Client) Closed() bool {
+	return client.brokers == nil
+}
+
 func (client *Client) refreshMetadata(topics []string, retries int) error {
 func (client *Client) refreshMetadata(topics []string, retries int) error {
+	// This function is a sort of central point for most functions that create new
+	// resources.  Check to see if we're dealing with a closed Client and error
+	// out immediately if so.
+	if client.Closed() {
+		return ClosedClient
+	}
+
 	// Kafka will throw exceptions on an empty topic and not return a proper
 	// Kafka will throw exceptions on an empty topic and not return a proper
 	// error. This handles the case by returning an error instead of sending it
 	// error. This handles the case by returning an error instead of sending it
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
 	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310

+ 6 - 0
consumer.go

@@ -75,6 +75,12 @@ type Consumer struct {
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
 // NewConsumer creates a new consumer attached to the given client. It will read messages from the given topic and partition, as
 // part of the named consumer group.
 // part of the named consumer group.
 func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
 func NewConsumer(client *Client, topic string, partition int32, group string, config *ConsumerConfig) (*Consumer, error) {
+	// Check that we are not dealing with a closed Client before processing
+	// any other arguments
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	if config == nil {
 	if config == nil {
 		config = NewConsumerConfig()
 		config = NewConsumerConfig()
 	}
 	}

+ 3 - 0
errors.go

@@ -9,6 +9,9 @@ import (
 // or otherwise failed to respond.
 // or otherwise failed to respond.
 var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 var OutOfBrokers = errors.New("kafka: Client has run out of available brokers to talk to. Is your cluster reachable?")
 
 
+// ClosedClient is the error returned when a method is called on a client that has been closed.
+var ClosedClient = errors.New("kafka: Tried to use a client that was closed.")
+
 // NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
 // NoSuchTopic is the error returned when the supplied topic is rejected by the Kafka servers.
 var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
 var NoSuchTopic = errors.New("kafka: Topic not recognized by brokers.")
 
 

+ 6 - 0
producer.go

@@ -66,6 +66,12 @@ type topicPartition struct {
 
 
 // NewProducer creates a new Producer using the given client.
 // NewProducer creates a new Producer using the given client.
 func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
 func NewProducer(client *Client, config *ProducerConfig) (*Producer, error) {
+	// Check that we are not dealing with a closed Client before processing
+	// any other arguments
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	if config == nil {
 	if config == nil {
 		config = NewProducerConfig()
 		config = NewProducerConfig()
 	}
 	}