Browse Source

Expose some client methods.

The producer and consumer still make use of some private methods, but those are
harder to expose consistently. This is at least a good start towards #23.
Evan Huus 11 years ago
parent
commit
39a2698e08
4 changed files with 62 additions and 35 deletions
  1. 49 29
      client.go
  2. 9 2
      client_test.go
  3. 2 2
      consumer.go
  4. 2 2
      producer.go

+ 49 - 29
client.go

@@ -58,7 +58,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (client *Client,
 	client.leaders = make(map[string]map[int32]int32)
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
-	err = client.refreshTopics(make([]string, 0), client.config.MetadataRetries)
+	err = client.RefreshAllMetadata()
 	if err != nil {
 		client.Close() // this closes tmp, since it's still in the brokers hash
 		return nil, err
@@ -87,43 +87,70 @@ func (client *Client) Close() error {
 	return nil
 }
 
-// functions for use by producers and consumers
-// if Go had the concept they would be marked 'protected'
-
-func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
-	leader := client.cachedLeader(topic, partition_id)
+// Partitions returns the sorted list of available partition IDs for the given topic.
+func (client *Client) Partitions(topic string) ([]int32, error) {
+	partitions := client.cachedPartitions(topic)
 
-	if leader == nil {
-		err := client.refreshTopic(topic)
+	if partitions == nil {
+		err := client.RefreshTopicMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
-		leader = client.cachedLeader(topic, partition_id)
+		partitions = client.cachedPartitions(topic)
 	}
 
-	if leader == nil {
-		return nil, UNKNOWN_TOPIC_OR_PARTITION
+	if partitions == nil {
+		return nil, NoSuchTopic
 	}
 
-	return leader, nil
+	return partitions, nil
 }
 
-func (client *Client) partitions(topic string) ([]int32, error) {
-	partitions := client.cachedPartitions(topic)
+// Topics returns the set of available topics as retrieved from the cluster metadata.
+func (client *Client) Topics() ([]string, error) {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
 
-	if partitions == nil {
-		err := client.refreshTopic(topic)
+	ret := make([]string, 0, len(client.leaders))
+	for topic, _ := range client.leaders {
+		ret = append(ret, topic)
+	}
+
+	return ret, nil
+}
+
+// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
+// available metadata for those topics.
+func (client *Client) RefreshTopicMetadata(topics ...string) error {
+	return client.refreshMetadata(topics, client.config.MetadataRetries)
+}
+
+// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
+func (client *Client) RefreshAllMetadata() error {
+	// Kafka refreshes all when you encode it an empty array...
+	return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
+}
+
+// functions for use by producers and consumers
+// if Go had the concept they would be marked 'protected'
+// TODO: see https://github.com/Shopify/sarama/issues/23
+
+func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
+	leader := client.cachedLeader(topic, partition_id)
+
+	if leader == nil {
+		err := client.RefreshTopicMetadata(topic)
 		if err != nil {
 			return nil, err
 		}
-		partitions = client.cachedPartitions(topic)
+		leader = client.cachedLeader(topic, partition_id)
 	}
 
-	if partitions == nil {
-		return nil, NoSuchTopic
+	if leader == nil {
+		return nil, UNKNOWN_TOPIC_OR_PARTITION
 	}
 
-	return partitions, nil
+	return leader, nil
 }
 
 func (client *Client) disconnectBroker(broker *Broker) {
@@ -147,16 +174,9 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	go broker.Close()
 }
 
-func (client *Client) refreshTopic(topic string) error {
-	tmp := make([]string, 1)
-	tmp[0] = topic
-	// we permit three retries by default, 'cause that seemed like a nice number
-	return client.refreshTopics(tmp, client.config.MetadataRetries)
-}
-
 // truly private helper functions
 
-func (client *Client) refreshTopics(topics []string, retries int) error {
+func (client *Client) refreshMetadata(topics []string, retries int) error {
 	for broker := client.any(); broker != nil; broker = client.any() {
 		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
 
@@ -174,7 +194,7 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
 					return LEADER_NOT_AVAILABLE
 				}
 				time.Sleep(client.config.WaitForElection) // wait for leader election
-				return client.refreshTopics(retry, retries-1)
+				return client.refreshMetadata(retry, retries-1)
 			}
 		case EncodingError:
 			// didn't even send, return the error

+ 9 - 2
client_test.go

@@ -76,7 +76,14 @@ func TestClientMetadata(t *testing.T) {
 	}
 	defer client.Close()
 
-	parts, err := client.partitions("my_topic")
+	topics, err := client.Topics()
+	if err != nil {
+		t.Error(err)
+	} else if len(topics) != 1 || topics[0] != "my_topic" {
+		t.Error("Client returned incorrect topics:", topics)
+	}
+
+	parts, err := client.Partitions("my_topic")
 	if err != nil {
 		t.Error(err)
 	} else if len(parts) != 1 || parts[0] != 0 {
@@ -137,7 +144,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	}
 	defer client.Close()
 
-	parts, err := client.partitions("my_topic")
+	parts, err := client.Partitions("my_topic")
 	if err != nil {
 		t.Error(err)
 	} else if len(parts) != 1 || parts[0] != 0xb {

+ 2 - 2
consumer.go

@@ -215,7 +215,7 @@ func (c *Consumer) fetchMessages() {
 		case NO_ERROR:
 			break
 		case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
-			err = c.client.refreshTopic(c.topic)
+			err = c.client.RefreshTopicMetadata(c.topic)
 			if c.sendError(err) {
 				for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
 					if !c.sendError(err) {
@@ -317,7 +317,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 		if !retry {
 			return -1, block.Err
 		}
-		err = c.client.refreshTopic(c.topic)
+		err = c.client.RefreshTopicMetadata(c.topic)
 		if err != nil {
 			return -1, err
 		}

+ 2 - 2
producer.go

@@ -59,7 +59,7 @@ func (p *Producer) SendMessage(key, value Encoder) error {
 }
 
 func (p *Producer) choosePartition(key Encoder) (int32, error) {
-	partitions, err := p.client.partitions(p.topic)
+	partitions, err := p.client.Partitions(p.topic)
 	if err != nil {
 		return -1, err
 	}
@@ -133,7 +133,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 		if !retry {
 			return block.Err
 		}
-		err = p.client.refreshTopic(p.topic)
+		err = p.client.RefreshTopicMetadata(p.topic)
 		if err != nil {
 			return err
 		}