/*
Package kafka provides a high-level API for writing Kafka clients.
It is built on the sister package sarama/protocol.
*/
package kafka

import (
	"sarama/encoding"
	k "sarama/protocol"
	"sarama/types"
	"sort"
	"sync"
	"time"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
// automatically when it passes out of scope. A single client can be safely shared by
// multiple concurrent Producers and Consumers.
type Client struct {
	id      string                     // client id for broker requests
	brokers map[int32]*k.Broker        // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
	lock    sync.RWMutex               // protects access to the maps; only one lock since they are always written together
}

// NewClient creates a new Client with the given client ID. It connects to the broker at the given
// host:port address, and uses that broker to automatically fetch metadata on the rest of the kafka cluster.
// If metadata cannot be retrieved (even if the connection otherwise succeeds) then the client is not created.
func NewClient(id string, host string, port int32) (client *Client, err error) {
	tmp := k.NewBroker(host, port)
	err = tmp.Connect()
	if err != nil {
		return nil, err
	}

	client = new(Client)
	client.id = id
	client.brokers = make(map[int32]*k.Broker)
	client.leaders = make(map[string]map[int32]int32)

	// add it temporarily so that refreshTopics can find it
	// brokers created through NewBroker() have an ID of -1, which won't conflict with
	// whatever the metadata request returns
	client.brokers[tmp.ID()] = tmp

	// do an initial fetch of all cluster metadata by specifying an empty list of topics
	err = client.refreshTopics(make([]string, 0), 3)
	if err != nil {
		client.Close() // this closes tmp, since it's still in the brokers hash
		return nil, err
	}

	// now remove our tmp broker - the successful metadata request will have returned it
	// with a valid ID, so it will already be in the hash somewhere else and we don't need
	// the incomplete tmp one anymore
	client.disconnectBroker(tmp)

	return client, nil
}
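
// exampleNewClient is a minimal usage sketch and is not part of the original file.
// It shows the NewClient/Close pairing required by the documentation above; the
// client id, "localhost", and port 9092 are placeholder assumptions for illustration.
func exampleNewClient() error {
	client, err := NewClient("my-client", "localhost", 9092)
	if err != nil {
		return err
	}
	// the same client may then be shared by multiple concurrent Producers and Consumers
	defer client.Close()
	return nil
}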

// Close shuts down all broker connections managed by this client. It is required to call this function before
// a client object passes out of scope, as it will otherwise leak memory. You must close any Producers or Consumers
// using a client before you close the client.
func (client *Client) Close() {
	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range client.brokers {
		go broker.Close()
	}
	client.brokers = nil
	client.leaders = nil
}

// functions for use by producers and consumers
// if Go had the concept they would be marked 'protected'

func (client *Client) leader(topic string, partition_id int32) (*k.Broker, error) {
	leader := client.cachedLeader(topic, partition_id)

	if leader == nil {
		err := client.refreshTopic(topic)
		if err != nil {
			return nil, err
		}
		leader = client.cachedLeader(topic, partition_id)
	}

	if leader == nil {
		return nil, types.UNKNOWN_TOPIC_OR_PARTITION
	}

	return leader, nil
}

func (client *Client) partitions(topic string) ([]int32, error) {
	partitions := client.cachedPartitions(topic)

	if partitions == nil {
		err := client.refreshTopic(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic)
	}

	if partitions == nil {
		return nil, NoSuchTopic
	}

	return partitions, nil
}
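
// examplePartitionLookup is a minimal in-package sketch and is not part of the
// original file. It shows how a Producer or Consumer might use partitions() and
// leader() to find the broker responsible for each partition of a topic; the
// topic name is a placeholder assumption.
func examplePartitionLookup(client *Client) error {
	ids, err := client.partitions("my-topic")
	if err != nil {
		return err
	}
	for _, id := range ids {
		broker, err := client.leader("my-topic", id)
		if err != nil {
			return err
		}
		_ = broker // a real Producer or Consumer would send its requests to this broker
	}
	return nil
}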

func (client *Client) disconnectBroker(broker *k.Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	// we don't need to update the leaders hash, it will automatically get refreshed next time because
	// the broker lookup will return nil
	delete(client.brokers, broker.ID())
	go broker.Close()
}

func (client *Client) refreshTopic(topic string) error {
	tmp := make([]string, 1)
	tmp[0] = topic
	// we permit three retries by default, 'cause that seemed like a nice number
	return client.refreshTopics(tmp, 3)
}

// truly private helper functions

func (client *Client) refreshTopics(topics []string, retries int) error {
	for broker := client.any(); broker != nil; broker = client.any() {
		response, err := broker.GetMetadata(client.id, &k.MetadataRequest{Topics: topics})

		switch err {
		case nil:
			// valid response, use it
			retry, err := client.update(response)
			switch {
			case err != nil:
				return err
			case len(retry) == 0:
				return nil
			default:
				if retries <= 0 {
					return types.LEADER_NOT_AVAILABLE
				}
				time.Sleep(250 * time.Millisecond) // wait for leader election
				return client.refreshTopics(retry, retries-1)
			}
		case encoding.EncodingError:
			// didn't even send, return the error
			return err
		}

		// some other error, remove that broker and try again
		client.disconnectBroker(broker)
	}

	return OutOfBrokers
}

func (client *Client) any() *k.Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	for _, broker := range client.brokers {
		return broker
	}

	return nil
}

func (client *Client) cachedLeader(topic string, partition_id int32) *k.Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.leaders[topic]
	if partitions != nil {
		leader, ok := partitions[partition_id]
		if ok && leader != -1 {
			return client.brokers[leader]
		}
	}

	return nil
}

func (client *Client) cachedPartitions(topic string) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.leaders[topic]
	if partitions == nil {
		return nil
	}

	// allocate with zero length (capacity only) so that append does not leave
	// len(partitions) zero-value entries at the front of the result
	ret := make([]int32, 0, len(partitions))
	for id := range partitions {
		ret = append(ret, id)
	}

	sort.Sort(int32Slice(ret))
	return ret
}
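
// int32Slice is assumed to be defined elsewhere in this package; it is not part of
// this file, so it is only sketched here. The sort.Interface implementation that
// sort.Sort(int32Slice(ret)) above relies on would look roughly like:
//
//	type int32Slice []int32
//
//	func (s int32Slice) Len() int           { return len(s) }
//	func (s int32Slice) Less(i, j int) bool { return s[i] < s[j] }
//	func (s int32Slice) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }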

// if no fatal error, returns a list of topics that need retrying due to LEADER_NOT_AVAILABLE
func (client *Client) update(data *k.MetadataResponse) ([]string, error) {
	// First discard brokers that we already know about. This avoids bouncing TCP connections,
	// and especially avoids closing valid connections out from under other code which may be trying
	// to use them. We only need a read-lock for this.
	var newBrokers []*k.Broker
	client.lock.RLock()
	for _, broker := range data.Brokers {
		if !broker.Equals(client.brokers[broker.ID()]) {
			newBrokers = append(newBrokers, broker)
		}
	}
	client.lock.RUnlock()

	// connect to the brokers before taking the write lock, as this can take a while
	// to timeout if one of them isn't reachable
	for _, broker := range newBrokers {
		err := broker.Connect()
		if err != nil {
			return nil, err
		}
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range newBrokers {
		if client.brokers[broker.ID()] != nil {
			go client.brokers[broker.ID()].Close()
		}
		client.brokers[broker.ID()] = broker
	}

	toRetry := make(map[string]bool)

	for _, topic := range data.Topics {
		switch topic.Err {
		case types.NO_ERROR:
			break
		case types.LEADER_NOT_AVAILABLE:
			toRetry[topic.Name] = true
		default:
			return nil, topic.Err
		}
		client.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			switch partition.Err {
			case types.LEADER_NOT_AVAILABLE:
				// in the LEADER_NOT_AVAILABLE case partition.Leader will be -1 because the
				// partition is in the middle of leader election, so we fallthrough to save it
				// anyways in order to avoid returning the stale leader (since -1 isn't a valid broker ID)
				toRetry[topic.Name] = true
				fallthrough
			case types.NO_ERROR:
				client.leaders[topic.Name][partition.Id] = partition.Leader
			default:
				return nil, partition.Err
			}
		}
	}

	ret := make([]string, 0, len(toRetry))
	for topic := range toRetry {
		ret = append(ret, topic)
	}
	return ret, nil
}
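
// The metadata types that update() walks are defined in sarama/protocol and
// sarama/types, not in this file, so the following is only an assumed sketch
// based on the fields used above: k.MetadataResponse carries a Brokers slice of
// *k.Broker (compared via Equals() and ID()) and a Topics slice whose elements
// carry Err, Name, and a Partitions slice; each partition entry carries Err, Id,
// and Leader (a broker id, or -1 while leader election is in progress). The exact
// names of the topic and partition metadata types do not appear in this file and
// are deliberately left unnamed here.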