metadata_cache.go

package kafka

import (
	"sort"
	"sync"

	k "sarama/protocol"
)
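
// metadataCache holds a client's view of the cluster: the set of live broker
// connections, and which broker leads each partition of each topic.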
type metadataCache struct {
	client  *Client
	brokers map[int32]*k.Broker        // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
}
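
// newMetadataCache connects to the given bootstrap broker and primes the cache
// with a full metadata fetch before returning it.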
func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
	tmp := k.NewBroker(host, port)
	err := tmp.Connect()
	if err != nil {
		return nil, err
	}

	mc := new(metadataCache)
	mc.client = client
	mc.brokers = make(map[int32]*k.Broker)
	mc.leaders = make(map[string]map[int32]int32)

	// add it temporarily with an invalid ID so that refreshTopics can find it
	mc.brokers[-1] = tmp

	// do an initial fetch of all cluster metadata by specifying an empty list of topics
	err = mc.refreshTopics(make([]string, 0))
	if err != nil {
		mc.closeAll() // this closes tmp, since it's still in the brokers hash
		return nil, err
	}

	// now remove our tmp broker - the successful metadata request will have returned it
	// with a valid ID, so it will already be in the hash somewhere else and we don't need
	// the incomplete tmp one anymore
	go mc.brokers[-1].Close()
	delete(mc.brokers, -1)

	return mc, nil
}
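
// closeAll tears down every broker connection and empties the cache. The Close
// calls run in goroutines so a slow socket doesn't stall us while we hold the lock.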
func (mc *metadataCache) closeAll() {
	mc.lock.Lock()
	defer mc.lock.Unlock()

	for _, broker := range mc.brokers {
		go broker.Close()
	}
	mc.brokers = nil
	mc.leaders = nil
}
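
// leader returns the broker currently leading the given partition, or nil if the
// topic or partition is unknown or a leader election is in progress.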
func (mc *metadataCache) leader(topic string, partition_id int32) *k.Broker {
	mc.lock.RLock()
	defer mc.lock.RUnlock()

	partitions := mc.leaders[topic]
	if partitions != nil {
		// check presence explicitly - a plain lookup of a missing partition would
		// return the zero value and wrongly select the broker with ID 0
		leader, ok := partitions[partition_id]
		if !ok || leader == -1 {
			return nil
		}
		return mc.brokers[leader]
	}

	return nil
}
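
// any returns an arbitrary connected broker, or nil if the cache is empty.
// Go's randomized map iteration order makes the choice effectively random.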
func (mc *metadataCache) any() *k.Broker {
	mc.lock.RLock()
	defer mc.lock.RUnlock()

	for _, broker := range mc.brokers {
		return broker
	}

	return nil
}
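
// partitions returns the sorted list of partition ids known for the given topic,
// or nil if the topic is not in the cache.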
func (mc *metadataCache) partitions(topic string) []int32 {
	mc.lock.RLock()
	defer mc.lock.RUnlock()

	partitions := mc.leaders[topic]
	if partitions == nil {
		return nil
	}

	// allocate with zero length (but full capacity) - allocating with
	// make([]int32, len(partitions)) and then appending would leave a
	// run of leading zeroes in the result
	ret := make([]int32, 0, len(partitions))
	for id := range partitions {
		ret = append(ret, id)
	}

	sort.Sort(int32Slice(ret))
	return ret
}
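
// update merges a MetadataResponse into the cache, swapping in connections to any
// new or changed brokers and rebuilding the leadership map for each returned topic.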
func (mc *metadataCache) update(data *k.MetadataResponse) error {
	// First discard brokers that we already know about. This avoids bouncing TCP connections,
	// and especially avoids closing valid connections out from under clients who may be trying
	// to use them. We only need a read-lock for this.
	var newBrokers []*k.Broker
	mc.lock.RLock()
	for _, broker := range data.Brokers {
		if !broker.Equals(mc.brokers[broker.ID()]) {
			newBrokers = append(newBrokers, broker)
		}
	}
	mc.lock.RUnlock()

	// connect to the brokers before taking the lock, as this can take a while
	// to timeout if one of them isn't reachable
	for _, broker := range newBrokers {
		err := broker.Connect()
		if err != nil {
			return err
		}
	}

	mc.lock.Lock()
	defer mc.lock.Unlock()

	for _, broker := range newBrokers {
		if mc.brokers[broker.ID()] != nil {
			go mc.brokers[broker.ID()].Close()
		}
		mc.brokers[broker.ID()] = broker
	}

	for _, topic := range data.Topics {
		if topic.Err != k.NO_ERROR {
			return topic.Err
		}
		mc.leaders[topic.Name] = make(map[int32]int32, len(topic.Partitions))
		for _, partition := range topic.Partitions {
			if partition.Err != k.NO_ERROR {
				return partition.Err
			}
			mc.leaders[topic.Name][partition.Id] = partition.Leader
		}
	}

	return nil
}
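
// refreshTopics fetches fresh metadata for the given topics (an empty slice means
// all topics), dropping any broker that fails to answer and retrying on another,
// until every cached broker has been tried and discarded.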
func (mc *metadataCache) refreshTopics(topics []string) error {
	for broker := mc.any(); broker != nil; broker = mc.any() {
		response, err := broker.GetMetadata(mc.client.id, &k.MetadataRequest{Topics: topics})

		switch err.(type) {
		case nil:
			// valid response, use it
			return mc.update(response)
		case k.EncodingError:
			// didn't even send, return the error
			return err
		}

		// some other error, remove that broker and try again
		mc.lock.Lock()
		delete(mc.brokers, broker.ID())
		go broker.Close()
		mc.lock.Unlock()
	}

	return OutOfBrokers
}
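
// refreshTopic is a convenience wrapper around refreshTopics for a single topic.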
func (mc *metadataCache) refreshTopic(topic string) error {
	return mc.refreshTopics([]string{topic})
}