client.go 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. package kafka
  2. import k "sarama/protocol"
  3. // Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
  4. // You MUST call Close() on a client to avoid leaks, it will not be garbage-collected
  5. // automatically when it passes out of scope. A single client can be safely shared by
  6. // multiple concurrent Producers and Consumers.
  7. type Client struct {
  8. id string
  9. cache *metadataCache
  10. }
  11. // NewClient creates a new Client with the given client ID. It connects to the broker at the given
  12. // host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
  13. // If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
  14. func NewClient(id string, host string, port int32) (client *Client, err error) {
  15. client = new(Client)
  16. client.id = id
  17. client.cache, err = newMetadataCache(client, host, port)
  18. if err != nil {
  19. return nil, err
  20. }
  21. return client, nil
  22. }
  23. // Close shuts down all broker connections managed by this client. It is required to call this function before
  24. // a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
  25. // using a client before you close the client.
  26. func (client *Client) Close() {
  27. client.cache.closeAll()
  28. }
  29. func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
  30. leader := client.cache.leader(topic, partition_id)
  31. if leader == nil {
  32. err := client.cache.refreshTopic(topic)
  33. if err != nil {
  34. return nil, err
  35. }
  36. leader = client.cache.leader(topic, partition_id)
  37. }
  38. if leader == nil {
  39. return nil, k.UNKNOWN_TOPIC_OR_PARTITION
  40. }
  41. return leader, nil
  42. }
  43. func (client *Client) partitions(topic string) ([]int32, error) {
  44. partitions := client.cache.partitions(topic)
  45. if partitions == nil {
  46. err := client.cache.refreshTopic(topic)
  47. if err != nil {
  48. return nil, err
  49. }
  50. partitions = client.cache.partitions(topic)
  51. }
  52. if partitions == nil {
  53. return nil, NoSuchTopic
  54. }
  55. return partitions, nil
  56. }