client.go

package kafka

import (
	"errors"
	"sort"
	"sync"

	k "sarama/protocol"
)
// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
// automatically when it passes out of scope. A single client can be safely shared by
// multiple concurrent Producers and Consumers.
type Client struct {
	id      string                     // client id for broker requests
	brokers map[int32]*k.Broker        // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
	lock    sync.RWMutex               // protects access to the maps; only one lock since they are always written together
}

// NewClient creates a new Client with the given client ID. It connects to the broker at the given
// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
func NewClient(id string, host string, port int32) (client *Client, err error) {
	tmp := k.NewBroker(host, port)
	err = tmp.Connect()
	if err != nil {
		return nil, err
	}

	client = new(Client)
	client.id = id
	client.brokers = make(map[int32]*k.Broker)
	client.leaders = make(map[string]map[int32]int32)

	// add it temporarily so that refreshTopics can find it
	// brokers created through NewBroker() have an ID of -1, which won't conflict with
	// whatever the metadata request returns
	client.brokers[tmp.ID()] = tmp

	// do an initial fetch of all cluster metadata by specifying an empty list of topics
	err = client.refreshTopics(make([]string, 0))
	if err != nil {
		client.Close() // this closes tmp, since it's still in the brokers hash
		return nil, err
	}

	// now remove our tmp broker - the successful metadata request will have returned it
	// with a valid ID, so it will already be in the hash somewhere else and we don't need
	// the incomplete tmp one anymore
	client.disconnectBroker(tmp)

	return client, nil
}
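
// Typical usage, as a minimal sketch (the client id and broker address below are
// hypothetical, not part of this package):
//
//	client, err := NewClient("my-client", "localhost", 9092)
//	if err != nil {
//		// the broker was unreachable, or metadata could not be fetched
//		return
//	}
//	defer client.Close()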

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
func (client *Client) Close() {
	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range client.brokers {
		go broker.Close()
	}
	client.brokers = nil
	client.leaders = nil
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'

// leader returns the broker that currently leads the given partition of the given
// topic, refreshing the cached metadata once on a cache miss before giving up.
func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
	leader, kerr := client.cachedLeader(topic, partition_id)

	if kerr != k.NO_ERROR {
		err := client.refreshTopic(topic)
		if err != nil {
			return nil, err
		}
		leader, kerr = client.cachedLeader(topic, partition_id)
	}

	if kerr != k.NO_ERROR {
		return nil, kerr
	}

	return leader, nil
}

// partitions returns the sorted list of partition ids for the given topic,
// refreshing the cached metadata on a cache miss.
func (client *Client) partitions(topic string) ([]int32, error) {
	partitions := client.cachedPartitions(topic)

	if partitions == nil {
		err := client.refreshTopic(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic)
	}

	if partitions == nil {
		return nil, NoSuchTopic
	}

	return partitions, nil
}

// disconnectBroker removes the given broker from the cache and closes its
// connection asynchronously.
func (client *Client) disconnectBroker(broker *k.Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	// we don't need to update the leaders hash, it will automatically get refreshed next time because
	// the broker lookup will return nil
	delete(client.brokers, broker.ID())
	go broker.Close()
}

// refreshTopics fetches metadata for the given topics (an empty list fetches
// metadata for all topics) from any available broker and updates the caches.
func (client *Client) refreshTopics(topics []string) error {
	for broker := client.any(); broker != nil; broker = client.any() {
		response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})

		switch err.(type) {
		case nil:
			// valid response, use it
			return client.update(response)
		case k.EncodingError:
			// didn't even send, return the error
			return err
		}

		// some other error, remove that broker and try again
		client.disconnectBroker(broker)
	}

	return OutOfBrokers
}
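
// NoSuchTopic and OutOfBrokers are returned above but not declared in this file;
// the package presumably declares them elsewhere. These declarations are a sketch
// of what they might look like, included only so this excerpt reads self-contained.
var (
	NoSuchTopic  = errors.New("kafka: topic not recognized by brokers")
	OutOfBrokers = errors.New("kafka: client has run out of available brokers")
)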

func (client *Client) refreshTopic(topic string) error {
	return client.refreshTopics([]string{topic})
}

// truly private helper functions

// any returns an arbitrary broker from the cache, or nil if the cache is empty
// (map iteration order is unspecified, so this simply takes the first entry).
func (client *Client) any() *k.Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	for _, broker := range client.brokers {
		return broker
	}

	return nil
}

func (client *Client) cachedLeader(topic string, partition_id int32) (*k.Broker, k.KError) {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.leaders[topic]
	if partitions != nil {
		leader, ok := partitions[partition_id]
		if ok {
			if leader == -1 {
				return nil, k.LEADER_NOT_AVAILABLE
			}
			return client.brokers[leader], k.NO_ERROR
		}
	}

	return nil, k.UNKNOWN_TOPIC_OR_PARTITION
}

func (client *Client) cachedPartitions(topic string) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.leaders[topic]
	if partitions == nil {
		return nil
	}

	// allocate with zero length (not len(partitions)) so that append fills the slice
	// from the start instead of leaving a prefix of zero-valued entries
	ret := make([]int32, 0, len(partitions))
	for id := range partitions {
		ret = append(ret, id)
	}

	sort.Sort(int32Slice(ret))
	return ret
}
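
// int32Slice implements sort.Interface for []int32, as required by the sort.Sort
// call above. The package presumably defines it in another file; this minimal
// sketch is included so the sort can be followed in isolation.
type int32Slice []int32

func (s int32Slice) Len() int           { return len(s) }
func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }
func (s int32Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }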

func (client *Client) update(data *k.MetadataResponse) error {
	// First discard brokers that we already know about. This avoids bouncing TCP connections,
	// and especially avoids closing valid connections out from under other code which may be trying
	// to use them. We only need a read-lock for this.
	var newBrokers []*k.Broker
	client.lock.RLock()
	for _, broker := range data.Brokers {
		if !broker.Equals(client.brokers[broker.ID()]) {
			newBrokers = append(newBrokers, broker)
		}
	}
	client.lock.RUnlock()

	// connect to the brokers before taking the write lock, as this can take a while
	// to timeout if one of them isn't reachable
	for _, broker := range newBrokers {
		err := broker.Connect()
		if err != nil {
			return err
		}
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range newBrokers {
		if client.brokers[broker.ID()] != nil {
			go client.brokers[broker.ID()].Close()
		}
		client.brokers[broker.ID()] = broker
	}

	for _, topic := range data.Topics {
		if topic.Err != k.NO_ERROR {
			return topic.Err
		}
		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			switch partition.Err {
			case k.NO_ERROR, k.LEADER_NOT_AVAILABLE:
				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
				// partition is in the middle of leader election, so we save it anyway to avoid
				// returning a stale leader (our broker map should never have a broker with ID -1)
				client.leaders[topic.Name][partition.Id] = partition.Leader
			default:
				return partition.Err
			}
		}
	}

	return nil
}