| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970 |
- package kafka
- import k "sarama/protocol"
- // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
- // You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
- // automatically when it passes out of scope. A single client can be safely shared by
- // multiple concurrent Producers and Consumers.
- type Client struct {
- id string
- cache *metadataCache
- }
- // NewClient creates a new Client with the given client ID. It connects to the broker at the given
- // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
- // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
- func NewClient(id string, host string, port int32) (client *Client, err error) {
- client = new(Client)
- client.id = id
- client.cache, err = newMetadataCache(client, host, port)
- if err != nil {
- return nil, err
- }
- return client, nil
- }
- // Close shuts down all broker connections managed by this client. It is required to call this function before
- // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
- // using a client before you close the client.
- func (client *Client) Close() {
- client.cache.closeAll()
- }
- func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
- leader := client.cache.leader(topic, partition_id)
- if leader == nil {
- err := client.cache.refreshTopic(topic)
- if err != nil {
- return nil, err
- }
- leader = client.cache.leader(topic, partition_id)
- }
- if leader == nil {
- return nil, k.UNKNOWN_TOPIC_OR_PARTITION
- }
- return leader, nil
- }
- func (client *Client) partitions(topic string) ([]int32, error) {
- partitions := client.cache.partitions(topic)
- if partitions == nil {
- err := client.cache.refreshTopic(topic)
- if err != nil {
- return nil, err
- }
- partitions = client.cache.partitions(topic)
- }
- if partitions == nil {
- return nil, NoSuchTopic
- }
- return partitions, nil
- }
|