@@ -5,11 +5,7 @@ It is built on sister package sarama/protocol.
 */
 package kafka
 
-import k "sarama/protocol"
-
 import (
-	"sarama/encoding"
-	"sarama/types"
 	"sort"
 	"sync"
 	"time"
@@ -21,7 +17,7 @@ import (
 // multiple concurrent Producers and Consumers.
 type Client struct {
 	id      string                     // client id for broker requests
-	brokers map[int32]*k.Broker        // maps broker ids to brokers
+	brokers map[int32]*Broker          // maps broker ids to brokers
 	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
 	lock    sync.RWMutex               // protects access to the maps, only one since they're always written together
 }
@@ -30,7 +26,7 @@ type Client struct {
 // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
 // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
 func NewClient(id string, host string, port int32) (client *Client, err error) {
-	tmp := k.NewBroker(host, port)
+	tmp := NewBroker(host, port)
 	err = tmp.Connect()
 	if err != nil {
 		return nil, err
@@ -39,7 +35,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
 	client = new(Client)
 	client.id = id
 
-	client.brokers = make(map[int32]*k.Broker)
+	client.brokers = make(map[int32]*Broker)
 	client.leaders = make(map[string]map[int32]int32)
 
 	// add it to the set so that refreshTopics can find it
@@ -81,7 +77,7 @@ func (client *Client) Close() {
 // functions for use by producers and consumers
 // if Go had the concept they would be marked 'protected'
 
-func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
+func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
 	leader := client.cachedLeader(topic, partition_id)
 
 	if leader == nil {
@@ -93,7 +89,7 @@ func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error
 	}
 
 	if leader == nil {
-		return nil, types.UNKNOWN_TOPIC_OR_PARTITION
+		return nil, UNKNOWN_TOPIC_OR_PARTITION
 	}
 
 	return leader, nil
@@ -117,7 +113,7 @@ func (client *Client) partitions(topic string) ([]int32, error) {
 	return partitions, nil
 }
 
-func (client *Client) disconnectBroker(broker *k.Broker) {
+func (client *Client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -138,7 +134,7 @@ func (client *Client) refreshTopic(topic string) error {
 
 func (client *Client) refreshTopics(topics []string, retries int) error {
 	for broker := client.any(); broker != nil; broker = client.any() {
-		response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})
+		response, err := broker.GetMetadata(client.id, &MetadataRequest{Topics: topics})
 
 		switch err {
 		case nil:
@@ -151,12 +147,12 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
 				return nil
 			default:
 				if retries <= 0 {
-					return types.LEADER_NOT_AVAILABLE
+					return LEADER_NOT_AVAILABLE
 				}
 				time.Sleep(250 * time.Millisecond) // wait for leader election
 				return client.refreshTopics(retry, retries-1)
 			}
-		case encoding.EncodingError:
+		case EncodingError:
 			// didn't even send, return the error
 			return err
 		}
@@ -168,7 +164,7 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
 	return OutOfBrokers
 }
 
-func (client *Client) any() *k.Broker {
+func (client *Client) any() *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -179,7 +175,7 @@ func (client *Client) any() *k.Broker {
 	return nil
 }
 
-func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
+func (client *Client) cachedLeader(topic string, partition_id int32) *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -213,11 +209,11 @@ func (client *Client) cachedPartitions(topic string) []int32 {
 }
 
 // if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
-func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
+func (client *Client) update(data *MetadataResponse) ([]string, error) {
 	// First discard brokers that we already know about. This avoids bouncing TCP connections,
 	// and especially avoids closing valid connections out from under other code which may be trying
 	// to use them. We only need a read-lock for this.
-	var newBrokers []*k.Broker
+	var newBrokers []*Broker
 	client.lock.RLock()
 	for _, broker := range data.Brokers {
 		if !broker.Equals(client.brokers[broker.ID()]) {
@@ -249,9 +245,9 @@ func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
 
 	for _, topic := range data.Topics {
 		switch topic.Err {
-		case types.NO_ERROR:
+		case NO_ERROR:
 			break
-		case types.LEADER_NOT_AVAILABLE:
+		case LEADER_NOT_AVAILABLE:
 			toRetry[topic.Name] = true
 		default:
 			return nil, topic.Err
@@ -259,13 +255,13 @@ func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
 		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
 		for _, partition := range topic.Partitions {
 			switch partition.Err {
-			case types.LEADER_NOT_AVAILABLE:
+			case LEADER_NOT_AVAILABLE:
 				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
 				// partition is in the middle of leader election, so we fallthrough to save it
 				// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
 				toRetry[topic.Name] = true
				fallthrough
-			case types.NO_ERROR:
+			case NO_ERROR:
 				client.leaders[topic.Name][partition.Id] = partition.Leader
 			default:
 				return nil, partition.Err
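
For context, a minimal caller-side sketch of the flattened API. With the protocol, encoding, and types packages merged into kafka, user code imports a single package and no longer needs the k.* alias. The import path below is an assumption; NewClient, Close, and their signatures are taken from the diff above.

package main

import (
	"fmt"

	kafka "sarama/kafka" // assumed import path for the flattened package
)

func main() {
	// NewClient dials the seed broker and eagerly fetches cluster metadata;
	// per the doc comment in the diff, no client is returned if that fails.
	client, err := kafka.NewClient("example-client", "localhost", 9092)
	if err != nil {
		fmt.Println("could not create client:", err)
		return
	}
	defer client.Close()

	fmt.Println("client connected and metadata fetched")
}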
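The fallthrough in the last hunk is the subtle part: a partition that is mid-election reports LEADER_NOT_AVAILABLE with Leader set to -1, and update deliberately caches that -1 (while flagging the topic for retry) so a stale leader is never handed out. A self-contained sketch of the same pattern, using hypothetical stand-in types rather than the real MetadataResponse:

package main

import "fmt"

// Hypothetical stand-ins for the metadata types used in the diff.
type kafkaError int

const (
	noError            kafkaError = 0
	leaderNotAvailable kafkaError = 5 // Kafka's LEADER_NOT_AVAILABLE error code
)

type partitionMetadata struct {
	id     int32
	leader int32 // -1 while leader election is in progress
	err    kafkaError
}

func main() {
	leaders := make(map[int32]int32) // partition id -> broker id
	toRetry := make(map[string]bool)
	topic := "example"

	for _, p := range []partitionMetadata{
		{id: 0, leader: 3, err: noError},
		{id: 1, leader: -1, err: leaderNotAvailable},
	} {
		switch p.err {
		case leaderNotAvailable:
			// Flag the topic for retry, then fall through so the cached entry
			// is overwritten with -1 instead of keeping a stale leader.
			toRetry[topic] = true
			fallthrough
		case noError:
			leaders[p.id] = p.leader
		}
	}

	fmt.Println(leaders, toRetry) // map[0:3 1:-1] map[example:true]
}

Running it shows the effect: partition 1's old leader is replaced by -1 (an invalid broker id, so lookups fail fast) and the topic lands in the retry set for the next metadata refresh.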