client.go

/*
Package kafka provides a high-level API for writing Kafka 0.8 clients.
It is built strictly on sister package sarama/protocol.
*/
package kafka

import (
	"sort"
	"sync"
	"time"

	k "sarama/protocol"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
// automatically when it passes out of scope. A single client can be safely shared by
// multiple concurrent Producers and Consumers.
type Client struct {
	id      string                     // client id for broker requests
	brokers map[int32]*k.Broker        // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
	lock    sync.RWMutex               // protects access to the maps; only one lock since they're always written together
}

// NewClient creates a new Client with the given client ID. It connects to the broker at the given
// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
func NewClient(id string, host string, port int32) (client *Client, err error) {
	tmp := k.NewBroker(host, port)
	err = tmp.Connect()
	if err != nil {
		return nil, err
	}

	client = new(Client)
	client.id = id
	client.brokers = make(map[int32]*k.Broker)
	client.leaders = make(map[string]map[int32]int32)

	// add it temporarily so that refreshTopics can find it
	// brokers created through NewBroker() have an ID of -1, which won't conflict with
	// whatever the metadata request returns
	client.brokers[tmp.ID()] = tmp

	// do an initial fetch of all cluster metadata by specifying an empty list of topics
	err = client.refreshTopics(make([]string, 0), 3)
	if err != nil {
		client.Close() // this closes tmp, since it's still in the brokers hash
		return nil, err
	}

	// now remove our tmp broker - the successful metadata request will have returned it
	// with a valid ID, so it will already be in the hash somewhere else and we don't need
	// the incomplete tmp one anymore
	client.disconnectBroker(tmp)

	return client, nil
}
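
// exampleClientUsage is an editorial sketch, not part of the original file. It
// shows the intended lifecycle described in the comments above: create a client,
// share it with Producers/Consumers, and always Close it when done. The broker
// address ("localhost", 9092) and topic name ("my-topic") are illustrative
// assumptions only.
func exampleClientUsage() error {
	client, err := NewClient("example-client", "localhost", 9092)
	if err != nil {
		return err
	}
	defer client.Close() // required; the client is not cleaned up by the GC

	// ask the client which partitions exist for a topic; a Producer or
	// Consumer built on this client would do the same internally
	partitions, err := client.partitions("my-topic")
	if err != nil {
		return err
	}
	_ = partitions

	return nil
}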

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
func (client *Client) Close() {
	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range client.brokers {
		go broker.Close()
	}
	client.brokers = nil
	client.leaders = nil
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'

func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
	leader := client.cachedLeader(topic, partition_id)

	if leader == nil {
		err := client.refreshTopic(topic)
		if err != nil {
			return nil, err
		}
		leader = client.cachedLeader(topic, partition_id)
	}

	if leader == nil {
		return nil, k.UNKNOWN_TOPIC_OR_PARTITION
	}

	return leader, nil
}

func (client *Client) partitions(topic string) ([]int32, error) {
	partitions := client.cachedPartitions(topic)

	if partitions == nil {
		err := client.refreshTopic(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic)
	}

	if partitions == nil {
		return nil, NoSuchTopic
	}

	return partitions, nil
}
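
// choosePartitionLeader is an editorial sketch, not part of the original file.
// It illustrates how a Producer or Consumer might combine partitions() and
// leader(): pick a partition id for a topic, then resolve the broker currently
// leading it. The simple modulo choice is an illustrative assumption, not the
// library's actual partitioning strategy; key is assumed to be non-negative.
func choosePartitionLeader(client *Client, topic string, key int) (*k.Broker, int32, error) {
	ids, err := client.partitions(topic)
	if err != nil {
		return nil, -1, err
	}
	if len(ids) == 0 {
		return nil, -1, NoSuchTopic
	}

	id := ids[key%len(ids)]
	broker, err := client.leader(topic, id)
	if err != nil {
		return nil, -1, err
	}

	return broker, id, nil
}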

func (client *Client) disconnectBroker(broker *k.Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	// we don't need to update the leaders hash; it will automatically get refreshed next time
	// because the broker lookup will return nil
	delete(client.brokers, broker.ID())
	go broker.Close()
}

func (client *Client) refreshTopic(topic string) error {
	// we permit three retries by default, 'cause that seemed like a nice number
	return client.refreshTopics([]string{topic}, 3)
}

// truly private helper functions

func (client *Client) refreshTopics(topics []string, retries int) error {
	for broker := client.any(); broker != nil; broker = client.any() {
		response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})

		switch err.(type) {
		case nil:
			// valid response, use it
			retry, err := client.update(response)
			switch {
			case err != nil:
				return err
			case len(retry) == 0:
				return nil
			default:
				if retries <= 0 {
					return k.LEADER_NOT_AVAILABLE
				}
				time.Sleep(250 * time.Millisecond) // wait for leader election
				return client.refreshTopics(retry, retries-1)
			}
		case k.EncodingError:
			// didn't even send, return the error
			return err
		}

		// some other error, remove that broker and try again
		client.disconnectBroker(broker)
	}

	return OutOfBrokers
}

func (client *Client) any() *k.Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	for _, broker := range client.brokers {
		return broker
	}

	return nil
}

func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.leaders[topic]
	if partitions != nil {
		leader, ok := partitions[partition_id]
		if ok && leader != -1 {
			return client.brokers[leader]
		}
	}

	return nil
}

func (client *Client) cachedPartitions(topic string) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.leaders[topic]
	if partitions == nil {
		return nil
	}

	// allocate with zero length (and full capacity) so that append doesn't leave
	// a prefix of zero-valued partition ids in the result
	ret := make([]int32, 0, len(partitions))
	for id := range partitions {
		ret = append(ret, id)
	}

	sort.Sort(int32Slice(ret))
	return ret
}
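
// Editorial note: int32Slice is not defined in this file; it is assumed to be a
// small helper elsewhere in the package that lets sort.Sort order a []int32. A
// minimal sketch of the shape it would need (kept as a comment to avoid clashing
// with the real definition):
//
//	type int32Slice []int32
//
//	func (s int32Slice) Len() int           { return len(s) }
//	func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }
//	func (s int32Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }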

// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
	// First discard brokers that we already know about. This avoids bouncing TCP connections,
	// and especially avoids closing valid connections out from under other code which may be trying
	// to use them. We only need a read-lock for this.
	var newBrokers []*k.Broker
	client.lock.RLock()
	for _, broker := range data.Brokers {
		if !broker.Equals(client.brokers[broker.ID()]) {
			newBrokers = append(newBrokers, broker)
		}
	}
	client.lock.RUnlock()

	// connect to the brokers before taking the write lock, as this can take a while
	// to time out if one of them isn't reachable
	for _, broker := range newBrokers {
		err := broker.Connect()
		if err != nil {
			return nil, err
		}
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range newBrokers {
		if client.brokers[broker.ID()] != nil {
			go client.brokers[broker.ID()].Close()
		}
		client.brokers[broker.ID()] = broker
	}

	toRetry := make(map[string]bool)

	for _, topic := range data.Topics {
		switch topic.Err {
		case k.NO_ERROR:
			break
		case k.LEADER_NOT_AVAILABLE:
			toRetry[topic.Name] = true
		default:
			return nil, topic.Err
		}

		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			switch partition.Err {
			case k.LEADER_NOT_AVAILABLE:
				// in this case partition.Leader will be -1 because the partition is in the
				// middle of leader election; we fall through and store the -1 anyway so that
				// any stale leader in the cache is overwritten (-1 is never a valid broker ID)
				toRetry[topic.Name] = true
				fallthrough
			case k.NO_ERROR:
				client.leaders[topic.Name][partition.Id] = partition.Leader
			default:
				return nil, partition.Err
			}
		}
	}

	ret := make([]string, 0, len(toRetry))
	for topic := range toRetry {
		ret = append(ret, topic)
	}

	return ret, nil
}
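
// Editorial note: NoSuchTopic and OutOfBrokers are used above but not declared in
// this file; they are assumed to be package-level error values defined elsewhere
// in the package. A minimal sketch of what such declarations might look like
// (kept as a comment to avoid duplicating the real ones):
//
//	var (
//		NoSuchTopic  = errors.New("kafka: topic not found in cluster metadata")
//		OutOfBrokers = errors.New("kafka: client has run out of available brokers")
//	)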