Browse Source

Fix two metadata-cache bugs

- don't bounce broker connections if the one we've got is fine
- don't leak brokers on an error in newMetadataCache
Evan Huus 12 years ago
parent
commit
bf6925bd05
2 changed files with 49 additions and 18 deletions
  1. 38 18
      kafka/metadata_cache.go
  2. 11 0
      protocol/broker.go

+ 38 - 18
kafka/metadata_cache.go

@@ -15,39 +15,45 @@ type metadataCache struct {
 }
 }
 
 
 func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
 func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
-	mc := new(metadataCache)
-
-	starter := k.NewBroker(host, port)
-	err := starter.Connect()
+	tmp := k.NewBroker(host, port)
+	err := tmp.Connect()
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	mc := new(metadataCache)
 	mc.client = client
 	mc.client = client
 	mc.brokers = make(map[int32]*k.Broker)
 	mc.brokers = make(map[int32]*k.Broker)
 	mc.leaders = make(map[string]map[int32]int32)
 	mc.leaders = make(map[string]map[int32]int32)
 
 
-	mc.brokers[starter.ID()] = starter
+	// add it temporarily with an invalid ID so that refreshTopics can find it
+	mc.brokers[-1] = tmp
 
 
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
 	// do an initial fetch of all cluster metadata by specifying an empty list of topics
 	err = mc.refreshTopics(make([]string, 0))
 	err = mc.refreshTopics(make([]string, 0))
 	if err != nil {
 	if err != nil {
+		mc.closeAll() // this closes tmp, since it's still in the brokers hash
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	// now remove our tmp broker - the successful metadata request will have returned it
+	// with a valid ID, so it will already be in the hash somewhere else and we don't need
+	// the incomplete tmp one anymore
+	go mc.brokers[-1].Close()
+	delete(mc.brokers, -1)
+
 	return mc, nil
 	return mc, nil
 }
 }
 
 
-func (mc *metadataCache) removeBroker(broker *k.Broker) {
-	if broker == nil {
-		return
-	}
-
-	mc.lock.RLock()
-	defer mc.lock.RUnlock()
+func (mc *metadataCache) closeAll() {
+	mc.lock.Lock()
+	defer mc.lock.Unlock()
 
 
-	delete(mc.brokers, broker.ID())
-	go broker.Close()
+	for _, broker := range mc.brokers {
+		go broker.Close()
+	}
+	mc.brokers = nil
+	mc.leaders = nil
 }
 }
 
 
 func (mc *metadataCache) leader(topic string, partition_id int32) *k.Broker {
 func (mc *metadataCache) leader(topic string, partition_id int32) *k.Broker {
@@ -97,9 +103,21 @@ func (mc *metadataCache) partitions(topic string) []int32 {
 }
 }
 
 
 func (mc *metadataCache) update(data *k.MetadataResponse) error {
 func (mc *metadataCache) update(data *k.MetadataResponse) error {
+	// First discard brokers that we already know about. This avoids bouncing TCP connections,
+	// and especially avoids closing valid connections out from under clients who may be trying
+	// to use them. We only need a read-lock for this.
+	var newBrokers []*k.Broker
+	mc.lock.RLock()
+	for _, broker := range data.Brokers {
+		if !broker.Equals(mc.brokers[broker.ID()]) {
+			newBrokers = append(newBrokers, broker)
+		}
+	}
+	mc.lock.RUnlock()
+
 	// connect to the brokers before taking the lock, as this can take a while
 	// connect to the brokers before taking the lock, as this can take a while
 	// to timeout if one of them isn't reachable
 	// to timeout if one of them isn't reachable
-	for _, broker := range data.Brokers {
+	for _, broker := range newBrokers {
 		err := broker.Connect()
 		err := broker.Connect()
 		if err != nil {
 		if err != nil {
 			return err
 			return err
@@ -109,7 +127,7 @@ func (mc *metadataCache) update(data *k.MetadataResponse) error {
 	mc.lock.Lock()
 	mc.lock.Lock()
 	defer mc.lock.Unlock()
 	defer mc.lock.Unlock()
 
 
-	for _, broker := range data.Brokers {
+	for _, broker := range newBrokers {
 		if mc.brokers[broker.ID()] != nil {
 		if mc.brokers[broker.ID()] != nil {
 			go mc.brokers[broker.ID()].Close()
 			go mc.brokers[broker.ID()].Close()
 		}
 		}
@@ -146,8 +164,10 @@ func (mc *metadataCache) refreshTopics(topics []string) error {
 		}
 		}
 
 
 		// some other error, remove that broker and try again
 		// some other error, remove that broker and try again
-		mc.removeBroker(broker)
-
+		mc.lock.Lock()
+		delete(mc.brokers, broker.ID())
+		go broker.Close()
+		mc.lock.Unlock()
 	}
 	}
 
 
 	return OutOfBrokers
 	return OutOfBrokers

+ 11 - 0
protocol/broker.go

@@ -91,6 +91,17 @@ func (b *Broker) ID() int32 {
 	return b.id
 	return b.id
 }
 }
 
 
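+// Equals reports whether two brokers are equal: they must have the same host, port, and id.
+// Two nil brokers are equal; a nil broker is never equal to a non-nil one.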
+func (b *Broker) Equals(a *Broker) bool {
+	switch {
+	case a == nil && b == nil:
+		return true
+	case (a == nil && b != nil) || (a != nil && b == nil):
+		return false
+	}
+	return a.id == b.id && a.host == b.host && a.port == b.port
+}
+
 func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
 func (b *Broker) GetMetadata(clientID string, request *MetadataRequest) (*MetadataResponse, error) {
 	response := new(MetadataResponse)
 	response := new(MetadataResponse)