Ver Fonte

Merge pull request #208 from Shopify/expose-all-metadata

Expose all metadata
Willem van Bergen há 11 anos atrás
pai
commit
56eea89b19
6 ficheiros alterados com 117 adições e 30 exclusões
  1. 65 15
      client.go
  2. 24 2
      client_test.go
  3. 4 4
      consumer_test.go
  4. 3 1
      metadata_response.go
  5. 7 7
      producer_test.go
  6. 14 1
      utils.go

+ 65 - 15
client.go

@@ -30,9 +30,9 @@ type Client struct {
 	seedBroker      *Broker
 	deadBrokerAddrs map[string]struct{}
 
-	brokers map[int32]*Broker          // maps broker ids to brokers
-	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
-	lock    sync.RWMutex               // protects access to the maps, only one since they're always written together
+	brokers  map[int32]*Broker                       // maps broker ids to brokers
+	metadata map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
+	lock     sync.RWMutex                            // protects access to the maps, only one since they're always written together
 }
 
 // NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
@@ -61,7 +61,7 @@ func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error)
 		seedBroker:      NewBroker(addrs[0]),
 		deadBrokerAddrs: make(map[string]struct{}),
 		brokers:         make(map[int32]*Broker),
-		leaders:         make(map[string]map[int32]int32),
+		metadata:        make(map[string]map[int32]*PartitionMetadata),
 	}
 	_ = client.seedBroker.Open(config.DefaultBrokerConf)
 
@@ -104,7 +104,7 @@ func (client *Client) Close() error {
 		safeAsyncClose(broker)
 	}
 	client.brokers = nil
-	client.leaders = nil
+	client.metadata = nil
 
 	if client.seedBroker != nil {
 		safeAsyncClose(client.seedBroker)
@@ -155,14 +155,52 @@ func (client *Client) Topics() ([]string, error) {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
-	ret := make([]string, 0, len(client.leaders))
-	for topic := range client.leaders {
+	ret := make([]string, 0, len(client.metadata))
+	for topic := range client.metadata {
 		ret = append(ret, topic)
 	}
 
 	return ret, nil
 }
 
+func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
+	metadata := client.cachedMetadata(topic, partitionID)
+
+	if metadata == nil {
+		err := client.RefreshTopicMetadata(topic)
+		if err != nil {
+			return nil, err
+		}
+		metadata = client.cachedMetadata(topic, partitionID)
+	}
+
+	if metadata == nil {
+		return nil, UnknownTopicOrPartition
+	}
+
+	return metadata, nil
+}
+
+func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
+	metadata, err := client.getMetadata(topic, partitionID)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return dupeAndSort(metadata.Replicas), nil
+}
+
+func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
+	metadata, err := client.getMetadata(topic, partitionID)
+
+	if err != nil {
+		return nil, err
+	}
+
+	return dupeAndSort(metadata.Isr), nil
+}
+
 // Leader returns the broker object that is the leader of the current topic/partition, as
 // determined by querying the cluster metadata.
 func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
@@ -352,22 +390,34 @@ func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
-	partitions := client.leaders[topic]
+	partitions := client.metadata[topic]
 	if partitions != nil {
-		leader, ok := partitions[partitionID]
+		metadata, ok := partitions[partitionID]
 		if ok {
-			return client.brokers[leader]
+			return client.brokers[metadata.Leader]
 		}
 	}
 
 	return nil
 }
 
+func (client *Client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
+	client.lock.RLock()
+	defer client.lock.RUnlock()
+
+	partitions := client.metadata[topic]
+	if partitions != nil {
+		return partitions[partitionID]
+	}
+
+	return nil
+}
+
 func (client *Client) cachedPartitions(topic string) []int32 {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
-	partitions := client.leaders[topic]
+	partitions := client.metadata[topic]
 	if partitions == nil {
 		return nil
 	}
@@ -436,22 +486,22 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
 		default:
 			return nil, topic.Err
 		}
-		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
+		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
 			case LeaderNotAvailable:
 				toRetry[topic.Name] = true
-				delete(client.leaders[topic.Name], partition.ID)
+				delete(client.metadata[topic.Name], partition.ID)
 			case NoError:
 				broker := client.brokers[partition.Leader]
 				if _, present := client.deadBrokerAddrs[broker.Addr()]; present {
 					if connected, _ := broker.Connected(); !connected {
 						toRetry[topic.Name] = true
-						delete(client.leaders[topic.Name], partition.ID)
+						delete(client.metadata[topic.Name], partition.ID)
 						continue
 					}
 				}
-				client.leaders[topic.Name][partition.ID] = partition.Leader
+				client.metadata[topic.Name][partition.ID] = partition
 			default:
 				return nil, partition.Err
 			}

+ 24 - 2
client_test.go

@@ -55,10 +55,12 @@ func TestClientMetadata(t *testing.T) {
 
 	mb1 := NewMockBroker(t, 1)
 	mb5 := NewMockBroker(t, 5)
+	replicas := []int32{3, 1, 5}
+	isr := []int32{5, 1}
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb5.Addr(), mb5.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID())
+	mdr.AddTopicPartition("my_topic", 0, mb5.BrokerID(), replicas, isr)
 	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -89,6 +91,26 @@ func TestClientMetadata(t *testing.T) {
 	} else if tst.ID() != 5 {
 		t.Error("Leader for my_topic had incorrect ID.")
 	}
+
+	replicas, err = client.Replicas("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if replicas[0] != 1 {
+		t.Error("Incorrect (or unsorted) replica")
+	} else if replicas[1] != 3 {
+		t.Error("Incorrect (or unsorted) replica")
+	} else if replicas[2] != 5 {
+		t.Error("Incorrect (or unsorted) replica")
+	}
+
+	isr, err = client.ReplicasInSync("my_topic", 0)
+	if err != nil {
+		t.Error(err)
+	} else if isr[0] != 1 {
+		t.Error("Incorrect (or unsorted) isr")
+	} else if isr[1] != 5 {
+		t.Error("Incorrect (or unsorted) isr")
+	}
 }
 
 func TestClientRefreshBehaviour(t *testing.T) {
@@ -100,7 +122,7 @@ func TestClientRefreshBehaviour(t *testing.T) {
 	mb1.Returns(mdr)
 
 	mdr2 := new(MetadataResponse)
-	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID())
+	mdr2.AddTopicPartition("my_topic", 0xb, mb5.BrokerID(), nil, nil)
 	mb5.Returns(mdr2)
 
 	client, err := NewClient("clientID", []string{mb1.Addr()}, nil)

+ 4 - 4
consumer_test.go

@@ -19,7 +19,7 @@ func TestSimpleConsumer(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	for i := 0; i < 10; i++ {
@@ -62,7 +62,7 @@ func TestConsumerRawOffset(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	client, err := NewClient("client_id", []string{mb1.Addr()}, nil)
@@ -95,7 +95,7 @@ func TestConsumerLatestOffset(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	or := new(OffsetResponse)
@@ -130,7 +130,7 @@ func TestConsumerPrelude(t *testing.T) {
 
 	mdr := new(MetadataResponse)
 	mdr.AddBroker(mb2.Addr(), mb2.BrokerID())
-	mdr.AddTopicPartition("my_topic", 0, 2)
+	mdr.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	mb1.Returns(mdr)
 
 	fr := new(FetchResponse)

+ 3 - 1
metadata_response.go

@@ -182,7 +182,7 @@ func (m *MetadataResponse) AddBroker(addr string, id int32) {
 	m.Brokers = append(m.Brokers, &Broker{id: id, addr: addr})
 }
 
-func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32) {
+func (m *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32) {
 	var match *TopicMetadata
 
 	for _, tm := range m.Topics {
@@ -214,5 +214,7 @@ foundTopic:
 foundPartition:
 
 	pmatch.Leader = brokerID
+	pmatch.Replicas = replicas
+	pmatch.Isr = isr
 
 }

+ 7 - 7
producer_test.go

@@ -22,7 +22,7 @@ func TestSimpleProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, 2)
+	response1.AddTopicPartition("my_topic", 0, 2, nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -59,7 +59,7 @@ func TestProducer(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -101,7 +101,7 @@ func TestProducerMultipleFlushes(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -150,8 +150,8 @@ func TestProducerMultipleBrokers(t *testing.T) {
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
 	response1.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
+	response1.AddTopicPartition("my_topic", 1, broker3.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	response2 := new(ProduceResponse)
@@ -197,7 +197,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	response1 := new(MetadataResponse)
 	response1.AddBroker(broker2.Addr(), broker2.BrokerID())
-	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID())
+	response1.AddTopicPartition("my_topic", 0, broker2.BrokerID(), nil, nil)
 	broker1.Returns(response1)
 
 	client, err := NewClient("client_id", []string{broker1.Addr()}, nil)
@@ -223,7 +223,7 @@ func TestProducerFailureRetry(t *testing.T) {
 
 	response3 := new(MetadataResponse)
 	response3.AddBroker(broker3.Addr(), broker3.BrokerID())
-	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID())
+	response3.AddTopicPartition("my_topic", 0, broker3.BrokerID(), nil, nil)
 	broker2.Returns(response3)
 
 	response4 := new(ProduceResponse)

+ 14 - 1
utils.go

@@ -1,6 +1,9 @@
 package sarama
 
-import "io"
+import (
+	"io"
+	"sort"
+)
 
 // make []int32 sortable so we can sort partition numbers
 type int32Slice []int32
@@ -17,6 +20,16 @@ func (slice int32Slice) Swap(i, j int) {
 	slice[i], slice[j] = slice[j], slice[i]
 }
 
+func dupeAndSort(input []int32) []int32 {
+	ret := make([]int32, 0, len(input))
+	for _, val := range input {
+		ret = append(ret, val)
+	}
+
+	sort.Sort(int32Slice(ret))
+	return ret
+}
+
 func withRecover(fn func()) {
 	defer func() {
 		handler := PanicHandler