// client.go

package sarama

import (
	"math/rand"
	"sort"
	"sync"
	"time"
)

// Client is a generic Kafka client. It manages connections to one or more Kafka brokers.
// You MUST call Close() on a client to avoid leaks; it will not be garbage-collected
// automatically when it passes out of scope. It is safe to share a client amongst many
// users, however Kafka will process requests from a single client strictly serially,
// so it is generally more efficient to use the default one client per producer/consumer.
type Client interface {
	// Config returns the Config struct of the client. This struct should not be
	// altered after it has been created.
	Config() *Config

	// Controller returns the cluster controller broker. It will return a
	// locally cached value if it's available. You can call RefreshController
	// to update the cached value. Requires Kafka 0.10 or higher.
	Controller() (*Broker, error)

	// RefreshController retrieves the cluster controller from fresh metadata
	// and stores it in the local cache. Requires Kafka 0.10 or higher.
	RefreshController() (*Broker, error)

	// Brokers returns the current set of active brokers as retrieved from cluster metadata.
	Brokers() []*Broker

	// Topics returns the set of available topics as retrieved from cluster metadata.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	Partitions(topic string) ([]int32, error)

	// WritablePartitions returns the sorted list of all writable partition IDs for
	// the given topic, where "writable" means "having a valid leader accepting
	// writes".
	WritablePartitions(topic string) ([]int32, error)

	// Leader returns the broker object that is the leader of the current
	// topic/partition, as determined by querying the cluster metadata.
	Leader(topic string, partitionID int32) (*Broker, error)

	// Replicas returns the set of all replica IDs for the given partition.
	Replicas(topic string, partitionID int32) ([]int32, error)

	// InSyncReplicas returns the set of all in-sync replica IDs for the given
	// partition. In-sync replicas are replicas which are fully caught up with
	// the partition leader.
	InSyncReplicas(topic string, partitionID int32) ([]int32, error)

	// OfflineReplicas returns the set of all offline replica IDs for the given
	// partition. Offline replicas are replicas whose broker is currently
	// offline or unreachable.
	OfflineReplicas(topic string, partitionID int32) ([]int32, error)

	// RefreshBrokers takes a list of addresses to be used as seed brokers.
	// Existing broker connections are closed and the updated list of seed brokers
	// will be used for the next metadata fetch.
	RefreshBrokers(addrs []string) error

	// RefreshMetadata takes a list of topics and queries the cluster to refresh the
	// available metadata for those topics. If no topics are provided, it will refresh
	// metadata for all topics.
	RefreshMetadata(topics ...string) error

	// GetOffset queries the cluster to get the most recent available offset at the
	// given time (in milliseconds) on the topic/partition combination.
	// Time should be OffsetOldest for the earliest available offset,
	// OffsetNewest for the offset of the message that will be produced next, or a time.
	GetOffset(topic string, partitionID int32, time int64) (int64, error)

	// Coordinator returns the coordinating broker for a consumer group. It will
	// return a locally cached value if it's available. You can call
	// RefreshCoordinator to update the cached value. This function only works on
	// Kafka 0.8.2 and higher.
	Coordinator(consumerGroup string) (*Broker, error)

	// RefreshCoordinator retrieves the coordinator for a consumer group and stores it
	// in the local cache. This function only works on Kafka 0.8.2 and higher.
	RefreshCoordinator(consumerGroup string) error

	// InitProducerID retrieves the information required for the Idempotent Producer.
	InitProducerID() (*InitProducerIDResponse, error)

	// Close shuts down all broker connections managed by this client. It is required
	// to call this function before a client object passes out of scope, as it will
	// otherwise leak memory. You must close any Producers or Consumers using a client
	// before you close the client.
	Close() error

	// Closed returns true if the client has already had Close called on it.
	Closed() bool
}
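
// Usage sketch (illustrative, not part of the original file): a single Client is
// typically created once, shared, and closed explicitly; the broker address and
// the error handling below are hypothetical placeholders.
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		// handle error
//	}
//	defer client.Close() // required; the client is not cleaned up by the GC
//
//	topics, err := client.Topics()
//	if err != nil {
//		// handle error
//	}
//	_ = topics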

const (
	// OffsetNewest stands for the log head offset, i.e. the offset that will be
	// assigned to the next message that will be produced to the partition. You
	// can send this to a client's GetOffset method to get this offset, or when
	// calling ConsumePartition to start consuming new messages.
	OffsetNewest int64 = -1
	// OffsetOldest stands for the oldest offset available on the broker for a
	// partition. You can send this to a client's GetOffset method to get this
	// offset, or when calling ConsumePartition to start consuming from the
	// oldest offset that is still available on the broker.
	OffsetOldest int64 = -2
)
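
// Usage sketch (illustrative): OffsetOldest and OffsetNewest are the two special
// time values accepted by GetOffset (and ConsumePartition); the topic and
// partition below are hypothetical.
//
//	oldest, err := client.GetOffset("example-topic", 0, OffsetOldest)
//	if err != nil {
//		// handle error
//	}
//	newest, err := client.GetOffset("example-topic", 0, OffsetNewest)
//	if err != nil {
//		// handle error
//	}
//	// roughly the number of retained messages (ignoring compaction/deletion)
//	Logger.Printf("partition 0 holds about %d messages", newest-oldest)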

type client struct {
	conf           *Config
	closer, closed chan none // for shutting down background metadata updater

	// the broker addresses given to us through the constructor are not guaranteed to be returned in
	// the cluster metadata (I *think* it only returns brokers who are currently leading partitions?)
	// so we store them separately
	seedBrokers []*Broker
	deadSeeds   []*Broker

	controllerID   int32                                   // cluster controller broker id
	brokers        map[int32]*Broker                       // maps broker ids to brokers
	metadata       map[string]map[int32]*PartitionMetadata // maps topics to partition ids to metadata
	metadataTopics map[string]none                         // topics that need to collect metadata
	coordinators   map[string]int32                        // maps consumer group names to coordinating broker IDs

	// If the number of partitions is large, we can get some churn calling cachedPartitions,
	// so the result is cached. It is important to update this value whenever metadata is changed.
	cachedPartitionsResults map[string][maxPartitionIndex][]int32

	lock sync.RWMutex // protects access to the maps that hold cluster state.
}

// NewClient creates a new Client. It connects to one of the given broker addresses
// and uses that broker to automatically fetch metadata on the rest of the Kafka cluster. If metadata cannot
// be retrieved from any of the given broker addresses, the client is not created.
func NewClient(addrs []string, conf *Config) (Client, error) {
	Logger.Println("Initializing new client")

	if conf == nil {
		conf = NewConfig()
	}

	if err := conf.Validate(); err != nil {
		return nil, err
	}

	if len(addrs) < 1 {
		return nil, ConfigurationError("You must provide at least one broker address")
	}

	client := &client{
		conf:                    conf,
		closer:                  make(chan none),
		closed:                  make(chan none),
		brokers:                 make(map[int32]*Broker),
		metadata:                make(map[string]map[int32]*PartitionMetadata),
		metadataTopics:          make(map[string]none),
		cachedPartitionsResults: make(map[string][maxPartitionIndex][]int32),
		coordinators:            make(map[string]int32),
	}

	client.randomizeSeedBrokers(addrs)

	if conf.Metadata.Full {
		// do an initial fetch of all cluster metadata by specifying an empty list of topics
		err := client.RefreshMetadata()
		switch err {
		case nil:
			break
		case ErrLeaderNotAvailable, ErrReplicaNotAvailable, ErrTopicAuthorizationFailed, ErrClusterAuthorizationFailed:
			// indicates that maybe part of the cluster is down, but is not fatal to creating the client
			Logger.Println(err)
		default:
			close(client.closed) // we haven't started the background updater yet, so we have to do this manually
			_ = client.Close()
			return nil, err
		}
	}
	go withRecover(client.backgroundMetadataUpdater)

	Logger.Println("Successfully initialized new client")

	return client, nil
}
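
// Usage sketch (illustrative): passing a nil *Config makes NewClient fall back to
// NewConfig() defaults, as seen above; setting Metadata.Full to false skips the
// initial full-cluster metadata fetch. The address is a hypothetical placeholder.
//
//	conf := NewConfig()
//	conf.Metadata.Full = false // only fetch metadata for topics as they are requested
//	client, err := NewClient([]string{"kafka-1:9092"}, conf)
//	if err != nil {
//		// handle error
//	}
//	defer client.Close()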

func (client *client) Config() *Config {
	return client.conf
}

func (client *client) Brokers() []*Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	brokers := make([]*Broker, 0, len(client.brokers))
	for _, broker := range client.brokers {
		brokers = append(brokers, broker)
	}
	return brokers
}

func (client *client) InitProducerID() (*InitProducerIDResponse, error) {
	var err error
	for broker := client.any(); broker != nil; broker = client.any() {
		req := &InitProducerIDRequest{}

		// assign to the outer err (rather than shadowing it with :=) so that the
		// last broker error is what gets returned if we run out of brokers
		var response *InitProducerIDResponse
		response, err = broker.InitProducerID(req)
		switch err.(type) {
		case nil:
			return response, nil
		default:
			// some error, remove that broker and try again
			Logger.Printf("Client got error from broker %d when issuing InitProducerID : %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}
	return nil, err
}

func (client *client) Close() error {
	if client.Closed() {
		// Chances are this is being called from a defer() and the error will go unobserved
		// so we go ahead and log the event in this case.
		Logger.Printf("Close() called on already closed client")
		return ErrClosedClient
	}

	// shutdown and wait for the background thread before we take the lock, to avoid races
	close(client.closer)
	<-client.closed

	client.lock.Lock()
	defer client.lock.Unlock()
	Logger.Println("Closing Client")

	for _, broker := range client.brokers {
		safeAsyncClose(broker)
	}

	for _, broker := range client.seedBrokers {
		safeAsyncClose(broker)
	}

	client.brokers = nil
	client.metadata = nil
	client.metadataTopics = nil

	return nil
}

func (client *client) Closed() bool {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers == nil
}

func (client *client) Topics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadata))
	for topic := range client.metadata {
		ret = append(ret, topic)
	}

	return ret, nil
}

// MetadataTopics returns the set of topics the client is currently tracking
// metadata for (used by the background refresh when Metadata.Full is false).
func (client *client) MetadataTopics() ([]string, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.lock.RLock()
	defer client.lock.RUnlock()

	ret := make([]string, 0, len(client.metadataTopics))
	for topic := range client.metadataTopics {
		ret = append(ret, topic)
	}

	return ret, nil
}

func (client *client) Partitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, allPartitions)

	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, allPartitions)
	}

	// no partitions found even after refreshing metadata
	if len(partitions) == 0 {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}

func (client *client) WritablePartitions(topic string) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	partitions := client.cachedPartitions(topic, writablePartitions)

	// len==0 catches when it's nil (no such topic) and the odd case when every single
	// partition is undergoing leader election simultaneously. Callers have to be able to handle
	// this function returning an empty slice (which is a valid return value) but catching it
	// here the first time (note we *don't* catch it below where we return ErrUnknownTopicOrPartition) triggers
	// a metadata refresh as a nicety so callers can just try again and don't have to manually
	// trigger a refresh (otherwise they'd just keep getting a stale cached copy).
	if len(partitions) == 0 {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		partitions = client.cachedPartitions(topic, writablePartitions)
	}

	if partitions == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	return partitions, nil
}
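
// Usage sketch (illustrative): callers that only care about partitions which
// currently have a leader can prefer WritablePartitions over Partitions; the
// topic name below is hypothetical.
//
//	all, err := client.Partitions("example-topic")
//	if err != nil {
//		// handle error
//	}
//	writable, err := client.WritablePartitions("example-topic")
//	if err != nil {
//		// handle error
//	}
//	Logger.Printf("%d of %d partitions currently writable", len(writable), len(all))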

func (client *client) Replicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.Replicas), metadata.Err
	}
	return dupInt32Slice(metadata.Replicas), nil
}

func (client *client) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.Isr), metadata.Err
	}
	return dupInt32Slice(metadata.Isr), nil
}

func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	metadata := client.cachedMetadata(topic, partitionID)

	if metadata == nil {
		err := client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		metadata = client.cachedMetadata(topic, partitionID)
	}

	if metadata == nil {
		return nil, ErrUnknownTopicOrPartition
	}

	if metadata.Err == ErrReplicaNotAvailable {
		return dupInt32Slice(metadata.OfflineReplicas), metadata.Err
	}
	return dupInt32Slice(metadata.OfflineReplicas), nil
}

func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	leader, err := client.cachedLeader(topic, partitionID)

	if leader == nil {
		err = client.RefreshMetadata(topic)
		if err != nil {
			return nil, err
		}
		leader, err = client.cachedLeader(topic, partitionID)
	}

	return leader, err
}
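
// Usage sketch (illustrative): resolving the leader broker for a partition and
// inspecting its address; the topic name and partition are hypothetical.
//
//	leader, err := client.Leader("example-topic", 0)
//	if err != nil {
//		// handle error (e.g. ErrUnknownTopicOrPartition or ErrLeaderNotAvailable)
//	} else {
//		Logger.Printf("partition 0 is led by broker %d at %s", leader.ID(), leader.Addr())
//	}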

func (client *client) RefreshBrokers(addrs []string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	for _, broker := range client.brokers {
		_ = broker.Close()
		delete(client.brokers, broker.ID())
	}

	client.seedBrokers = nil
	client.deadSeeds = nil

	client.randomizeSeedBrokers(addrs)

	return nil
}

func (client *client) RefreshMetadata(topics ...string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	// Prior to 0.8.2, Kafka will throw exceptions on an empty topic and not return a proper
	// error. This handles the case by returning an error instead of sending it
	// off to Kafka. See: https://github.com/Shopify/sarama/pull/38#issuecomment-26362310
	for _, topic := range topics {
		if len(topic) == 0 {
			return ErrInvalidTopic // this is the error that 0.8.2 and later correctly return
		}
	}

	deadline := time.Time{}
	if client.conf.Metadata.Timeout > 0 {
		deadline = time.Now().Add(client.conf.Metadata.Timeout)
	}
	return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
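
// Configuration sketch (illustrative): Metadata.Timeout is used above to compute
// the retry deadline, and Metadata.Retry.Max bounds the number of attempts; the
// values shown are arbitrary examples.
//
//	conf := NewConfig()
//	conf.Metadata.Retry.Max = 5
//	conf.Metadata.Timeout = 10 * time.Second // stop retrying once this deadline would be exceeded
//	// ... create the client with conf, then:
//	if err := client.RefreshMetadata("example-topic"); err != nil {
//		// handle error
//	}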

func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
	if client.Closed() {
		return -1, ErrClosedClient
	}

	offset, err := client.getOffset(topic, partitionID, time)
	if err != nil {
		if err := client.RefreshMetadata(topic); err != nil {
			return -1, err
		}
		return client.getOffset(topic, partitionID, time)
	}

	return offset, err
}

func (client *client) Controller() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	if !client.conf.Version.IsAtLeast(V0_10_0_0) {
		return nil, ErrUnsupportedVersion
	}

	controller := client.cachedController()
	if controller == nil {
		if err := client.refreshMetadata(); err != nil {
			return nil, err
		}
		controller = client.cachedController()
	}

	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}

// deregisterController removes the cached controller broker from the brokers map
// so that it will be re-resolved from fresh metadata.
func (client *client) deregisterController() {
	client.lock.Lock()
	defer client.lock.Unlock()
	delete(client.brokers, client.controllerID)
}

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
func (client *client) RefreshController() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	client.deregisterController()

	if err := client.refreshMetadata(); err != nil {
		return nil, err
	}

	controller := client.cachedController()
	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	_ = controller.Open(client.conf)
	return controller, nil
}
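
// Usage sketch (illustrative): Controller requires the client's Version to be at
// least V0_10_0_0, otherwise ErrUnsupportedVersion is returned as above.
//
//	conf := NewConfig()
//	conf.Version = V0_10_0_0
//	// ... create the client with conf, then:
//	controller, err := client.Controller()
//	if err != nil {
//		// handle error
//	}
//	Logger.Printf("cluster controller is broker %d at %s", controller.ID(), controller.Addr())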

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	coordinator := client.cachedCoordinator(consumerGroup)

	if coordinator == nil {
		if err := client.RefreshCoordinator(consumerGroup); err != nil {
			return nil, err
		}
		coordinator = client.cachedCoordinator(consumerGroup)
	}

	if coordinator == nil {
		return nil, ErrConsumerCoordinatorNotAvailable
	}

	_ = coordinator.Open(client.conf)
	return coordinator, nil
}
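
// Usage sketch (illustrative): looking up the group coordinator and forcing a
// refresh when requests to it start failing; the group name is hypothetical.
//
//	coordinator, err := client.Coordinator("example-group")
//	if err != nil {
//		// handle error
//	}
//	_ = coordinator
//	// later, if the cached coordinator has moved:
//	if err := client.RefreshCoordinator("example-group"); err != nil {
//		// handle error
//	}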

func (client *client) RefreshCoordinator(consumerGroup string) error {
	if client.Closed() {
		return ErrClosedClient
	}

	response, err := client.getConsumerMetadata(consumerGroup, client.conf.Metadata.Retry.Max)
	if err != nil {
		return err
	}

	client.lock.Lock()
	defer client.lock.Unlock()
	client.registerBroker(response.Coordinator)
	client.coordinators[consumerGroup] = response.Coordinator.ID()
	return nil
}

// private broker management helpers

func (client *client) randomizeSeedBrokers(addrs []string) {
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for _, index := range random.Perm(len(addrs)) {
		client.seedBrokers = append(client.seedBrokers, NewBroker(addrs[index]))
	}
}

// updateBroker synchronizes the brokers map with the broker list from the latest
// metadata: new brokers are added, brokers whose address changed are replaced,
// and brokers missing from the metadata are closed and removed.
// You must hold the write lock before calling this function.
func (client *client) updateBroker(brokers []*Broker) {
	currentBroker := make(map[int32]*Broker, len(brokers))

	for _, broker := range brokers {
		currentBroker[broker.ID()] = broker
		if client.brokers[broker.ID()] == nil { // add new broker
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
		} else if broker.Addr() != client.brokers[broker.ID()].Addr() { // replace broker with new address
			safeAsyncClose(client.brokers[broker.ID()])
			client.brokers[broker.ID()] = broker
			Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
		}
	}

	for id, broker := range client.brokers {
		if _, exist := currentBroker[id]; !exist { // remove old broker
			safeAsyncClose(broker)
			delete(client.brokers, id)
			Logger.Printf("client/broker removed invalid broker #%d at %s", broker.ID(), broker.Addr())
		}
	}
}

// registerBroker makes sure a broker received by a Metadata or Coordinator request is registered
// in the brokers map. If a broker with the same ID but a different address is already registered,
// the old connection is closed and replaced. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
	if client.brokers == nil {
		Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
		return
	}

	if client.brokers[broker.ID()] == nil {
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
	} else if broker.Addr() != client.brokers[broker.ID()].Addr() {
		safeAsyncClose(client.brokers[broker.ID()])
		client.brokers[broker.ID()] = broker
		Logger.Printf("client/brokers replaced registered broker #%d with %s", broker.ID(), broker.Addr())
	}
}

// deregisterBroker moves the current seed broker (if that's what was passed in)
// onto the deadSeeds list; otherwise it removes the broker from the brokers map
// completely.
func (client *client) deregisterBroker(broker *Broker) {
	client.lock.Lock()
	defer client.lock.Unlock()

	if len(client.seedBrokers) > 0 && broker == client.seedBrokers[0] {
		client.deadSeeds = append(client.deadSeeds, broker)
		client.seedBrokers = client.seedBrokers[1:]
	} else {
		// we do this so that our loop in `tryRefreshMetadata` doesn't go on forever,
		// but we really shouldn't have to; once that loop is made better this case can be
		// removed, and the function generally can be renamed from `deregisterBroker` to
		// `nextSeedBroker` or something
		Logger.Printf("client/brokers deregistered broker #%d at %s", broker.ID(), broker.Addr())
		delete(client.brokers, broker.ID())
	}
}

func (client *client) resurrectDeadBrokers() {
	client.lock.Lock()
	defer client.lock.Unlock()

	Logger.Printf("client/brokers resurrecting %d dead seed brokers", len(client.deadSeeds))
	client.seedBrokers = append(client.seedBrokers, client.deadSeeds...)
	client.deadSeeds = nil
}

func (client *client) any() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	if len(client.seedBrokers) > 0 {
		_ = client.seedBrokers[0].Open(client.conf)
		return client.seedBrokers[0]
	}

	// not guaranteed to be random *or* deterministic
	for _, broker := range client.brokers {
		_ = broker.Open(client.conf)
		return broker
	}

	return nil
}

// private caching/lazy metadata helpers

type partitionType int

const (
	allPartitions partitionType = iota
	writablePartitions
	// If you add any more types, update the partition cache in update()

	// Ensure this is the last partition type value
	maxPartitionIndex
)
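
// Cache layout note (illustrative): cachedPartitionsResults is indexed first by
// topic and then by partitionType, so a read like the one in cachedPartitions
// below boils down to the following (while holding client.lock for reading);
// the topic name is hypothetical.
//
//	entry, ok := client.cachedPartitionsResults["example-topic"]
//	if ok {
//		writable := entry[writablePartitions] // sorted []int32, nil if unknown
//		_ = writable
//	}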

func (client *client) cachedMetadata(topic string, partitionID int32) *PartitionMetadata {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		return partitions[partitionID]
	}

	return nil
}

func (client *client) cachedPartitions(topic string, partitionSet partitionType) []int32 {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions, exists := client.cachedPartitionsResults[topic]
	if !exists {
		return nil
	}
	return partitions[partitionSet]
}

func (client *client) setPartitionCache(topic string, partitionSet partitionType) []int32 {
	partitions := client.metadata[topic]
	if partitions == nil {
		return nil
	}

	ret := make([]int32, 0, len(partitions))
	for _, partition := range partitions {
		if partitionSet == writablePartitions && partition.Err == ErrLeaderNotAvailable {
			continue
		}
		ret = append(ret, partition.ID)
	}

	sort.Sort(int32Slice(ret))
	return ret
}

func (client *client) cachedLeader(topic string, partitionID int32) (*Broker, error) {
	client.lock.RLock()
	defer client.lock.RUnlock()

	partitions := client.metadata[topic]
	if partitions != nil {
		metadata, ok := partitions[partitionID]
		if ok {
			if metadata.Err == ErrLeaderNotAvailable {
				return nil, ErrLeaderNotAvailable
			}
			b := client.brokers[metadata.Leader]
			if b == nil {
				return nil, ErrLeaderNotAvailable
			}
			_ = b.Open(client.conf)
			return b, nil
		}
	}

	return nil, ErrUnknownTopicOrPartition
}

func (client *client) getOffset(topic string, partitionID int32, time int64) (int64, error) {
	broker, err := client.Leader(topic, partitionID)
	if err != nil {
		return -1, err
	}

	request := &OffsetRequest{}
	if client.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 1
	}
	request.AddBlock(topic, partitionID, time, 1)

	response, err := broker.GetAvailableOffsets(request)
	if err != nil {
		_ = broker.Close()
		return -1, err
	}

	block := response.GetBlock(topic, partitionID)
	if block == nil {
		_ = broker.Close()
		return -1, ErrIncompleteResponse
	}
	if block.Err != ErrNoError {
		return -1, block.Err
	}
	if len(block.Offsets) != 1 {
		return -1, ErrOffsetOutOfRange
	}

	return block.Offsets[0], nil
}

// core metadata update logic

func (client *client) backgroundMetadataUpdater() {
	defer close(client.closed)

	if client.conf.Metadata.RefreshFrequency == time.Duration(0) {
		return
	}

	ticker := time.NewTicker(client.conf.Metadata.RefreshFrequency)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			if err := client.refreshMetadata(); err != nil {
				Logger.Println("Client background metadata update:", err)
			}
		case <-client.closer:
			return
		}
	}
}

func (client *client) refreshMetadata() error {
	var topics []string

	if !client.conf.Metadata.Full {
		if specificTopics, err := client.MetadataTopics(); err != nil {
			return err
		} else if len(specificTopics) == 0 {
			return ErrNoTopicsToUpdateMetadata
		} else {
			topics = specificTopics
		}
	}

	if err := client.RefreshMetadata(topics...); err != nil {
		return err
	}

	return nil
}

func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
	pastDeadline := func(backoff time.Duration) bool {
		if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
			// we are past the deadline
			return true
		}
		return false
	}
	retry := func(err error) error {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			if pastDeadline(backoff) {
				Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
				return err
			}
			Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			if backoff > 0 {
				time.Sleep(backoff)
			}
			return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
		}
		return err
	}

	broker := client.any()
	for ; broker != nil && !pastDeadline(0); broker = client.any() {
		allowAutoTopicCreation := true
		if len(topics) > 0 {
			Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
		} else {
			allowAutoTopicCreation = false
			Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
		}

		req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
		if client.conf.Version.IsAtLeast(V1_0_0_0) {
			req.Version = 5
		} else if client.conf.Version.IsAtLeast(V0_10_0_0) {
			req.Version = 1
		}
		response, err := broker.GetMetadata(req)
		switch err.(type) {
		case nil:
			allKnownMetaData := len(topics) == 0
			// valid response, use it
			shouldRetry, err := client.updateMetadata(response, allKnownMetaData)
			if shouldRetry {
				Logger.Println("client/metadata found some partitions to be leaderless")
				return retry(err) // note: err can be nil
			}
			return err
		case PacketEncodingError:
			// didn't even send, return the error
			return err
		case KError:
			// if SASL auth error return as this _should_ be a non retryable err for all brokers
			if err.(KError) == ErrSASLAuthenticationFailed {
				Logger.Println("client/metadata failed SASL authentication")
				return err
			}

			if err.(KError) == ErrTopicAuthorizationFailed {
				Logger.Println("client is not authorized to access this topic. The topics were: ", topics)
				return err
			}
			// else remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		default:
			// some other error, remove that broker and try again
			Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
			_ = broker.Close()
			client.deregisterBroker(broker)
		}
	}

	if broker != nil {
		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
		return retry(ErrOutOfBrokers)
	}

	Logger.Println("client/metadata no available broker to send metadata request to")
	client.resurrectDeadBrokers()
	return retry(ErrOutOfBrokers)
}

// updateMetadata applies the given MetadataResponse to the client's caches. If
// there is no fatal error, retry reports whether any topic or partition was found
// to be leaderless (ErrLeaderNotAvailable) and should therefore be retried.
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
	if client.Closed() {
		return
	}

	client.lock.Lock()
	defer client.lock.Unlock()

	// For all the brokers we received:
	// - if it is a new ID, save it
	// - if it is an existing ID, but the address we have is stale, discard the old one and save it
	// - if a broker we know about is missing from the response, close and remove it
	// - otherwise ignore it, since replacing our existing one would just bounce the connection
	client.updateBroker(data.Brokers)

	client.controllerID = data.ControllerID

	if allKnownMetaData {
		client.metadata = make(map[string]map[int32]*PartitionMetadata)
		client.metadataTopics = make(map[string]none)
		client.cachedPartitionsResults = make(map[string][maxPartitionIndex][]int32)
	}
	for _, topic := range data.Topics {
		// topics must be added to `metadataTopics` first, so that every requested
		// topic is recorded and stays trackable for the periodic metadata refresh,
		// even if we fail to store its partition metadata below.
		if _, exists := client.metadataTopics[topic.Name]; !exists {
			client.metadataTopics[topic.Name] = none{}
		}
		delete(client.metadata, topic.Name)
		delete(client.cachedPartitionsResults, topic.Name)

		switch topic.Err {
		case ErrNoError:
			// no-op
		case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
			err = topic.Err
			continue
		case ErrUnknownTopicOrPartition: // retry, do not store partial partition results
			err = topic.Err
			retry = true
			continue
		case ErrLeaderNotAvailable: // retry, but store partial partition results
			retry = true
		default: // don't retry, don't store partial results
			Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
			err = topic.Err
			continue
		}

		client.metadata[topic.Name] = make(map[int32]*PartitionMetadata, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			client.metadata[topic.Name][partition.ID] = partition
			if partition.Err == ErrLeaderNotAvailable {
				retry = true
			}
		}

		var partitionCache [maxPartitionIndex][]int32
		partitionCache[allPartitions] = client.setPartitionCache(topic.Name, allPartitions)
		partitionCache[writablePartitions] = client.setPartitionCache(topic.Name, writablePartitions)
		client.cachedPartitionsResults[topic.Name] = partitionCache
	}

	return
}

func (client *client) cachedCoordinator(consumerGroup string) *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()
	if coordinatorID, ok := client.coordinators[consumerGroup]; ok {
		return client.brokers[coordinatorID]
	}
	return nil
}

func (client *client) cachedController() *Broker {
	client.lock.RLock()
	defer client.lock.RUnlock()

	return client.brokers[client.controllerID]
}

func (client *client) computeBackoff(attemptsRemaining int) time.Duration {
	if client.conf.Metadata.Retry.BackoffFunc != nil {
		maxRetries := client.conf.Metadata.Retry.Max
		retries := maxRetries - attemptsRemaining
		return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
	}
	return client.conf.Metadata.Retry.Backoff
}
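
// Configuration sketch (illustrative): computeBackoff consults
// Metadata.Retry.BackoffFunc when it is set, so callers can plug in their own
// policy such as a simple exponential backoff; the durations below are arbitrary.
//
//	conf := NewConfig()
//	conf.Metadata.Retry.Max = 5
//	conf.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(100<<uint(retries)) * time.Millisecond // 100ms, 200ms, 400ms, ...
//	}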

func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
	retry := func(err error) (*FindCoordinatorResponse, error) {
		if attemptsRemaining > 0 {
			backoff := client.computeBackoff(attemptsRemaining)
			Logger.Printf("client/coordinator retrying after %dms... (%d attempts remaining)\n", backoff/time.Millisecond, attemptsRemaining)
			time.Sleep(backoff)
			return client.getConsumerMetadata(consumerGroup, attemptsRemaining-1)
		}
		return nil, err
	}

	for broker := client.any(); broker != nil; broker = client.any() {
		Logger.Printf("client/coordinator requesting coordinator for consumergroup %s from %s\n", consumerGroup, broker.Addr())

		request := new(FindCoordinatorRequest)
		request.CoordinatorKey = consumerGroup
		request.CoordinatorType = CoordinatorGroup

		response, err := broker.FindCoordinator(request)
		if err != nil {
			Logger.Printf("client/coordinator request to broker %s failed: %s\n", broker.Addr(), err)

			switch err.(type) {
			case PacketEncodingError:
				return nil, err
			default:
				_ = broker.Close()
				client.deregisterBroker(broker)
				continue
			}
		}

		switch response.Err {
		case ErrNoError:
			Logger.Printf("client/coordinator coordinator for consumergroup %s is #%d (%s)\n", consumerGroup, response.Coordinator.ID(), response.Coordinator.Addr())
			return response, nil

		case ErrConsumerCoordinatorNotAvailable:
			Logger.Printf("client/coordinator coordinator for consumer group %s is not available\n", consumerGroup)

			// This is very ugly, but this scenario will only happen once per cluster.
			// The __consumer_offsets topic only has to be created one time.
			// The number of partitions is not configurable, but partition 0 should always exist.
			if _, err := client.Leader("__consumer_offsets", 0); err != nil {
				Logger.Printf("client/coordinator the __consumer_offsets topic is not initialized completely yet. Waiting 2 seconds...\n")
				time.Sleep(2 * time.Second)
			}

			return retry(ErrConsumerCoordinatorNotAvailable)
		case ErrGroupAuthorizationFailed:
			Logger.Printf("client was not authorized to access group %s while attempting to find coordinator", consumerGroup)
			return retry(ErrGroupAuthorizationFailed)
		default:
			return nil, response.Err
		}
	}

	Logger.Println("client/coordinator no available broker to send consumer metadata request to")
	client.resurrectDeadBrokers()
	return retry(ErrOutOfBrokers)
}

// nopCloserClient embeds an existing Client, but disables
// the Close method (yet all other methods pass
// through unchanged). This is for use in larger structs
// where it is undesirable to close the client that was
// passed in by the caller.
type nopCloserClient struct {
	Client
}

// Close intercepts and purposely does not call the underlying
// client's Close() method.
func (ncc *nopCloserClient) Close() error {
	return nil
}
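
// Usage sketch (illustrative): within this package, a caller-provided client can
// be wrapped so that internal shutdown paths cannot accidentally close it; the
// wrapper function name below is hypothetical.
//
//	func nopCloser(c Client) Client {
//		return &nopCloserClient{Client: c}
//	}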