broker_manager.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. package kafka
  2. import "sync"
  3. type topicPartition struct {
  4. topic string
  5. partition int32
  6. }
  7. type brokerManager struct {
  8. client *Client
  9. defaultBroker *broker
  10. brokers map[int32]*broker
  11. leaders map[topicPartition]int32
  12. brokersLock sync.RWMutex
  13. }
  14. func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
  15. bm = new(brokerManager)
  16. bm.client = client
  17. // we create a new broker object as the default 'master' broker
  18. // if this broker is also a leader then we will end up with two broker objects for it, but that's not a big deal
  19. bm.defaultBroker, err = newBroker(host, port)
  20. if err != nil {
  21. return nil, err
  22. }
  23. bm.brokers = make(map[int32]*broker)
  24. bm.leaders = make(map[topicPartition]int32)
  25. // do an initial fetch of all cluster metadata by specifing an empty list of topics
  26. err = bm.refreshTopics(make([]*string, 0))
  27. if err != nil {
  28. return nil, err
  29. }
  30. return bm, nil
  31. }
  32. func (bm *brokerManager) getLeader(topic string, partition int32) (*broker, error) {
  33. var broker *broker = nil
  34. bm.brokersLock.RLock()
  35. id, ok := bm.leaders[topicPartition{topic, partition}]
  36. if ok {
  37. broker = bm.brokers[id]
  38. }
  39. bm.brokersLock.RUnlock()
  40. if broker == nil {
  41. err := bm.refreshTopic(topic)
  42. if err != nil {
  43. return nil, err
  44. }
  45. bm.brokersLock.RLock()
  46. broker = bm.brokers[bm.leaders[topicPartition{topic, partition}]]
  47. bm.brokersLock.RUnlock()
  48. }
  49. if broker == nil {
  50. return nil, UNKNOWN_TOPIC_OR_PARTITION
  51. }
  52. return broker, nil
  53. }
  54. func (bm *brokerManager) tryLeader(topic string, partition int32, req encoder, res decoder) error {
  55. b, err := bm.getLeader(topic, partition)
  56. if err != nil {
  57. return err
  58. }
  59. responseChan, err := b.sendRequest(bm.client.id, req)
  60. if err != nil {
  61. // errors that would make us refresh the broker metadata don't get returned here,
  62. // they'd come through responseChan.errors, so it's safe to just return here
  63. return err
  64. }
  65. select {
  66. case buf := <-responseChan.packets:
  67. decoder := realDecoder{raw: buf}
  68. err = res.decode(&decoder)
  69. case err = <-responseChan.errors:
  70. }
  71. if err == nil {
  72. // successfully received and decoded the packet, we're done
  73. return nil
  74. }
  75. // we got an error, so discard that broker
  76. bm.brokersLock.Lock()
  77. delete(bm.brokers, b.id)
  78. bm.brokersLock.Unlock()
  79. // then do the whole thing again
  80. // (the metadata for the broker gets refreshed automatically in getLeader)
  81. // if we get a broker here, it's guaranteed to be fresh, so if it fails then
  82. // we pass that error back to the user (as opposed to retrying indefinitely)
  83. b, err = bm.getLeader(topic, partition)
  84. if err != nil {
  85. return err
  86. }
  87. responseChan, err = b.sendRequest(bm.client.id, req)
  88. if err != nil {
  89. return err
  90. }
  91. select {
  92. case buf := <-responseChan.packets:
  93. decoder := realDecoder{raw: buf}
  94. err = res.decode(&decoder)
  95. return err
  96. case err = <-responseChan.errors:
  97. return err
  98. }
  99. }
  100. func (bm *brokerManager) getDefault() *broker {
  101. if bm.defaultBroker == nil {
  102. bm.brokersLock.RLock()
  103. defer bm.brokersLock.RUnlock()
  104. for _, id := range bm.leaders {
  105. bm.defaultBroker = bm.brokers[id]
  106. break
  107. }
  108. }
  109. return bm.defaultBroker
  110. }
  111. func (bm *brokerManager) tryDefaultBrokers(req encoder, res decoder) error {
  112. for b := bm.getDefault(); b != nil; b = bm.getDefault() {
  113. responseChan, err := b.sendRequest(bm.client.id, req)
  114. if err != nil {
  115. return err
  116. }
  117. select {
  118. case buf := <-responseChan.packets:
  119. decoder := realDecoder{raw: buf}
  120. err = res.decode(&decoder)
  121. return err
  122. case <-responseChan.errors:
  123. bm.defaultBroker = nil
  124. bm.brokersLock.Lock()
  125. delete(bm.brokers, b.id)
  126. bm.brokersLock.Unlock()
  127. }
  128. }
  129. return OutOfBrokers{}
  130. }
  131. func (bm *brokerManager) refreshTopics(topics []*string) error {
  132. response := new(metadata)
  133. err := bm.tryDefaultBrokers(&metadataRequest{topics}, response)
  134. if err != nil {
  135. return err
  136. }
  137. bm.brokersLock.Lock()
  138. defer bm.brokersLock.Unlock()
  139. for i := range response.brokers {
  140. broker := &response.brokers[i]
  141. bm.brokers[broker.id] = broker
  142. }
  143. for i := range response.topics {
  144. topic := &response.topics[i]
  145. if topic.err != NO_ERROR {
  146. return topic.err
  147. }
  148. for j := range topic.partitions {
  149. partition := &topic.partitions[j]
  150. if partition.err != NO_ERROR {
  151. return partition.err
  152. }
  153. bm.leaders[topicPartition{*topic.name, partition.id}] = partition.leader
  154. }
  155. }
  156. return nil
  157. }
  158. func (bm *brokerManager) refreshTopic(topic string) error {
  159. tmp := make([]*string, 1)
  160. tmp[0] = &topic
  161. return bm.refreshTopics(tmp)
  162. }