Browse Source

Improve metadata cache's updateTopics

Don't hold the lock while connecting to new Brokers. Don't fail if one broker
doesn't gives us metadata, try others.

Also add a getter Broker.ID()
Evan Huus 12 years ago
parent
commit
da4ea44315
3 changed files with 65 additions and 29 deletions
  1. 5 0
      broker.go
  2. 48 20
      metadata_cache.go
  3. 12 9
      metadata_response.go

+ 5 - 0
broker.go

@@ -76,6 +76,11 @@ func (b *Broker) Close() error {
 	return err
 }
 
+// Returns the broker ID from Kafka, or -1 if that is not known.
+func (b *Broker) ID() int32 {
+	return b.id
+}
+
 func (b *Broker) RequestMetadata(clientID *string, request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 

+ 48 - 20
metadata_cache.go

@@ -25,7 +25,7 @@ func newMetadataCache(client *Client, host string, port int32) (*metadataCache,
 	mc.brokers = make(map[int32]*Broker)
 	mc.leaders = make(map[string]map[int32]int32)
 
-	mc.brokers[starter.id] = starter
+	mc.brokers[starter.ID()] = starter
 
 	// do an initial fetch of all cluster metadata by specifing an empty list of topics
 	err = mc.refreshTopics(make([]*string, 0))
@@ -36,6 +36,18 @@ func newMetadataCache(client *Client, host string, port int32) (*metadataCache,
 	return mc, nil
 }
 
+func (mc *metadataCache) removeBroker(broker *Broker) {
+	if broker == nil {
+		return
+	}
+
+	mc.lock.RLock()
+	defer mc.lock.RUnlock()
+
+	delete(mc.brokers, broker.ID())
+	go broker.Close()
+}
+
 func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
 	mc.lock.RLock()
 	defer mc.lock.RUnlock()
@@ -82,37 +94,32 @@ func (mc *metadataCache) partitions(topic string) []int32 {
 	return ret
 }
 
-func (mc *metadataCache) refreshTopics(topics []*string) error {
-	broker := mc.any()
-	if broker == nil {
-		return OutOfBrokers{}
-	}
-
-	response, err := broker.RequestMetadata(mc.client.id, &MetadataRequest{topics})
-	if err != nil {
-		return err
+func (mc *metadataCache) update(data *MetadataResponse) error {
+	// connect to the brokers before taking the lock, as this can take a while
+	// to timeout if one of them isn't reachable
+	for _, broker := range data.Brokers {
+		err := broker.Connect()
+		if err != nil {
+			return err
+		}
 	}
 
 	mc.lock.Lock()
 	defer mc.lock.Unlock()
 
-	for i := range response.Brokers {
-		broker := &response.Brokers[i]
-		err = broker.Connect()
-		if err != nil {
-			return err
+	for _, broker := range data.Brokers {
+		if mc.brokers[broker.ID()] != nil {
+			go mc.brokers[broker.ID()].Close()
 		}
-		mc.brokers[broker.id] = broker
+		mc.brokers[broker.ID()] = broker
 	}
 
-	for i := range response.Topics {
-		topic := &response.Topics[i]
+	for _, topic := range data.Topics {
 		if topic.Err != NO_ERROR {
 			return topic.Err
 		}
 		mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
-		for j := range topic.Partitions {
-			partition := &topic.Partitions[j]
+		for _, partition := range topic.Partitions {
 			if partition.Err != NO_ERROR {
 				return partition.Err
 			}
@@ -123,6 +130,27 @@ func (mc *metadataCache) refreshTopics(topics []*string) error {
 	return nil
 }
 
+func (mc *metadataCache) refreshTopics(topics []*string) error {
+	for broker := mc.any(); broker != nil; broker = mc.any() {
+		response, err := broker.RequestMetadata(mc.client.id, &MetadataRequest{topics})
+
+		switch err.(type) {
+		case nil:
+			// valid response, use it
+			return mc.update(response)
+		case EncodingError:
+			// didn't even send, return the error
+			return err
+		}
+
+		// some other error, remove that broker and try again
+		mc.removeBroker(broker)
+
+	}
+
+	return OutOfBrokers{}
+}
+
 func (mc *metadataCache) refreshTopic(topic string) error {
 	tmp := make([]*string, 1)
 	tmp[0] = &topic

+ 12 - 9
metadata_response.go

@@ -40,7 +40,7 @@ func (pm *PartitionMetadata) decode(pd packetDecoder) (err error) {
 type TopicMetadata struct {
 	Err        KError
 	Name       *string
-	Partitions []PartitionMetadata
+	Partitions []*PartitionMetadata
 }
 
 func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
@@ -58,9 +58,10 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
 	if err != nil {
 		return err
 	}
-	tm.Partitions = make([]PartitionMetadata, n)
+	tm.Partitions = make([]*PartitionMetadata, n)
 	for i := 0; i < n; i++ {
-		err = (&tm.Partitions[i]).decode(pd)
+		tm.Partitions[i] = new(PartitionMetadata)
+		err = tm.Partitions[i].decode(pd)
 		if err != nil {
 			return err
 		}
@@ -70,8 +71,8 @@ func (tm *TopicMetadata) decode(pd packetDecoder) (err error) {
 }
 
 type MetadataResponse struct {
-	Brokers []Broker
-	Topics  []TopicMetadata
+	Brokers []*Broker
+	Topics  []*TopicMetadata
 }
 
 func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
@@ -80,9 +81,10 @@ func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	m.Brokers = make([]Broker, n)
+	m.Brokers = make([]*Broker, n)
 	for i := 0; i < n; i++ {
-		err = (&m.Brokers[i]).decode(pd)
+		m.Brokers[i] = new(Broker)
+		err = m.Brokers[i].decode(pd)
 		if err != nil {
 			return err
 		}
@@ -93,9 +95,10 @@ func (m *MetadataResponse) decode(pd packetDecoder) (err error) {
 		return err
 	}
 
-	m.Topics = make([]TopicMetadata, n)
+	m.Topics = make([]*TopicMetadata, n)
 	for i := 0; i < n; i++ {
-		err = (&m.Topics[i]).decode(pd)
+		m.Topics[i] = new(TopicMetadata)
+		err = m.Topics[i].decode(pd)
 		if err != nil {
 			return err
 		}