client.go 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package kafka
  2. type Client struct {
  3. id *string
  4. cache *metadataCache
  5. }
  6. func NewClient(id *string, host string, port int32) (client *Client, err error) {
  7. client = new(Client)
  8. client.id = id
  9. client.cache, err = newMetadataCache(client, host, port)
  10. if err != nil {
  11. return nil, err
  12. }
  13. return client, nil
  14. }
  15. func (client *Client) leader(topic string, partition_id int32) (*Broker, error) {
  16. leader := client.cache.leader(topic, partition_id)
  17. if leader == nil {
  18. err := client.cache.refreshTopic(topic)
  19. if err != nil {
  20. return nil, err
  21. }
  22. leader = client.cache.leader(topic, partition_id)
  23. }
  24. if leader == nil {
  25. return nil, UNKNOWN_TOPIC_OR_PARTITION
  26. }
  27. return leader, nil
  28. }
  29. func (client *Client) partitions(topic string) ([]int32, error) {
  30. partitions := client.cache.partitions(topic)
  31. if partitions == nil {
  32. err := client.cache.refreshTopic(topic)
  33. if err != nil {
  34. return nil, err
  35. }
  36. partitions = client.cache.partitions(topic)
  37. }
  38. if partitions == nil {
  39. return nil, UNKNOWN_TOPIC_OR_PARTITION
  40. }
  41. return partitions, nil
  42. }