metadata_cache.go 2.6 KB

  1. package kafka
  2. import (
  3. "sort"
  4. "sync"
  5. )
// metadataCache tracks the cluster topology learned from Kafka metadata
// requests: which brokers exist and which broker leads each partition of
// each topic. All reads and writes go through lock.
type metadataCache struct {
	client  *Client                    // owning client; supplies the client id for metadata requests
	brokers map[int32]*Broker          // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
}
  12. func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
  13. mc := new(metadataCache)
  14. starter, err := NewBroker(host, port)
  15. if err != nil {
  16. return nil, err
  17. }
  18. mc.client = client
  19. mc.brokers = make(map[int32]*Broker)
  20. mc.leaders = make(map[string]map[int32]int32)
  21. mc.brokers[starter.id] = starter
  22. // do an initial fetch of all cluster metadata by specifing an empty list of topics
  23. err = mc.refreshTopics(make([]*string, 0))
  24. if err != nil {
  25. return nil, err
  26. }
  27. return mc, nil
  28. }
  29. func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
  30. mc.lock.RLock()
  31. defer mc.lock.RUnlock()
  32. partitions := mc.leaders[topic]
  33. if partitions != nil {
  34. leader := partitions[partition_id]
  35. if leader == -1 {
  36. return nil
  37. } else {
  38. return mc.brokers[leader]
  39. }
  40. }
  41. return nil
  42. }
  43. func (mc *metadataCache) any() *Broker {
  44. mc.lock.RLock()
  45. defer mc.lock.RUnlock()
  46. for _, broker := range mc.brokers {
  47. return broker
  48. }
  49. return nil
  50. }
  51. func (mc *metadataCache) partitions(topic string) []int32 {
  52. mc.lock.RLock()
  53. defer mc.lock.RUnlock()
  54. partitions := mc.leaders[topic]
  55. if partitions == nil {
  56. return nil
  57. }
  58. ret := make([]int32, len(partitions))
  59. for id, _ := range partitions {
  60. ret = append(ret, id)
  61. }
  62. sort.Sort(int32Slice(ret))
  63. return ret
  64. }
// refreshTopics asks any known broker for fresh metadata on the given
// topics (an empty list means "all topics") and rebuilds the broker and
// leader maps from the response.
//
// NOTE(review): if an error occurs partway through, the maps are left
// partially updated; and overwriting an entry in mc.brokers drops the old
// *Broker without any teardown — possible connection leak, confirm against
// Broker's lifecycle.
func (mc *metadataCache) refreshTopics(topics []*string) error {
	// Pick any cached broker to serve the metadata request; this read
	// takes the RLock internally, before we take the write lock below.
	broker := mc.any()
	if broker == nil {
		return OutOfBrokers{}
	}
	response, err := broker.RequestMetadata(mc.client.id, &MetadataRequest{topics})
	if err != nil {
		return err
	}
	mc.lock.Lock()
	defer mc.lock.Unlock()
	// Register every broker from the response. Index-and-take-address so
	// the map stores pointers into the response slice, not copies; the
	// inner `broker` deliberately shadows the request broker above.
	for i := range response.Brokers {
		broker := &response.Brokers[i]
		// NOTE(review): Connect is called while holding the write lock —
		// a slow/hung connection blocks all cache readers.
		err = broker.Connect()
		if err != nil {
			return err
		}
		mc.brokers[broker.id] = broker
	}
	// Rebuild the leader map for each topic in the response; any topic- or
	// partition-level error from the cluster aborts the refresh.
	for i := range response.Topics {
		topic := &response.Topics[i]
		if topic.Err != NO_ERROR {
			return topic.Err
		}
		mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
		for j := range topic.Partitions {
			partition := &topic.Partitions[j]
			if partition.Err != NO_ERROR {
				return partition.Err
			}
			mc.leaders[*topic.Name][partition.Id] = partition.Leader
		}
	}
	return nil
}
  100. func (mc *metadataCache) refreshTopic(topic string) error {
  101. tmp := make([]*string, 1)
  102. tmp[0] = &topic
  103. return mc.refreshTopics(tmp)
  104. }