metadata_cache.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. package kafka
  2. import k "sarama/protocol"
  3. import (
  4. "sort"
  5. "sync"
  6. )
  7. type metadataCache struct {
  8. client *Client
  9. brokers map[int32]*k.Broker // maps broker ids to brokers
  10. leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
  11. lock sync.RWMutex // protects access to the maps, only one since they're always accessed together
  12. }
  13. func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
  14. mc := new(metadataCache)
  15. starter := k.NewBroker(host, port)
  16. err := starter.Connect()
  17. if err != nil {
  18. return nil, err
  19. }
  20. mc.client = client
  21. mc.brokers = make(map[int32]*k.Broker)
  22. mc.leaders = make(map[string]map[int32]int32)
  23. mc.brokers[starter.ID()] = starter
  24. // do an initial fetch of all cluster metadata by specifing an empty list of topics
  25. err = mc.refreshTopics(make([]string, 0))
  26. if err != nil {
  27. return nil, err
  28. }
  29. return mc, nil
  30. }
  31. func (mc *metadataCache) removeBroker(broker *k.Broker) {
  32. if broker == nil {
  33. return
  34. }
  35. mc.lock.RLock()
  36. defer mc.lock.RUnlock()
  37. delete(mc.brokers, broker.ID())
  38. go broker.Close()
  39. }
  40. func (mc *metadataCache) leader(topic string, partition_id int32) *k.Broker {
  41. mc.lock.RLock()
  42. defer mc.lock.RUnlock()
  43. partitions := mc.leaders[topic]
  44. if partitions != nil {
  45. leader := partitions[partition_id]
  46. if leader == -1 {
  47. return nil
  48. } else {
  49. return mc.brokers[leader]
  50. }
  51. }
  52. return nil
  53. }
  54. func (mc *metadataCache) any() *k.Broker {
  55. mc.lock.RLock()
  56. defer mc.lock.RUnlock()
  57. for _, broker := range mc.brokers {
  58. return broker
  59. }
  60. return nil
  61. }
  62. func (mc *metadataCache) partitions(topic string) []int32 {
  63. mc.lock.RLock()
  64. defer mc.lock.RUnlock()
  65. partitions := mc.leaders[topic]
  66. if partitions == nil {
  67. return nil
  68. }
  69. ret := make([]int32, len(partitions))
  70. for id, _ := range partitions {
  71. ret = append(ret, id)
  72. }
  73. sort.Sort(int32Slice(ret))
  74. return ret
  75. }
  76. func (mc *metadataCache) update(data *k.MetadataResponse) error {
  77. // connect to the brokers before taking the lock, as this can take a while
  78. // to timeout if one of them isn't reachable
  79. for _, broker := range data.Brokers {
  80. err := broker.Connect()
  81. if err != nil {
  82. return err
  83. }
  84. }
  85. mc.lock.Lock()
  86. defer mc.lock.Unlock()
  87. for _, broker := range data.Brokers {
  88. if mc.brokers[broker.ID()] != nil {
  89. go mc.brokers[broker.ID()].Close()
  90. }
  91. mc.brokers[broker.ID()] = broker
  92. }
  93. for _, topic := range data.Topics {
  94. if topic.Err != k.NO_ERROR {
  95. return topic.Err
  96. }
  97. mc.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
  98. for _, partition := range topic.Partitions {
  99. if partition.Err != k.NO_ERROR {
  100. return partition.Err
  101. }
  102. mc.leaders[topic.Name][partition.Id] = partition.Leader
  103. }
  104. }
  105. return nil
  106. }
  107. func (mc *metadataCache) refreshTopics(topics []string) error {
  108. for broker := mc.any(); broker != nil; broker = mc.any() {
  109. response, err := broker.GetMetadata(mc.client.id, &k.MetadataRequest{Topics: topics})
  110. switch err.(type) {
  111. case nil:
  112. // valid response, use it
  113. return mc.update(response)
  114. case k.EncodingError:
  115. // didn't even send, return the error
  116. return err
  117. }
  118. // some other error, remove that broker and try again
  119. mc.removeBroker(broker)
  120. }
  121. return OutOfBrokers
  122. }
  123. func (mc *metadataCache) refreshTopic(topic string) error {
  124. tmp := make([]string, 1)
  125. tmp[0] = topic
  126. return mc.refreshTopics(tmp)
  127. }