@@ -10,7 +10,54 @@ import (
 // You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
 // automatically when it passes out of scope. A single client can be safely shared by
 // multiple concurrent Producers and Consumers.
-type Client struct {
+type Client interface {
+	// Config returns the Config struct of the client. This struct should not be altered after it
+	// has been created.
+	Config() *Config
+
+	// Topics returns the set of available topics as retrieved from the cluster metadata.
+	Topics() ([]string, error)
+
+	// Partitions returns the sorted list of all partition IDs for the given topic.
+	Partitions(topic string) ([]int32, error)
+
+	// WritablePartitions returns the sorted list of all writable partition IDs for the given topic,
+	// where "writable" means "having a valid leader accepting writes".
+	WritablePartitions(topic string) ([]int32, error)
+
+	// Leader returns the broker object that is the leader of the given topic/partition, as
+	// determined by querying the cluster metadata.
+	Leader(topic string, partitionID int32) (*Broker, error)
+
+	// Replicas returns the set of all replica IDs for the given partition.
+	Replicas(topic string, partitionID int32) ([]int32, error)
+
+	// ReplicasInSync returns the set of all in-sync replica IDs for the given partition.
+	// Note: kafka's metadata here is known to be stale in many cases, and should not generally be trusted.
+	// This method should be considered effectively deprecated.
+	ReplicasInSync(topic string, partitionID int32) ([]int32, error)
+
+	// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
+	// available metadata for those topics.
+	RefreshTopicMetadata(topics ...string) error
+
+	// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
+	RefreshAllMetadata() error
+
+	// GetOffset queries the cluster to get the most recent available offset at the given
+	// time on the topic/partition combination.
+	GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error)
+
+	// Close shuts down all broker connections managed by this client. It is required to call this function before
+	// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
+	// using a client before you close the client.
+	Close() error
+
+	// Closed returns true if the client has already had Close called on it
+	Closed() bool
+}
+
+type client struct {
 	conf *Config
 	closer chan none
 
@@ -33,7 +80,7 @@ type Client struct {
 // NewClient creates a new Client. It connects to one of the given broker addresses
 // and uses that broker to automatically fetch metadata on the rest of the kafka cluster. If metadata cannot
 // be retrieved from any of the given broker addresses, the client is not created.
-func NewClient(addrs []string, conf *Config) (*Client, error) {
+func NewClient(addrs []string, conf *Config) (Client, error) {
 	Logger.Println("Initializing new client")
 
 	if conf == nil {
@@ -48,7 +95,7 @@ func NewClient(addrs []string, conf *Config) (*Client, error) {
 		return nil, ConfigurationError("You must provide at least one broker address")
 	}
 
-	client := &Client{
+	client := &client{
 		conf:            conf,
 		closer:          make(chan none),
 		seedBrokerAddrs: addrs,
@@ -79,10 +126,11 @@ func NewClient(addrs []string, conf *Config) (*Client, error) {
 	return client, nil
 }
 
-// Close shuts down all broker connections managed by this client. It is required to call this function before
-// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
-// using a client before you close the client.
-func (client *Client) Close() error {
+func (client *client) Config() *Config {
+	return client.conf
+}
+
+func (client *client) Close() error {
 	// Check to see whether the client is closed
 	if client.Closed() {
 		// Chances are this is being called from a defer() and the error will go unobserved
@@ -110,13 +158,11 @@ func (client *Client) Close() error {
 	return nil
 }
 
-// Closed returns true if the client has already had Close called on it
-func (client *Client) Closed() bool {
+func (client *client) Closed() bool {
 	return client.brokers == nil
 }
 
-// Topics returns the set of available topics as retrieved from the cluster metadata.
-func (client *Client) Topics() ([]string, error) {
+func (client *client) Topics() ([]string, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -133,8 +179,7 @@ func (client *Client) Topics() ([]string, error) {
 	return ret, nil
 }
 
-// Partitions returns the sorted list of all partition IDs for the given topic.
-func (client *Client) Partitions(topic string) ([]int32, error) {
+func (client *client) Partitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -157,9 +202,7 @@ func (client *Client) Partitions(topic string) ([]int32, error) {
 	return partitions, nil
}
 
-// WritablePartitions returns the sorted list of all writable partition IDs for the given topic,
-// where "writable" means "having a valid leader accepting writes".
-func (client *Client) WritablePartitions(topic string) ([]int32, error) {
+func (client *client) WritablePartitions(topic string) ([]int32, error) {
 	// Check to see whether the client is closed
 	if client.Closed() {
 		return nil, ErrClosedClient
@@ -188,8 +231,7 @@ func (client *Client) WritablePartitions(topic string) ([]int32, error) {
 	return partitions, nil
 }
 
-// Replicas returns the set of all replica IDs for the given partition.
-func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error) {
+func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
 	}
@@ -206,10 +248,7 @@ func (client *Client) Replicas(topic string, partitionID int32) ([]int32, error)
 	return dupeAndSort(metadata.Replicas), nil
 }
 
-// ReplicasInSync returns the set of all in-sync replica IDs for the given partition.
-// Note: kafka's metadata here is known to be stale in many cases, and should not generally be trusted.
-// This method should be considered effectively deprecated.
-func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
+func (client *client) ReplicasInSync(topic string, partitionID int32) ([]int32, error) {
 	if client.Closed() {
 		return nil, ErrClosedClient
 	}
@@ -226,9 +265,7 @@ func (client *Client) ReplicasInSync(topic string, partitionID int32) ([]int32,
 	return dupeAndSort(metadata.Isr), nil
 }
 
-// Leader returns the broker object that is the leader of the current topic/partition, as
-// determined by querying the cluster metadata.
-func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
+func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
 	leader, err := client.cachedLeader(topic, partitionID)
 
 	if leader == nil {
@@ -242,21 +279,16 @@ func (client *Client) Leader(topic string, partitionID int32) (*Broker, error) {
 	return leader, err
 }
 
-// RefreshTopicMetadata takes a list of topics and queries the cluster to refresh the
-// available metadata for those topics.
-func (client *Client) RefreshTopicMetadata(topics ...string) error {
+func (client *client) RefreshTopicMetadata(topics ...string) error {
 	return client.refreshMetadata(topics, client.conf.Metadata.Retry.Max)
 }
 
-// RefreshAllMetadata queries the cluster to refresh the available metadata for all topics.
-func (client *Client) RefreshAllMetadata() error {
+func (client *client) RefreshAllMetadata() error {
 	// Kafka refreshes metadata for all topics when you send it an empty topic list...
 	return client.refreshMetadata(make([]string, 0), client.conf.Metadata.Retry.Max)
 }
 
-// GetOffset queries the cluster to get the most recent available offset at the given
-// time on the topic/partition combination.
-func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error) {
+func (client *client) GetOffset(topic string, partitionID int32, where OffsetTime) (int64, error) {
 	broker, err := client.Leader(topic, partitionID)
 	if err != nil {
 		return -1, err
@@ -290,7 +322,7 @@ func (client *Client) GetOffset(topic string, partitionID int32, where OffsetTim
 // and https://github.com/Shopify/sarama/issues/23
 // disconnectBroker is a bad hacky way to accomplish broker management. It should be replaced with
 // something sane and the replacement should be made part of the public Client API
-func (client *Client) disconnectBroker(broker *Broker) {
+func (client *client) disconnectBroker(broker *Broker) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 	Logger.Printf("Disconnecting Broker %d\n", broker.ID())
@@ -314,7 +346,7 @@ func (client *Client) disconnectBroker(broker *Broker) {
 	safeAsyncClose(broker)
 }
 
-func (client *Client) resurrectDeadBrokers() {
+func (client *client) resurrectDeadBrokers() {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
@@ -332,7 +364,7 @@ func (client *Client) resurrectDeadBrokers() {
 	_ = client.seedBroker.Open(client.conf)
 }
 
-func (client *Client) any() *Broker {
+func (client *client) any() *Broker {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -362,7 +394,7 @@ const (
 	maxPartitionIndex
 )
 
-func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
+func (client *client) getMetadata(topic string, partitionID int32) (*PartitionMetadata, error) {
 	metadata := client.cachedMetadata(topic, partitionID)
 
 	if metadata == nil {
@@ -380,7 +412,7 @@ func (client *Client) getMetadata(topic string, partitionID int32) (*PartitionMe
 	return metadata, nil
 }
 
-func (client *Client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
+func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -392,7 +424,7 @@ func (client *Client) cachedMetadata(topic string, partitionID int32) *Partition
 	return nil
 }
 
-func (client *Client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
+func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -404,7 +436,7 @@ func (client *Client) cachedPartitions(topic string, partitionSet partitionType)
 	return partitions[partitionSet]
 }
 
-func (client *Client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
+func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
 	partitions := client.metadata[topic]
 
 	if partitions == nil {
@@ -423,7 +455,7 @@ func (client *Client) setPartitionCache(topic string, partitionSet partitionType
 	return ret
 }
 
-func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
+func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
 	client.lock.RLock()
 	defer client.lock.RUnlock()
 
@@ -448,7 +480,7 @@ func (client *Client) cachedLeader(topic string, partitionID int32) (*Broker, er
 
 // core metadata update logic
 
-func (client *Client) backgroundMetadataUpdater() {
+func (client *client) backgroundMetadataUpdater() {
 	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
 		return
 	}
@@ -467,7 +499,7 @@ func (client *Client) backgroundMetadataUpdater() {
 	}
 }
 
-func (client *Client) refreshMetadata(topics []string, retriesRemaining int) error {
+func (client *client) refreshMetadata(topics []string, retriesRemaining int) error {
 	// This function is a sort of central point for most functions that create new
 	// resources. Check to see if we're dealing with a closed Client and error
 	// out immediately if so.
@@ -533,7 +565,7 @@ func (client *Client) refreshMetadata(topics []string, retriesRemaining int) err
 	}
 
 // if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
-func (client *Client) update(data *MetadataResponse) ([]string, error) {
+func (client *client) update(data *MetadataResponse) ([]string, error) {
 	client.lock.Lock()
 	defer client.lock.Unlock()
 
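
Usage sketch (not part of the patch): a minimal illustration of how a caller might consume the new Client interface once this lands. Only NewClient, Topics, Partitions, and Close come from the diff above; sarama.NewConfig and the broker address "localhost:9092" are assumptions for the example.

package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// Assumed helper: a constructor returning a usable default *Config.
	conf := sarama.NewConfig()

	// NewClient now returns the Client interface rather than a concrete *Client.
	client, err := sarama.NewClient([]string{"localhost:9092"}, conf)
	if err != nil {
		log.Fatalln(err)
	}
	// Per the doc comment above, Close must be called explicitly; the client
	// is not cleaned up when it passes out of scope.
	defer func() {
		if err := client.Close(); err != nil {
			log.Println("error closing client:", err)
		}
	}()

	topics, err := client.Topics()
	if err != nil {
		log.Fatalln(err)
	}
	for _, topic := range topics {
		partitions, err := client.Partitions(topic)
		if err != nil {
			log.Fatalln(err)
		}
		fmt.Printf("%s: partitions %v\n", topic, partitions)
	}
}

A side benefit of returning an interface: tests can substitute a mock Client without standing up a live cluster.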