@@ -34,7 +34,7 @@ type Client struct {
// NewClient creates a new Client with the given client ID. It connects to one of the given broker addresses
// and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
-func NewClient(id string, addrs []string, config *ClientConfig) (client *Client, err error) {
+func NewClient(id string, addrs []string, config *ClientConfig) (*Client, error) {
if config == nil {
config = new(ClientConfig)
}
@@ -47,18 +47,18 @@ func NewClient(id string, addrs []string, config *ClientConfig) (client *Client,
return nil, ConfigurationError("You must provide at least one broker address")
}

- client = new(Client)
- client.id = id
- client.config = *config
- client.extraBrokerAddrs = addrs
- client.extraBroker = NewBroker(client.extraBrokerAddrs[0])
+ client := &Client{
+ id: id,
+ config: *config,
+ extraBrokerAddrs: addrs,
+ extraBroker: NewBroker(addrs[0]),
+ brokers: make(map[int32]*Broker),
+ leaders: make(map[string]map[int32]int32),
+ }
client.extraBroker.Open()

- client.brokers = make(map[int32]*Broker)
- client.leaders = make(map[string]map[int32]int32)
-
// do an initial fetch of all cluster metadata by specifing an empty list of topics
- err = client.RefreshAllMetadata()
+ err := client.RefreshAllMetadata()
if err != nil {
client.Close() // this closes tmp, since it's still in the brokers hash
return nil, err
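For readers skimming the diff, here is a minimal sketch of how the new constructor signature might be called; the client ID and broker address are placeholder values, and passing nil for the config makes NewClient fall back to a fresh ClientConfig as shown above:

// Hypothetical caller; "example-client" and "localhost:9092" are placeholders.
client, err := NewClient("example-client", []string{"localhost:9092"}, nil)
if err != nil {
	// metadata could not be fetched from any of the given brokers,
	// so no client was created
	panic(err)
}
defer client.Close()

The returned error replaces the old named return values, so a caller only ever sees either a usable *Client or a non-nil error.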
@@ -112,7 +112,7 @@ func (client *Client) Topics() ([]string, error) {
defer client.lock.RUnlock()

ret := make([]string, 0, len(client.leaders))
- for topic, _ := range client.leaders {
+ for topic := range client.leaders {
ret = append(ret, topic)
}

@@ -121,19 +121,19 @@ func (client *Client) Topics() ([]string, error) {

// Leader returns the broker object that is the leader of the current topic/partition, as
// determined by querying the cluster metadata.
-func (client *Client) Leader(topic string, partition_id int32) (*Broker, error) {
- leader := client.cachedLeader(topic, partition_id)
+func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
+ leader := client.cachedLeader(topic, partitionID)

if leader == nil {
err := client.RefreshTopicMetadata(topic)
if err != nil {
return nil, err
}
- leader = client.cachedLeader(topic, partition_id)
+ leader = client.cachedLeader(topic, partitionID)
}

if leader == nil {
- return nil, UNKNOWN_TOPIC_OR_PARTITION
+ return nil, UnknownTopicOrPartition
}

return leader, nil
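A short sketch of calling the renamed Leader method; the topic name and partition number are placeholders:

// Hypothetical lookup; "example-topic" and partition 0 are placeholders.
broker, err := client.Leader("example-topic", 0)
if err != nil {
	// err is UnknownTopicOrPartition when no leader is known for the
	// topic/partition even after a metadata refresh
	panic(err)
}
_ = broker // the returned *Broker is the cached leader for that partition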
@@ -193,7 +193,7 @@ func (client *Client) refreshMetadata(topics []string, retries int) error {
return nil
default:
if retries <= 0 {
- return LEADER_NOT_AVAILABLE
+ return LeaderNotAvailable
}
time.Sleep(client.config.WaitForElection) // wait for leader election
return client.refreshMetadata(retry, retries-1)
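The retry branch above sleeps for client.config.WaitForElection between metadata attempts, so callers can tune how long to wait for a leader election by setting that field before constructing the client. A minimal sketch, assuming a "time" import on the caller side; the 250ms value is arbitrary:

// Hypothetical config; 250 * time.Millisecond is an arbitrary example wait.
config := &ClientConfig{WaitForElection: 250 * time.Millisecond}

The config would then be passed as the third argument to NewClient, as in the earlier sketch.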
@@ -221,13 +221,13 @@ func (client *Client) any() *Broker {
return client.extraBroker
}

-func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
+func (client *Client) cachedLeader(topic string, partitionID int32) *Broker {
client.lock.RLock()
defer client.lock.RUnlock()

partitions := client.leaders[topic]
if partitions != nil {
- leader, ok := partitions[partition_id]
+ leader, ok := partitions[partitionID]
if ok {
return client.brokers[leader]
}
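To make the cache lookup above easier to follow: client.leaders maps topic -> partition -> leader broker ID, and client.brokers maps broker ID -> *Broker, so a cache hit needs both the inner map to exist and the partition key to be present. A standalone illustration with made-up IDs, separate from the code being changed:

package main

import "fmt"

func main() {
	// topic -> partition -> leader broker ID (mirrors the shape of client.leaders)
	leaders := map[string]map[int32]int32{
		"example-topic": {0: 42, 1: 43}, // made-up partition and broker IDs
	}

	lookup := func(topic string, partition int32) (int32, bool) {
		partitions := leaders[topic]
		if partitions == nil {
			return 0, false // topic not cached at all
		}
		id, ok := partitions[partition]
		return id, ok // partition can be missing even when the topic is cached
	}

	fmt.Println(lookup("example-topic", 1)) // 43 true
	fmt.Println(lookup("missing-topic", 0)) // 0 false
}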
@@ -246,7 +246,7 @@ func (client *Client) cachedPartitions(topic string) []int32 {
}

ret := make([]int32, 0, len(partitions))
- for id, _ := range partitions {
+ for id := range partitions {
ret = append(ret, id)
}

@@ -254,7 +254,7 @@ func (client *Client) cachedPartitions(topic string) []int32 {
return ret
}

-// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
+// if no fatal error, returns a list of topics that need retrying due to LeaderNotAvailable
func (client *Client) update(data *MetadataResponse) ([]string, error) {
client.lock.Lock()
defer client.lock.Unlock()
@@ -281,9 +281,9 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {

for _, topic := range data.Topics {
switch topic.Err {
- case NO_ERROR:
+ case NoError:
break
- case LEADER_NOT_AVAILABLE:
+ case LeaderNotAvailable:
toRetry[topic.Name] = true
default:
return nil, topic.Err
@@ -291,11 +291,11 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
for _, partition := range topic.Partitions {
switch partition.Err {
- case LEADER_NOT_AVAILABLE:
+ case LeaderNotAvailable:
toRetry[topic.Name] = true
- delete(client.leaders[topic.Name], partition.Id)
- case NO_ERROR:
- client.leaders[topic.Name][partition.Id] = partition.Leader
+ delete(client.leaders[topic.Name], partition.ID)
+ case NoError:
+ client.leaders[topic.Name][partition.ID] = partition.Leader
default:
return nil, partition.Err
}
@@ -303,7 +303,7 @@ func (client *Client) update(data *MetadataResponse) ([]string, error) {
}

ret := make([]string, 0, len(toRetry))
- for topic, _ := range toRetry {
+ for topic := range toRetry {
ret = append(ret, topic)
}
return ret, nil
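The same cleanup appears in Topics, cachedPartitions, and update: ranging over a map with a single variable yields only the keys, so the trailing blank identifier is redundant. A standalone illustration with made-up data:

package main

import "fmt"

func main() {
	toRetry := map[string]bool{"topic-a": true, "topic-b": true} // made-up retry set

	// preallocate capacity and collect just the keys
	ret := make([]string, 0, len(toRetry))
	for topic := range toRetry {
		ret = append(ret, topic)
	}
	fmt.Println(len(ret)) // 2; map iteration order is not deterministic
}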