metadata_cache.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. package kafka
  2. import (
  3. "sort"
  4. "sync"
  5. )
  6. type metadataCache struct {
  7. client *Client
  8. brokers map[int32]*Broker // maps broker ids to brokers
  9. leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
  10. lock sync.RWMutex // protects access to the maps, only one since they're always accessed together
  11. }
  12. func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
  13. mc := new(metadataCache)
  14. starter := NewBroker(host, port)
  15. err := starter.Connect()
  16. if err != nil {
  17. return nil, err
  18. }
  19. mc.client = client
  20. mc.brokers = make(map[int32]*Broker)
  21. mc.leaders = make(map[string]map[int32]int32)
  22. mc.brokers[starter.ID()] = starter
  23. // do an initial fetch of all cluster metadata by specifing an empty list of topics
  24. err = mc.refreshTopics(make([]*string, 0))
  25. if err != nil {
  26. return nil, err
  27. }
  28. return mc, nil
  29. }
  30. func (mc *metadataCache) removeBroker(broker *Broker) {
  31. if broker == nil {
  32. return
  33. }
  34. mc.lock.RLock()
  35. defer mc.lock.RUnlock()
  36. delete(mc.brokers, broker.ID())
  37. go broker.Close()
  38. }
  39. func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
  40. mc.lock.RLock()
  41. defer mc.lock.RUnlock()
  42. partitions := mc.leaders[topic]
  43. if partitions != nil {
  44. leader := partitions[partition_id]
  45. if leader == -1 {
  46. return nil
  47. } else {
  48. return mc.brokers[leader]
  49. }
  50. }
  51. return nil
  52. }
  53. func (mc *metadataCache) any() *Broker {
  54. mc.lock.RLock()
  55. defer mc.lock.RUnlock()
  56. for _, broker := range mc.brokers {
  57. return broker
  58. }
  59. return nil
  60. }
  61. func (mc *metadataCache) partitions(topic string) []int32 {
  62. mc.lock.RLock()
  63. defer mc.lock.RUnlock()
  64. partitions := mc.leaders[topic]
  65. if partitions == nil {
  66. return nil
  67. }
  68. ret := make([]int32, len(partitions))
  69. for id, _ := range partitions {
  70. ret = append(ret, id)
  71. }
  72. sort.Sort(int32Slice(ret))
  73. return ret
  74. }
  75. func (mc *metadataCache) update(data *MetadataResponse) error {
  76. // connect to the brokers before taking the lock, as this can take a while
  77. // to timeout if one of them isn't reachable
  78. for _, broker := range data.Brokers {
  79. err := broker.Connect()
  80. if err != nil {
  81. return err
  82. }
  83. }
  84. mc.lock.Lock()
  85. defer mc.lock.Unlock()
  86. for _, broker := range data.Brokers {
  87. if mc.brokers[broker.ID()] != nil {
  88. go mc.brokers[broker.ID()].Close()
  89. }
  90. mc.brokers[broker.ID()] = broker
  91. }
  92. for _, topic := range data.Topics {
  93. if topic.Err != NO_ERROR {
  94. return topic.Err
  95. }
  96. mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
  97. for _, partition := range topic.Partitions {
  98. if partition.Err != NO_ERROR {
  99. return partition.Err
  100. }
  101. mc.leaders[*topic.Name][partition.Id] = partition.Leader
  102. }
  103. }
  104. return nil
  105. }
  106. func (mc *metadataCache) refreshTopics(topics []*string) error {
  107. for broker := mc.any(); broker != nil; broker = mc.any() {
  108. response, err := broker.GetMetadata(mc.client.id, &MetadataRequest{topics})
  109. switch err.(type) {
  110. case nil:
  111. // valid response, use it
  112. return mc.update(response)
  113. case EncodingError:
  114. // didn't even send, return the error
  115. return err
  116. }
  117. // some other error, remove that broker and try again
  118. mc.removeBroker(broker)
  119. }
  120. return OutOfBrokers{}
  121. }
  122. func (mc *metadataCache) refreshTopic(topic string) error {
  123. tmp := make([]*string, 1)
  124. tmp[0] = &topic
  125. return mc.refreshTopics(tmp)
  126. }