|
|
@@ -15,7 +15,7 @@ type Client struct {
|
|
|
id string // client id for broker requests
|
|
|
brokers map[int32]*k.Broker // maps broker ids to brokers
|
|
|
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
|
|
|
- lock sync.RWMutex // protects access to the maps, only one since they're always accessed together
|
|
|
+ lock sync.RWMutex // protects access to the maps, only one since they're always written together
|
|
|
}
|
|
|
|
|
|
// NewClient creates a new Client with the given client ID. It connects to the broker at the given
|
|
|
@@ -34,8 +34,10 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
client.brokers = make(map[int32]*k.Broker)
|
|
|
client.leaders = make(map[string]map[int32]int32)
|
|
|
|
|
|
- // add it temporarily with an invalid ID so that refreshTopics can find it
|
|
|
- client.brokers[-1] = tmp
|
|
|
+ // add it temporarily so that refreshTopics can find it
|
|
|
+ // brokers created through NewBroker() have an ID of -1, which won't conflict with
|
|
|
+ // whatever the metadata request returns
|
|
|
+ client.brokers[tmp.ID()] = tmp
|
|
|
|
|
|
// do an initial fetch of all cluster metadata by specifing an empty list of topics
|
|
|
err = client.refreshTopics(make([]string, 0))
|
|
|
@@ -47,8 +49,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
// now remove our tmp broker - the successful metadata request will have returned it
|
|
|
// with a valid ID, so it will already be in the hash somewhere else and we don't need
|
|
|
// the incomplete tmp one anymore
|
|
|
- go client.brokers[-1].Close()
|
|
|
- delete(client.brokers, -1)
|
|
|
+ client.disconnectBroker(tmp)
|
|
|
|
|
|
return client, nil
|
|
|
}
|
|
|
@@ -67,6 +68,9 @@ func (client *Client) Close() {
|
|
|
client.leaders = nil
|
|
|
}
|
|
|
|
|
|
+// functions for use by producers and consumers
|
|
|
+// if Go had the concept they would be protected instead of private
|
|
|
+
|
|
|
func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
|
|
|
leader := client.cachedLeader(topic, partition_id)
|
|
|
|
|
|
@@ -105,23 +109,44 @@ func (client *Client) partitions(topic string) ([]int32, error) {
|
|
|
return partitions, nil
|
|
|
}
|
|
|
|
|
|
-func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
|
|
|
- client.lock.RLock()
|
|
|
- defer client.lock.RUnlock()
|
|
|
+func (client *Client) disconnectBroker(broker *k.Broker) {
|
|
|
+ client.lock.Lock()
|
|
|
+ defer client.lock.Unlock()
|
|
|
|
|
|
- partitions := client.leaders[topic]
|
|
|
- if partitions != nil {
|
|
|
- leader := partitions[partition_id]
|
|
|
- if leader == -1 {
|
|
|
- return nil
|
|
|
- } else {
|
|
|
- return client.brokers[leader]
|
|
|
+ // we don't need to update the leaders hash, it will automatically get refreshed next time because
|
|
|
+ // the broker lookup will return nil
|
|
|
+ delete(client.brokers, broker.ID())
|
|
|
+ go broker.Close()
|
|
|
+}
|
|
|
+
|
|
|
+func (client *Client) refreshTopics(topics []string) error {
|
|
|
+ for broker := client.any(); broker != nil; broker = client.any() {
|
|
|
+ response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})
|
|
|
+
|
|
|
+ switch err.(type) {
|
|
|
+ case nil:
|
|
|
+ // valid response, use it
|
|
|
+ return client.update(response)
|
|
|
+ case k.EncodingError:
|
|
|
+ // didn't even send, return the error
|
|
|
+ return err
|
|
|
}
|
|
|
+
|
|
|
+ // some other error, remove that broker and try again
|
|
|
+ client.disconnectBroker(broker)
|
|
|
}
|
|
|
|
|
|
- return nil
|
|
|
+ return OutOfBrokers
|
|
|
}
|
|
|
|
|
|
+func (client *Client) refreshTopic(topic string) error {
|
|
|
+ tmp := make([]string, 1)
|
|
|
+ tmp[0] = topic
|
|
|
+ return client.refreshTopics(tmp)
|
|
|
+}
|
|
|
+
|
|
|
+// truly private helper functions
|
|
|
+
|
|
|
func (client *Client) any() *k.Broker {
|
|
|
client.lock.RLock()
|
|
|
defer client.lock.RUnlock()
|
|
|
@@ -133,6 +158,23 @@ func (client *Client) any() *k.Broker {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
|
|
|
+ client.lock.RLock()
|
|
|
+ defer client.lock.RUnlock()
|
|
|
+
|
|
|
+ partitions := client.leaders[topic]
|
|
|
+ if partitions != nil {
|
|
|
+ leader := partitions[partition_id]
|
|
|
+ if leader == -1 {
|
|
|
+ return nil
|
|
|
+ } else {
|
|
|
+ return client.brokers[leader]
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
func (client *Client) cachedPartitions(topic string) []int32 {
|
|
|
client.lock.RLock()
|
|
|
defer client.lock.RUnlock()
|
|
|
@@ -198,32 +240,3 @@ func (client *Client) update(data *k.MetadataResponse) error {
|
|
|
|
|
|
return nil
|
|
|
}
|
|
|
-
|
|
|
-func (client *Client) refreshTopics(topics []string) error {
|
|
|
- for broker := client.any(); broker != nil; broker = client.any() {
|
|
|
- response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})
|
|
|
-
|
|
|
- switch err.(type) {
|
|
|
- case nil:
|
|
|
- // valid response, use it
|
|
|
- return client.update(response)
|
|
|
- case k.EncodingError:
|
|
|
- // didn't even send, return the error
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- // some other error, remove that broker and try again
|
|
|
- client.lock.Lock()
|
|
|
- delete(client.brokers, broker.ID())
|
|
|
- go broker.Close()
|
|
|
- client.lock.Unlock()
|
|
|
- }
|
|
|
-
|
|
|
- return OutOfBrokers
|
|
|
-}
|
|
|
-
|
|
|
-func (client *Client) refreshTopic(topic string) error {
|
|
|
- tmp := make([]string, 1)
|
|
|
- tmp[0] = topic
|
|
|
- return client.refreshTopics(tmp)
|
|
|
-}
|