Browse Source

Make Client.Leader() public.

Also add a more comprehensive comment to disconnectBroker().

This is the last piece of #23 that doesn't heavily depend on getting #15 right
first.
Evan Huus 11 years ago
parent
commit
d1bbdae1d5
4 changed files with 29 additions and 27 deletions
  1. client.go (+21, −19)
  2. client_test.go (+2, −2)
  3. consumer.go (+5, −5)
  4. producer.go (+1, −1)

+ 21 - 19
client.go

@@ -119,23 +119,9 @@ func (client *Client) Topics() ([]string, error) {
 	return ret, nil
 }
 
-// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
-// available metadata for those topics.
-func (client *Client) RefreshTopicMetadata(topics ...string) error {
-	return client.refreshMetadata(topics, client.config.MetadataRetries)
-}
-
-// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
-func (client *Client) RefreshAllMetadata() error {
-	// Kafka refreshes all when you encode it an empty array...
-	return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
-}
-
-// functions for use by producers and consumers
-// if Go had the concept they would be marked 'protected'
-// TODO: see https://github.com/Shopify/sarama/issues/23
-
-func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
+// Leader returns the broker object that is the leader of the current topic/partition, as
+// determined by querying the cluster metadata.
+func (client *Client) Leader(topic string, partition_id int32) (*Broker, error) {
 	leader := client.cachedLeader(topic, partition_id)
 
 	if leader == nil {
@@ -153,6 +139,24 @@ func (client *Client) leader(topic string, partition_id int32) (*Broker, error)
 	return leader, nil
 }
 
+// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
+// available metadata for those topics.
+func (client *Client) RefreshTopicMetadata(topics ...string) error {
+	return client.refreshMetadata(topics, client.config.MetadataRetries)
+}
+
+// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
+func (client *Client) RefreshAllMetadata() error {
+	// Kafka refreshes all when you encode it an empty array...
+	return client.refreshMetadata(make([]string, 0), client.config.MetadataRetries)
+}
+
+// misc private helper functions
+
+// XXX: see https://github.com/Shopify/sarama/issues/15
+//      and https://github.com/Shopify/sarama/issues/23
+// disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with
+// something sane and the replacement should be made part of the public Client API
 func (client *Client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
@@ -174,8 +178,6 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	go broker.Close()
 }
 
-// truly private helper functions
-
 func (client *Client) refreshMetadata(topics []string, retries int) error {
 	for broker := client.any(); broker != nil; broker = client.any() {
 		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})

+ 2 - 2
client_test.go

@@ -90,7 +90,7 @@ func TestClientMetadata(t *testing.T) {
 		t.Error("Client returned incorrect partitions for my_topic:", parts)
 	}
 
-	tst, err := client.leader("my_topic", 0)
+	tst, err := client.Leader("my_topic", 0)
 	if err != nil {
 		t.Error(err)
 	} else if tst.ID() != 5 {
@@ -151,7 +151,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 		t.Error("Client returned incorrect partitions for my_topic:", parts)
 	}
 
-	tst, err := client.leader("my_topic", 0xb)
+	tst, err := client.Leader("my_topic", 0xb)
 	if err != nil {
 		t.Error(err)
 	} else if tst.ID() != 0xaa {

+ 5 - 5
consumer.go

@@ -96,7 +96,7 @@ func NewConsumer(client *Client, topic string, partition int32, group string, co
 		return nil, ConfigurationError("Invalid EventBufferSize")
 	}
 
-	broker, err := client.leader(topic, partition)
+	broker, err := client.Leader(topic, partition)
 	if err != nil {
 		return nil, err
 	}
@@ -194,7 +194,7 @@ func (c *Consumer) fetchMessages() {
 			}
 		default:
 			c.client.disconnectBroker(c.broker)
-			for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
+			for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
 				if !c.sendError(err) {
 					return
 				}
@@ -217,7 +217,7 @@ func (c *Consumer) fetchMessages() {
 		case UNKNOWN_TOPIC_OR_PARTITION, NOT_LEADER_FOR_PARTITION, LEADER_NOT_AVAILABLE:
 			err = c.client.RefreshTopicMetadata(c.topic)
 			if c.sendError(err) {
-				for c.broker = nil; err != nil; c.broker, err = c.client.leader(c.topic, c.partition) {
+				for c.broker = nil; err != nil; c.broker, err = c.client.Leader(c.topic, c.partition) {
 					if !c.sendError(err) {
 						return
 					}
@@ -295,7 +295,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 			return -1, err
 		}
 		c.client.disconnectBroker(c.broker)
-		c.broker, err = c.client.leader(c.topic, c.partition)
+		c.broker, err = c.client.Leader(c.topic, c.partition)
 		if err != nil {
 			return -1, err
 		}
@@ -321,7 +321,7 @@ func (c *Consumer) getOffset(where OffsetTime, retry bool) (int64, error) {
 		if err != nil {
 			return -1, err
 		}
-		c.broker, err = c.client.leader(c.topic, c.partition)
+		c.broker, err = c.client.Leader(c.topic, c.partition)
 		if err != nil {
 			return -1, err
 		}

+ 1 - 1
producer.go

@@ -95,7 +95,7 @@ func (p *Producer) safeSendMessage(key, value Encoder, retry bool) error {
 		return err
 	}
 
-	broker, err := p.client.leader(p.topic, partition)
+	broker, err := p.client.Leader(p.topic, partition)
 	if err != nil {
 		return err
 	}