|
|
@@ -6,12 +6,19 @@ import (
|
|
|
"time"
|
|
|
)
|
|
|
|
|
|
+// ClientConfig is used to pass multiple configuration options to NewClient.
|
|
|
+type ClientConfig struct {
|
|
|
+ MetadataRetries int // How many times to retry a metadata request when a partition is in the middle of leader election.
|
|
|
+ WaitForElection time.Duration // How long to wait for leader election to finish between retries.
|
|
|
+}
|
|
|
+
|
|
|
// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
|
|
|
// You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
|
|
|
// automatically when it passes out of scope. A single client can be safely shared by
|
|
|
// multiple concurrent Producers and Consumers.
|
|
|
type Client struct {
|
|
|
- id string // client id for broker requests
|
|
|
+ id string
|
|
|
+ config ClientConfig
|
|
|
brokers map[int32]*Broker // maps broker ids to brokers
|
|
|
leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
|
|
|
lock sync.RWMutex // protects access to the maps, only one since they're always written together
|
|
|
@@ -20,7 +27,7 @@ type Client struct {
|
|
|
// NewClient creates a new Client with the given client ID. It connects to the broker at the given
|
|
|
// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
|
|
|
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
|
|
|
-func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
+func NewClient(id string, host string, port int32, config ClientConfig) (client *Client, err error) {
|
|
|
tmp := NewBroker(host, port)
|
|
|
err = tmp.Connect()
|
|
|
if err != nil {
|
|
|
@@ -29,6 +36,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
|
|
|
client = new(Client)
|
|
|
client.id = id
|
|
|
+ client.config = config
|
|
|
|
|
|
client.brokers = make(map[int32]*Broker)
|
|
|
client.leaders = make(map[string]map[int32]int32)
|
|
|
@@ -39,7 +47,7 @@ func NewClient(id string, host string, port int32) (client *Client, err error) {
|
|
|
client.brokers[tmp.ID()] = tmp
|
|
|
|
|
|
// do an initial fetch of all cluster metadata by specifing an empty list of topics
|
|
|
- err = client.refreshTopics(make([]string, 0), 3)
|
|
|
+ err = client.refreshTopics(make([]string, 0), client.config.MetadataRetries)
|
|
|
if err != nil {
|
|
|
client.Close() // this closes tmp, since it's still in the brokers hash
|
|
|
return nil, err
|
|
|
@@ -122,7 +130,7 @@ func (client *Client) refreshTopic(topic string) error {
|
|
|
tmp := make([]string, 1)
|
|
|
tmp[0] = topic
|
|
|
// we permit three retries by default, 'cause that seemed like a nice number
|
|
|
- return client.refreshTopics(tmp, 3)
|
|
|
+ return client.refreshTopics(tmp, client.config.MetadataRetries)
|
|
|
}
|
|
|
|
|
|
// truly private helper functions
|
|
|
@@ -144,7 +152,7 @@ func (client *Client) refreshTopics(topics []string, retries int) error {
|
|
|
if retries <= 0 {
|
|
|
return LEADER_NOT_AVAILABLE
|
|
|
}
|
|
|
- time.Sleep(250 * time.Millisecond) // wait for leader election
|
|
|
+ time.Sleep(client.config.WaitForElection) // wait for leader election
|
|
|
return client.refreshTopics(retry, retries-1)
|
|
|
}
|
|
|
case EncodingError:
|