Browse Source

Implement Client.WritablePartitions

It returns the list of partition IDs (like normal Client.Partitions) except
limited to those partitions which have leaders that are available and accepting
writes.

Bonus fixes:
- permit setting the error field in MetadataResponse.AddTopicPartition
- add "closed" check to Replicas and ReplicasInSync
- permit MetadataRetries == 0, which is at least logically coherent (and
  behaves sanely), even if it's not usually useful
Evan Huus 11 years ago
parent
commit
d33e005ce0
5 changed files with 76 additions and 29 deletions
  1. 45 9
      client.go
  2. 17 7
      client_test.go
  3. 4 4
      consumer_test.go
  4. 2 1
      metadata_response.go
  5. 8 8
      producer_test.go

+ 45 - 9
client.go

@@ -115,14 +115,39 @@ func (client *Client) Close() error {
 	return nil
 	return nil
 }
 }
 
 
-// Partitions returns the sorted list of available partition IDs for the given topic.
+// Partitions returns the sorted list of all partition IDs for the given topic.
 func (client *Client) Partitions(topic string) ([]int32, error) {
 func (client *Client) Partitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	// Check to see whether the client is closed
 	if client.Closed() {
 	if client.Closed() {
 		return nil, ClosedClient
 		return nil, ClosedClient
 	}
 	}
 
 
-	partitions := client.cachedPartitions(topic)
+	partitions := client.cachedPartitions(topic, false)
+
+	if len(partitions) == 0 {
+		err := client.RefreshTopicMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		partitions = client.cachedPartitions(topic, false)
+	}
+
+	if partitions == nil {
+		return nil, UnknownTopicOrPartition
+	}
+
+	return partitions, nil
+}
+
+// WritablePartitions returns the sorted list of all writable partition IDs for the given topic,
+// where "writable" means "having a valid leader accepting writes".
+func (client *Client) WritablePartitions(topic string) ([]int32, error) {
+	// Check to see whether the client is closed
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
+	partitions := client.cachedPartitions(topic, true)
 
 
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// len==0 catches when it's nil (no such topic) and the odd case when every single
 	// partition is undergoing leader election simultaneously. Callers have to be able to handle
 	// partition is undergoing leader election simultaneously. Callers have to be able to handle
@@ -135,7 +160,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 		if err != nil {
 		if err != nil {
 			return nil, err
 			return nil, err
 		}
 		}
-		partitions = client.cachedPartitions(topic)
+		partitions = client.cachedPartitions(topic, true)
 	}
 	}
 
 
 	if partitions == nil {
 	if partitions == nil {
@@ -182,6 +207,10 @@ func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMe
 }
 }
 
 
 func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
 func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	metadata, err := client.getMetadata(topic, partitionID)
 	metadata, err := client.getMetadata(topic, partitionID)
 
 
 	if err != nil {
 	if err != nil {
@@ -195,6 +224,10 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 }
 }
 
 
 func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
 func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ClosedClient
+	}
+
 	metadata, err := client.getMetadata(topic, partitionID)
 	metadata, err := client.getMetadata(topic, partitionID)
 
 
 	if err != nil {
 	if err != nil {
@@ -418,7 +451,7 @@ func (client *Client) cachedMetadata(topic string, partitionID int32) *Partition
 	return nil
 	return nil
 }
 }
 
 
-func (client *Client) cachedPartitions(topic string) []int32 {
+func (client *Client) cachedPartitions(topic string, onlyWritable bool) []int32 {
 	client.lock.RLock()
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 	defer client.lock.RUnlock()
 
 
@@ -428,8 +461,11 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 	}
 	}
 
 
 	ret := make([]int32, 0, len(partitions))
 	ret := make([]int32, 0, len(partitions))
-	for id := range partitions {
-		ret = append(ret, id)
+	for _, partition := range partitions {
+		if onlyWritable && partition.Err == LeaderNotAvailable {
+			continue
+		}
+		ret = append(ret, partition.ID)
 	}
 	}
 
 
 	sort.Sort(int32Slice(ret))
 	sort.Sort(int32Slice(ret))
@@ -529,12 +565,12 @@ func NewClientConfig() *ClientConfig {
 // Validate checks a ClientConfig instance. This will return a
 // Validate checks a ClientConfig instance. This will return a
 // ConfigurationError if the specified values don't make sense.
 // ConfigurationError if the specified values don't make sense.
 func (config *ClientConfig) Validate() error {
 func (config *ClientConfig) Validate() error {
-	if config.MetadataRetries <= 0 {
-		return ConfigurationError("Invalid MetadataRetries. Try 10")
+	if config.MetadataRetries < 0 {
+		return ConfigurationError("Invalid MetadataRetries")
 	}
 	}
 
 
 	if config.WaitForElection <= time.Duration(0) {
 	if config.WaitForElection <= time.Duration(0) {
-		return ConfigurationError("Invalid WaitForElection. Try 250*time.Millisecond")
+		return ConfigurationError("Invalid WaitForElection")
 	}
 	}
 
 
 	if config.DefaultBrokerConf != nil {
 	if config.DefaultBrokerConf != nil {

+ 17 - 7
client_test.go

@@ -60,10 +60,13 @@ func TestClientMetadata(t *testing.T) {
 
 
 	mdr := new(MetadataResponse)
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
 	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr)
+	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr, NoError)
+	mdr.AddTopicPartition("my_topic", 1, mb5.BrokerID(), replicas, isr, LeaderNotAvailable)
 	mb1.Returns(mdr)
 	mb1.Returns(mdr)
 
 
-	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
+	config := NewClientConfig()
+	config.MetadataRetries = 0
+	client, err := NewClient("client_id", []string{mb1.Addr()}, config)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -81,10 +84,17 @@ func TestClientMetadata(t *testing.T) {
 	parts, err := client.Partitions("my_topic")
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
 	if err != nil {
 		t.Error(err)
 		t.Error(err)
-	} else if len(parts) != 1 || parts[0] != 0 {
+	} else if len(parts) != 2 || parts[0] != 0 || parts[1] != 1 {
 		t.Error("Client returned incorrect partitions for my_topic:", parts)
 		t.Error("Client returned incorrect partitions for my_topic:", parts)
 	}
 	}
 
 
+	parts, err = client.WritablePartitions("my_topic")
+	if err != nil {
+		t.Error(err)
+	} else if len(parts) != 1 || parts[0] != 0 {
+		t.Error("Client returned incorrect writable partitions for my_topic:", parts)
+	}
+
 	tst, err := client.Leader("my_topic", 0)
 	tst, err := client.Leader("my_topic", 0)
 	if err != nil {
 	if err != nil {
 		t.Error(err)
 		t.Error(err)
@@ -122,16 +132,13 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mb1.Returns(mdr)
 	mb1.Returns(mdr)
 
 
 	mdr2 := new(MetadataResponse)
 	mdr2 := new(MetadataResponse)
-	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil)
+	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil, NoError)
 	mb5.Returns(mdr2)
 	mb5.Returns(mdr2)
 
 
 	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
 	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)
 	if err != nil {
 	if err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	defer safeClose(t, client)
-	defer mb1.Close()
-	defer mb5.Close()
 
 
 	parts, err := client.Partitions("my_topic")
 	parts, err := client.Partitions("my_topic")
 	if err != nil {
 	if err != nil {
@@ -148,4 +155,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	}
 	}
 
 
 	client.disconnectBroker(tst)
 	client.disconnectBroker(tst)
+	mb5.Close()
+	mb1.Close()
+	safeClose(t, client)
 }
 }

+ 4 - 4
consumer_test.go

@@ -19,7 +19,7 @@ func TestSimpleConsumer(t *testing.T) {
 
 
 	mdr := new(MetadataResponse)
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 	mb1.Returns(mdr)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
@@ -62,7 +62,7 @@ func TestConsumerRawOffset(t *testing.T) {
 
 
 	mdr := new(MetadataResponse)
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 	mb1.Returns(mdr)
 
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -95,7 +95,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 
 	mdr := new(MetadataResponse)
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 	mb1.Returns(mdr)
 
 
 	or := new(OffsetResponse)
 	or := new(OffsetResponse)
@@ -130,7 +130,7 @@ func TestConsumerPrelude(t *testing.T) {
 
 
 	mdr := new(MetadataResponse)
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	mb1.Returns(mdr)
 	mb1.Returns(mdr)
 
 
 	fr := new(FetchResponse)
 	fr := new(FetchResponse)

+ 2 - 1
metadata_response.go

@@ -182,7 +182,7 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 }
 }
 
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32) {
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, err KError) {
 	var match *TopicMetadata
 	var match *TopicMetadata
 
 
 	for _, tm := range m.Topics {
 	for _, tm := range m.Topics {
@@ -216,5 +216,6 @@ foundPartition:
 	pmatch.Leader = brokerID
 	pmatch.Leader = brokerID
 	pmatch.Replicas = replicas
 	pmatch.Replicas = replicas
 	pmatch.Isr = isr
 	pmatch.Isr = isr
+	pmatch.Err = err
 
 
 }
 }

+ 8 - 8
producer_test.go

@@ -21,7 +21,7 @@ func TestSimpleProducer(t *testing.T) {
 
 
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	broker1.Returns(response1)
 	broker1.Returns(response1)
 
 
 	response2 := new(ProduceResponse)
 	response2 := new(ProduceResponse)
@@ -59,7 +59,7 @@ func TestConcurrentSimpleProducer(t *testing.T) {
 
 
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil, NoError)
 	broker1.Returns(response1)
 	broker1.Returns(response1)
 
 
 	response2 := new(ProduceResponse)
 	response2 := new(ProduceResponse)
@@ -105,7 +105,7 @@ func TestProducer(t *testing.T) {
 
 
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 	broker1.Returns(response1)
 
 
 	response2 := new(ProduceResponse)
 	response2 := new(ProduceResponse)
@@ -153,7 +153,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 	broker1.Returns(response1)
 
 
 	response2 := new(ProduceResponse)
 	response2 := new(ProduceResponse)
@@ -208,8 +208,8 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
 	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
-	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
+	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 	broker1.Returns(response1)
 
 
 	response2 := new(ProduceResponse)
 	response2 := new(ProduceResponse)
@@ -261,7 +261,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 
 	response1 := new(MetadataResponse)
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil, NoError)
 	broker1.Returns(response1)
 	broker1.Returns(response1)
 
 
 	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
 	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
@@ -287,7 +287,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 
 	response3 := new(MetadataResponse)
 	response3 := new(MetadataResponse)
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil)
+	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil, NoError)
 	broker2.Returns(response3)
 	broker2.Returns(response3)
 
 
 	response4 := new(ProduceResponse)
 	response4 := new(ProduceResponse)