
Replace brokerManager with metadataCache

Much more descriptive, and a cleaner definition of what its responsibilities are.
More refactoring, slimming, and dead-code removal, of course.
Evan Huus · 12 years ago · commit b389af0788
4 changed files with 152 additions and 200 deletions
  1. broker_manager.go  +0  -194
  2. client.go          +22 -3
  3. metadata_cache.go  +122 -0
  4. producer.go        +8  -3

broker_manager.go  +0 -194

@@ -1,194 +0,0 @@
-package kafka
-
-import "sync"
-
-type brokerManager struct {
-	client        *Client
-	defaultBroker *broker
-
-	brokers    map[int32]*broker                       // maps broker ids to brokers
-	partitions map[string]map[int32]*partitionMetadata // maps topics to partition ids to partitions
-	lock       sync.RWMutex                            // protects access to the maps, only one since they're always accessed together
-}
-
-func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
-	bm = new(brokerManager)
-
-	bm.client = client
-
-	// we create a new broker object as the default 'master' broker
-	// if this broker is also a leader then we will end up with two broker objects for it, but that's not a big deal
-	bm.defaultBroker, err = newBroker(host, port)
-	if err != nil {
-		return nil, err
-	}
-
-	bm.brokers = make(map[int32]*broker)
-	bm.partitions = make(map[string]map[int32]*partitionMetadata)
-
-	// do an initial fetch of all cluster metadata by specifying an empty list of topics
-	err = bm.refreshTopics(make([]*string, 0))
-	if err != nil {
-		return nil, err
-	}
-
-	return bm, nil
-}
-
-func (bm *brokerManager) terminateBroker(id int32) {
-	bm.lock.Lock()
-	delete(bm.brokers, id)
-	bm.lock.Unlock()
-}
-
-func (bm *brokerManager) getCachedLeader(topic string, partition_id int32) *broker {
-	var leader *broker = nil
-
-	bm.lock.RLock()
-	defer bm.lock.RUnlock()
-
-	id_map := bm.partitions[topic]
-	if id_map != nil {
-		partition := id_map[partition_id]
-		if partition != nil {
-			leader = bm.brokers[partition.leader]
-		}
-	}
-
-	return leader
-}
-
-func (bm *brokerManager) getLeader(topic string, partition_id int32) (*broker, error) {
-
-	leader := bm.getCachedLeader(topic, partition_id)
-
-	if leader == nil {
-		err := bm.refreshTopic(topic)
-		if err != nil {
-			return nil, err
-		}
-
-		leader = bm.getCachedLeader(topic, partition_id)
-	}
-
-	if leader == nil {
-		return nil, UNKNOWN_TOPIC_OR_PARTITION
-	}
-
-	return leader, nil
-}
-
-func (bm *brokerManager) partitionsForTopic(topic string) ([]int32, error) {
-	bm.lock.RLock()
-	id_map := bm.partitions[topic]
-	if id_map == nil {
-		bm.lock.RUnlock()
-		err := bm.refreshTopic(topic)
-		if err != nil {
-			return nil, err
-		}
-		bm.lock.RLock()
-		id_map = bm.partitions[topic]
-		if id_map == nil {
-			bm.lock.RUnlock()
-			return nil, UNKNOWN_TOPIC_OR_PARTITION
-		}
-	}
-	partitions := make([]int32, len(id_map))
-	i := 0
-	for id, _ := range id_map {
-		partitions[i] = id
-		i++
-	}
-	bm.lock.RUnlock()
-	return partitions, nil
-}
-
-func (bm *brokerManager) sendToPartition(topic string, partition int32, req requestEncoder, res decoder) error {
-	b, err := bm.getLeader(topic, partition)
-	if err != nil {
-		return err
-	}
-
-	err = b.SendAndReceive(bm.client.id, req, res)
-	switch err.(type) {
-	case EncodingError:
-		// encoding errors are our problem, not the broker's, so just return them
-		// rather than refreshing the broker metadata
-		return err
-	case nil:
-		return nil
-	default:
-		// broker error, so discard that broker
-		bm.terminateBroker(b.id)
-		return err
-	}
-}
-
-func (bm *brokerManager) getDefault() *broker {
-
-	if bm.defaultBroker == nil {
-		bm.lock.RLock()
-		defer bm.lock.RUnlock()
-		for id, _ := range bm.brokers {
-			bm.defaultBroker = bm.brokers[id]
-			break
-		}
-	}
-
-	return bm.defaultBroker
-}
-
-func (bm *brokerManager) sendToAny(req requestEncoder, res decoder) error {
-	for b := bm.getDefault(); b != nil; b = bm.getDefault() {
-		err := b.SendAndReceive(bm.client.id, req, res)
-		switch err.(type) {
-		case nil, EncodingError:
-			return err
-		default:
-			// broker error, so discard that broker
-			bm.defaultBroker = nil
-			bm.terminateBroker(b.id)
-		}
-	}
-	return OutOfBrokers{}
-}
-
-func (bm *brokerManager) refreshTopics(topics []*string) error {
-	response := new(metadataResponse)
-	err := bm.sendToAny(&metadataRequest{topics}, response)
-	if err != nil {
-		return err
-	}
-
-	bm.lock.Lock()
-	defer bm.lock.Unlock()
-
-	for i := range response.brokers {
-		broker := &response.brokers[i]
-		bm.brokers[broker.id] = broker
-	}
-
-	for i := range response.topics {
-		topic := &response.topics[i]
-		if topic.err != NO_ERROR {
-			return topic.err
-		}
-		bm.partitions[*topic.name] = make(map[int32]*partitionMetadata, len(topic.partitions))
-		for j := range topic.partitions {
-			partition := &topic.partitions[j]
-			if partition.err != NO_ERROR {
-				return partition.err
-			}
-			bm.partitions[*topic.name][partition.id] = partition
-		}
-	}
-
-	return nil
-}
-
-func (bm *brokerManager) refreshTopic(topic string) error {
-	tmp := make([]*string, 1)
-	tmp[0] = &topic
-	return bm.refreshTopics(tmp)
-}

client.go  +22 -3

@@ -1,16 +1,35 @@
 package kafka
 
 type Client struct {
-	id      *string
-	brokers *brokerManager
+	id    *string
+	cache *metadataCache
 }
 
 func NewClient(id *string, host string, port int32) (client *Client, err error) {
 	client = new(Client)
 	client.id = id
-	client.brokers, err = newBrokerManager(client, host, port)
+	client.cache, err = newMetadataCache(client, host, port)
 	if err != nil {
 		return nil, err
 	}
 	return client, nil
 }
+
+func (client *Client) Leader(topic string, partition_id int32) (*broker, error) {
+	leader := client.cache.leader(topic, partition_id)
+
+	if leader == nil {
+		err := client.cache.refreshTopic(topic)
+		if err != nil {
+			return nil, err
+		}
+
+		leader = client.cache.leader(topic, partition_id)
+	}
+
+	if leader == nil {
+		return nil, UNKNOWN_TOPIC_OR_PARTITION
+	}
+
+	return leader, nil
+}
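
As a quick illustration of the new API, here is a hedged in-package sketch of the lookup flow Leader provides: check the cache, refresh the topic's metadata only on a miss, and return UNKNOWN_TOPIC_OR_PARTITION if the leader still is not known. The function name, broker address, and topic are illustrative assumptions, not part of this commit.

package kafka

// exampleLeaderLookup is a hypothetical sketch, not part of the commit.
func exampleLeaderLookup() error {
	id := "example-client"                           // illustrative client id
	client, err := NewClient(&id, "localhost", 9092) // assumed broker address
	if err != nil {
		return err
	}

	// Leader hits the metadata cache first and only refreshes the topic's
	// metadata when no cached leader is found.
	_, err = client.Leader("my_topic", 0)
	return err
}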

metadata_cache.go  +122 -0

@@ -0,0 +1,122 @@
+package kafka
+
+import "sync"
+
+type metadataCache struct {
+	client  *Client
+	brokers map[int32]*broker          // maps broker ids to brokers
+	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
+	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
+}
+
+func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
+	mc := new(metadataCache)
+
+	starter, err := newBroker(host, port)
+	if err != nil {
+		return nil, err
+	}
+
+	mc.client = client
+	mc.brokers = make(map[int32]*broker)
+	mc.leaders = make(map[string]map[int32]int32)
+
+	mc.brokers[starter.id] = starter
+
+	// do an initial fetch of all cluster metadata by specifying an empty list of topics
+	err = mc.refreshTopics(make([]*string, 0))
+	if err != nil {
+		return nil, err
+	}
+
+	return mc, nil
+}
+
+func (mc *metadataCache) leader(topic string, partition_id int32) *broker {
+	mc.lock.RLock()
+	defer mc.lock.RUnlock()
+
+	partitions := mc.leaders[topic]
+	if partitions != nil {
+		// comma-ok distinguishes a missing partition from a real broker id of 0;
+		// a leader of -1 means the cluster has no leader for this partition yet
+		leader, ok := partitions[partition_id]
+		if ok && leader != -1 {
+			return mc.brokers[leader]
+		}
+	}
+
+	return nil
+}
+
+func (mc *metadataCache) any() *broker {
+	mc.lock.RLock()
+	defer mc.lock.RUnlock()
+
+	for _, broker := range mc.brokers {
+		return broker
+	}
+
+	return nil
+}
+
+func (mc *metadataCache) partitions(topic string) ([]int32, error) {
+	mc.lock.RLock()
+	defer mc.lock.RUnlock()
+
+	partitions := mc.leaders[topic]
+	if partitions == nil {
+		return nil, UNKNOWN_TOPIC_OR_PARTITION
+	}
+
+	ret := make([]int32, 0, len(partitions))
+	for id := range partitions {
+		ret = append(ret, id)
+	}
+
+	return ret, nil
+}
+
+func (mc *metadataCache) refreshTopics(topics []*string) error {
+	broker := mc.any()
+	if broker == nil {
+		return OutOfBrokers{}
+	}
+
+	response := new(metadataResponse)
+	err := broker.SendAndReceive(mc.client.id, &metadataRequest{topics}, response)
+	if err != nil {
+		return err
+	}
+
+	mc.lock.Lock()
+	defer mc.lock.Unlock()
+
+	for i := range response.brokers {
+		broker := &response.brokers[i]
+		mc.brokers[broker.id] = broker
+	}
+
+	for i := range response.topics {
+		topic := &response.topics[i]
+		if topic.err != NO_ERROR {
+			return topic.err
+		}
+		mc.leaders[*topic.name] = make(map[int32]int32, len(topic.partitions))
+		for j := range topic.partitions {
+			partition := &topic.partitions[j]
+			if partition.err != NO_ERROR {
+				return partition.err
+			}
+			mc.leaders[*topic.name][partition.id] = partition.leader
+		}
+	}
+
+	return nil
+}
+
+func (mc *metadataCache) refreshTopic(topic string) error {
+	tmp := make([]*string, 1)
+	tmp[0] = &topic
+	return mc.refreshTopics(tmp)
+}
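
The heart of the new cache is the two-level leaders map (topic to partition id to leader broker id). Below is a minimal standalone sketch of that lookup pattern with purely illustrative data; the comma-ok form matters because a missing key otherwise yields the zero value 0, which is also a legitimate broker id.

package main

import "fmt"

func main() {
	// topic -> partition id -> leader broker id, mirroring metadataCache.leaders
	leaders := map[string]map[int32]int32{
		"my_topic": map[int32]int32{0: 1, 1: 2}, // sample data, purely illustrative
	}

	// A missing topic yields a nil inner map; the comma-ok form on the inner
	// lookup distinguishes "no such partition" from a leader with broker id 0.
	if partitions, ok := leaders["my_topic"]; ok {
		if leaderID, ok := partitions[0]; ok {
			fmt.Println("leader for my_topic/0 is broker", leaderID)
		}
	}
}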

producer.go  +8 -3

@@ -1,7 +1,7 @@
 package kafka
 
 type Producer struct {
-	client            *Client
+	*Client
 	topic             string
 	partitioner       PartitionChooser
 	responseCondition int16
@@ -17,7 +17,7 @@ func NewSimpleProducer(client *Client, topic string) *Producer {
 }
 
 func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
-	partitions, err := p.client.brokers.partitionsForTopic(p.topic)
+	partitions, err := p.cache.partitions(p.topic)
 	if err != nil {
 		return nil, err
 	}
@@ -35,6 +35,11 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 		return nil, err
 	}
 
+	broker, err := p.Leader(p.topic, partition)
+	if err != nil {
+		return nil, err
+	}
+
 	request := newSingletonProduceRequest(p.topic, partition, newSingletonMessageSet(msg))
 	request.requiredAcks = p.responseCondition
 	request.timeout = p.responseTimeout
@@ -43,7 +48,7 @@ func (p *Producer) SendMessage(key, value encoder) (*ProduceResponse, error) {
 	if request.expectResponse() {
 		response = new(ProduceResponse)
 	}
-	err = p.client.brokers.sendToPartition(p.topic, partition, request, response)
+	err = broker.SendAndReceive(p.id, request, response)
 
 	return response, err
 }
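
The producer change above replaces the named client field with an embedded *Client, which is why SendMessage can now call p.Leader, p.cache, and p.id directly. A small self-contained sketch of Go's field and method promotion, using hypothetical type names rather than this package's:

package main

import "fmt"

// Base stands in for Client: it carries the shared state and behaviour.
type Base struct {
	id string
}

func (b *Base) Describe() string {
	return "base id " + b.id
}

// Wrapper stands in for Producer: embedding *Base promotes Base's fields
// and methods, so Wrapper can use id and Describe directly.
type Wrapper struct {
	*Base
	topic string
}

func main() {
	w := &Wrapper{Base: &Base{id: "example"}, topic: "my_topic"}
	fmt.Println(w.Describe(), "topic:", w.topic) // Describe is promoted from *Base
	fmt.Println("promoted field id:", w.id)      // so is the id field
}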