|
|
@@ -2,18 +2,13 @@ package kafka
|
|
|
|
|
|
import "sync"
|
|
|
|
|
|
-type topicPartition struct {
|
|
|
- topic string
|
|
|
- partition int32
|
|
|
-}
|
|
|
-
|
|
|
type brokerManager struct {
|
|
|
client *Client
|
|
|
defaultBroker *broker
|
|
|
|
|
|
- brokers map[int32]*broker
|
|
|
- leaders map[topicPartition]int32
|
|
|
- brokersLock sync.RWMutex
|
|
|
+ brokers map[int32]*broker // maps broker ids to brokers
|
|
|
+ partitions map[string]map[int32]*partitionMetadata // maps topics to partition ids to partitions
|
|
|
+ lock sync.RWMutex // protects access to the maps, only one since they're always accessed together
|
|
|
}
|
|
|
|
|
|
func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
|
|
|
@@ -29,7 +24,7 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
|
|
|
}
|
|
|
|
|
|
bm.brokers = make(map[int32]*broker)
|
|
|
- bm.leaders = make(map[topicPartition]int32)
|
|
|
+ bm.partitions = make(map[string]map[int32]*partitionMetadata)
|
|
|
|
|
|
// do an initial fetch of all cluster metadata by specifing an empty list of topics
|
|
|
err = bm.refreshTopics(make([]*string, 0))
|
|
|
@@ -40,34 +35,51 @@ func newBrokerManager(client *Client, host string, port int32) (bm *brokerManage
|
|
|
return bm, nil
|
|
|
}
|
|
|
|
|
|
-func (bm *brokerManager) getLeader(topic string, partition int32) (*broker, error) {
|
|
|
- var broker *broker = nil
|
|
|
- bm.brokersLock.RLock()
|
|
|
- id, ok := bm.leaders[topicPartition{topic, partition}]
|
|
|
- if ok {
|
|
|
- broker = bm.brokers[id]
|
|
|
+func (bm *brokerManager) terminateBroker(id int32) {
|
|
|
+ bm.lock.Lock()
|
|
|
+ delete(bm.brokers, id)
|
|
|
+ bm.lock.Unlock()
|
|
|
+}
|
|
|
+
|
|
|
+func (bm *brokerManager) getLeader(topic string, partition_id int32) *broker {
|
|
|
+ var leader *broker = nil
|
|
|
+
|
|
|
+ bm.lock.RLock()
|
|
|
+ defer bm.lock.RUnlock()
|
|
|
+
|
|
|
+ id_map := bm.partitions[topic]
|
|
|
+ if id_map != nil {
|
|
|
+ partition := id_map[partition_id]
|
|
|
+ if partition != nil {
|
|
|
+ leader = bm.brokers[partition.leader]
|
|
|
+ }
|
|
|
}
|
|
|
- bm.brokersLock.RUnlock()
|
|
|
|
|
|
- if broker == nil {
|
|
|
+ return leader
|
|
|
+}
|
|
|
+
|
|
|
+func (bm *brokerManager) getValidLeader(topic string, partition_id int32) (*broker, error) {
|
|
|
+
|
|
|
+ leader := bm.getLeader(topic, partition_id)
|
|
|
+
|
|
|
+ if leader == nil {
|
|
|
err := bm.refreshTopic(topic)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- bm.brokersLock.RLock()
|
|
|
- broker = bm.brokers[bm.leaders[topicPartition{topic, partition}]]
|
|
|
- bm.brokersLock.RUnlock()
|
|
|
+
|
|
|
+ leader = bm.getLeader(topic, partition_id)
|
|
|
}
|
|
|
|
|
|
- if broker == nil {
|
|
|
+ if leader == nil {
|
|
|
return nil, UNKNOWN_TOPIC_OR_PARTITION
|
|
|
}
|
|
|
|
|
|
- return broker, nil
|
|
|
+ return leader, nil
|
|
|
}
|
|
|
|
|
|
func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, res decoder) error {
|
|
|
- b, err := bm.getLeader(topic, partition)
|
|
|
+ b, err := bm.getValidLeader(topic, partition)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -88,19 +100,18 @@ func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, r
|
|
|
|
|
|
if err == nil {
|
|
|
// successfully received and decoded the packet, we're done
|
|
|
+ // (the actual decoded data is stored in `res decoder`)
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
// we got an error, so discard that broker
|
|
|
- bm.brokersLock.Lock()
|
|
|
- delete(bm.brokers, b.id)
|
|
|
- bm.brokersLock.Unlock()
|
|
|
+ bm.terminateBroker(b.id)
|
|
|
|
|
|
// then do the whole thing again
|
|
|
- // (the metadata for the broker gets refreshed automatically in getLeader)
|
|
|
+ // (the metadata for the broker gets refreshed automatically in getValidLeader)
|
|
|
// if we get a broker here, it's guaranteed to be fresh, so if it fails then
|
|
|
// we pass that error back to the user (as opposed to retrying indefinitely)
|
|
|
- b, err = bm.getLeader(topic, partition)
|
|
|
+ b, err = bm.getValidLeader(topic, partition)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -114,19 +125,19 @@ func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, r
|
|
|
case buf := <-responseChan.packets:
|
|
|
decoder := realDecoder{raw: buf}
|
|
|
err = res.decode(&decoder)
|
|
|
- return err
|
|
|
case err = <-responseChan.errors:
|
|
|
- return err
|
|
|
}
|
|
|
|
|
|
+ return err // will be nil if this broker worked
|
|
|
+
|
|
|
}
|
|
|
|
|
|
func (bm *brokerManager) getDefault() *broker {
|
|
|
|
|
|
if bm.defaultBroker == nil {
|
|
|
- bm.brokersLock.RLock()
|
|
|
- defer bm.brokersLock.RUnlock()
|
|
|
- for _, id := range bm.leaders {
|
|
|
+ bm.lock.RLock()
|
|
|
+ defer bm.lock.RUnlock()
|
|
|
+ for id, _ := range bm.brokers {
|
|
|
bm.defaultBroker = bm.brokers[id]
|
|
|
break
|
|
|
}
|
|
|
@@ -149,9 +160,7 @@ func (bm *brokerManager) tryDefaultBrokers(req encoder, res decoder) error {
|
|
|
return err
|
|
|
case <-responseChan.errors:
|
|
|
bm.defaultBroker = nil
|
|
|
- bm.brokersLock.Lock()
|
|
|
- delete(bm.brokers, b.id)
|
|
|
- bm.brokersLock.Unlock()
|
|
|
+ bm.terminateBroker(b.id)
|
|
|
}
|
|
|
}
|
|
|
return OutOfBrokers{}
|
|
|
@@ -164,8 +173,8 @@ func (bm *brokerManager) refreshTopics(topics []*string) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- bm.brokersLock.Lock()
|
|
|
- defer bm.brokersLock.Unlock()
|
|
|
+ bm.lock.Lock()
|
|
|
+ defer bm.lock.Unlock()
|
|
|
|
|
|
for i := range response.brokers {
|
|
|
broker := &response.brokers[i]
|
|
|
@@ -182,7 +191,11 @@ func (bm *brokerManager) refreshTopics(topics []*string) error {
|
|
|
if partition.err != NO_ERROR {
|
|
|
return partition.err
|
|
|
}
|
|
|
- bm.leaders[topicPartition{*topic.name, partition.id}] = partition.leader
|
|
|
+ id_map := bm.partitions[*topic.name]
|
|
|
+ if id_map == nil {
|
|
|
+ bm.partitions[*topic.name] = make(map[int32]*partitionMetadata)
|
|
|
+ }
|
|
|
+ bm.partitions[*topic.name][partition.id] = partition
|
|
|
}
|
|
|
}
|
|
|
|