client.go 1.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. package kafka
  2. import k "sarama/protocol"
  3. type Client struct {
  4. id string
  5. cache *metadataCache
  6. }
  7. func NewClient(id string, host string, port int32) (client *Client, err error) {
  8. client = new(Client)
  9. client.id = id
  10. client.cache, err = newMetadataCache(client, host, port)
  11. if err != nil {
  12. return nil, err
  13. }
  14. return client, nil
  15. }
  16. func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
  17. leader := client.cache.leader(topic, partition_id)
  18. if leader == nil {
  19. err := client.cache.refreshTopic(topic)
  20. if err != nil {
  21. return nil, err
  22. }
  23. leader = client.cache.leader(topic, partition_id)
  24. }
  25. if leader == nil {
  26. return nil, k.UNKNOWN_TOPIC_OR_PARTITION
  27. }
  28. return leader, nil
  29. }
  30. func (client *Client) partitions(topic string) ([]int32, error) {
  31. partitions := client.cache.partitions(topic)
  32. if partitions == nil {
  33. err := client.cache.refreshTopic(topic)
  34. if err != nil {
  35. return nil, err
  36. }
  37. partitions = client.cache.partitions(topic)
  38. }
  39. if partitions == nil {
  40. return nil, NoSuchTopic
  41. }
  42. return partitions, nil
  43. }