@@ -13,10 +13,10 @@ import (
// automatically when it passes out of scope. A single client can be safely shared by
// multiple concurrent Producers and Consumers.
type Client struct {
- id string // client id for broker requests
- brokers map[int32]*k.Broker // maps broker ids to brokers
- leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
- sync.RWMutex // protects access to the maps, only one since they're always written together
+ id string // client id for broker requests
+ brokers map[int32]*k.Broker // maps broker ids to brokers
+ leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
+ lock sync.RWMutex // protects access to the maps, only one since they're always written together
}
// NewClient creates a new Client with the given client ID. It connects to the broker at the given
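
The point of the hunk above is subtle but worth calling out: embedding sync.RWMutex promotes Lock, Unlock, RLock, and RUnlock into Client's exported method set, so any importer could lock (or deadlock) the client's internal state. Naming the field keeps locking private to the package. A minimal sketch of the difference, using throwaway types rather than the actual Client:

    package main

    import "sync"

    type Embedded struct {
        sync.RWMutex // promoted: Lock/Unlock become part of the public API
        n            int
    }

    type Named struct {
        lock sync.RWMutex // unexported: only this package can touch it
        n    int
    }

    func main() {
        e := &Embedded{}
        e.Lock() // legal from any importing package
        e.n++
        e.Unlock()

        v := &Named{}
        v.lock.Lock() // legal here; a compile error from outside the package
        v.n++
        v.lock.Unlock()
    }
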
@@ -59,8 +59,8 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
func (client *Client) Close() {
- client.Lock()
- defer client.Unlock()
+ client.lock.Lock()
+ defer client.lock.Unlock()
for _, broker := range client.brokers {
go broker.Close()
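
Given the doc comment on Close about ordering, a typical lifecycle might look like the sketch below (the client id, host, and port are placeholders). Since deferred calls run in reverse order, deferring client.Close() first and any Producer or Consumer cleanup afterwards preserves the documented "close them before the client" rule:

    client, err := NewClient("example-client", "localhost", 9092)
    if err != nil {
        return err
    }
    defer client.Close() // runs last, after any later-deferred cleanup
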
@@ -109,8 +109,8 @@ func (client *Client) partitions(topic string) ([]int32, error) {
}
func (client *Client) disconnectBroker(broker *k.Broker) {
- client.Lock()
- defer client.Unlock()
+ client.lock.Lock()
+ defer client.lock.Unlock()
// we don't need to update the leaders hash, it will automatically get refreshed next time because
// the broker lookup will return nil
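
The comment above describes lazy invalidation: disconnectBroker drops the broker but leaves stale leader entries in place, and readers repair the cache when a lookup comes back nil. A hypothetical caller of that pattern, assuming a retry count of 3 and the refreshTopics signature visible in the next hunk:

    // a nil cache hit triggers a metadata refresh, then one retry of the lookup
    broker := client.cachedLeader(topic, partition)
    if broker == nil {
        if err := client.refreshTopics([]string{topic}, 3); err != nil {
            return nil, err
        }
        broker = client.cachedLeader(topic, partition)
    }
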
@@ -160,8 +160,8 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
}
func (client *Client) any() *k.Broker {
- client.RLock()
- defer client.RUnlock()
+ client.lock.RLock()
+ defer client.lock.RUnlock()
for _, broker := range client.brokers {
return broker
@@ -171,8 +171,8 @@ func (client *Client) any() *k.Broker {
}
func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
- client.RLock()
- defer client.RUnlock()
+ client.lock.RLock()
+ defer client.lock.RUnlock()
partitions := client.leaders[topic]
if partitions != nil {
@@ -186,8 +186,8 @@ func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
}
func (client *Client) cachedPartitions(topic string) []int32 {
- client.RLock()
- defer client.RUnlock()
+ client.lock.RLock()
+ defer client.lock.RUnlock()
partitions := client.leaders[topic]
if partitions == nil {
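
Note that any, cachedLeader, and cachedPartitions all take only the read half of the lock, so concurrent lookups never block one another; they serialize only against writers such as update and disconnectBroker. A small illustration (the topic name and partition count are invented):

    var wg sync.WaitGroup
    for p := int32(0); p < 4; p++ {
        wg.Add(1)
        go func(p int32) {
            defer wg.Done()
            _ = client.cachedLeader("events", p) // all four RLocks can be held at once
        }(p)
    }
    wg.Wait()
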
@@ -209,13 +209,13 @@ func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
// and especially avoids closing valid connections out from under other code which may be trying
// to use them. We only need a read-lock for this.
var newBrokers []*k.Broker
- client.RLock()
+ client.lock.RLock()
for _, broker := range data.Brokers {
if !broker.Equals(client.brokers[broker.ID()]) {
newBrokers = append(newBrokers, broker)
}
}
- client.RUnlock()
+ client.lock.RUnlock()
// connect to the brokers before taking the write lock, as this can take a while
// to timeout if one of them isn't reachable
@@ -226,8 +226,8 @@ func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
}
}
- client.Lock()
- defer client.Unlock()
+ client.lock.Lock()
+ defer client.lock.Unlock()
for _, broker := range newBrokers {
if client.brokers[broker.ID()] != nil {
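
The locking choreography in update is the most instructive part of the change: detect new brokers under a read lock, dial them with no lock held (the slow part, as the comment notes), and take the write lock only for the in-memory swap. Reduced to a skeleton, with the dial step written as a hypothetical broker.Connect():

    // phase 1: cheap comparison under the read lock; readers stay unblocked
    client.lock.RLock()
    var newBrokers []*k.Broker
    for _, broker := range data.Brokers {
        if !broker.Equals(client.brokers[broker.ID()]) {
            newBrokers = append(newBrokers, broker)
        }
    }
    client.lock.RUnlock()

    // phase 2: slow network connects with no lock held at all
    for _, broker := range newBrokers {
        broker.Connect() // hypothetical dial step; may block until timeout
    }

    // phase 3: hold the write lock only long enough to swap the maps in
    client.lock.Lock()
    defer client.lock.Unlock()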