// metadata_cache.go
package kafka

import (
	"sort"
	"sync"
)
// metadataCache keeps a client-side view of the cluster: which brokers
// exist and which broker leads each partition of each topic. It is
// populated and refreshed via refreshTopics.
type metadataCache struct {
	client  *Client
	brokers map[int32]*Broker          // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids
	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
}
  12. func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
  13. mc := new(metadataCache)
  14. starter := NewBroker(host, port)
  15. err := starter.Connect()
  16. if err != nil {
  17. return nil, err
  18. }
  19. mc.client = client
  20. mc.brokers = make(map[int32]*Broker)
  21. mc.leaders = make(map[string]map[int32]int32)
  22. mc.brokers[starter.id] = starter
  23. // do an initial fetch of all cluster metadata by specifing an empty list of topics
  24. err = mc.refreshTopics(make([]*string, 0))
  25. if err != nil {
  26. return nil, err
  27. }
  28. return mc, nil
  29. }
  30. func (mc *metadataCache) leader(topic string, partition_id int32) *Broker {
  31. mc.lock.RLock()
  32. defer mc.lock.RUnlock()
  33. partitions := mc.leaders[topic]
  34. if partitions != nil {
  35. leader := partitions[partition_id]
  36. if leader == -1 {
  37. return nil
  38. } else {
  39. return mc.brokers[leader]
  40. }
  41. }
  42. return nil
  43. }
  44. func (mc *metadataCache) any() *Broker {
  45. mc.lock.RLock()
  46. defer mc.lock.RUnlock()
  47. for _, broker := range mc.brokers {
  48. return broker
  49. }
  50. return nil
  51. }
  52. func (mc *metadataCache) partitions(topic string) []int32 {
  53. mc.lock.RLock()
  54. defer mc.lock.RUnlock()
  55. partitions := mc.leaders[topic]
  56. if partitions == nil {
  57. return nil
  58. }
  59. ret := make([]int32, len(partitions))
  60. for id, _ := range partitions {
  61. ret = append(ret, id)
  62. }
  63. sort.Sort(int32Slice(ret))
  64. return ret
  65. }
// refreshTopics fetches fresh metadata for the given topics from any
// available broker and updates the broker and leader maps. An empty
// topic list requests metadata for every topic in the cluster.
func (mc *metadataCache) refreshTopics(topics []*string) error {
	broker := mc.any()
	if broker == nil {
		return OutOfBrokers{}
	}

	response, err := broker.RequestMetadata(mc.client.id, &MetadataRequest{topics})
	if err != nil {
		return err
	}

	// The request above was made without the lock held; only the map
	// updates below are serialized against readers.
	mc.lock.Lock()
	defer mc.lock.Unlock()

	for i := range response.Brokers {
		// Address the slice element directly so the stored pointer refers
		// to the response's broker, not a per-iteration copy.
		broker := &response.Brokers[i]
		// NOTE(review): Connect is presumably a network call performed
		// while holding the write lock, blocking all readers until it
		// completes — confirm this is acceptable. An error here also
		// returns with the cache only partially updated.
		err = broker.Connect()
		if err != nil {
			return err
		}
		mc.brokers[broker.id] = broker
	}

	for i := range response.Topics {
		topic := &response.Topics[i]
		if topic.Err != NO_ERROR {
			return topic.Err
		}
		// Replace the topic's leader map wholesale with the fresh view.
		mc.leaders[*topic.Name] = make(map[int32]int32, len(topic.Partitions))
		for j := range topic.Partitions {
			partition := &topic.Partitions[j]
			if partition.Err != NO_ERROR {
				return partition.Err
			}
			mc.leaders[*topic.Name][partition.Id] = partition.Leader
		}
	}

	return nil
}
  101. func (mc *metadataCache) refreshTopic(topic string) error {
  102. tmp := make([]*string, 1)
  103. tmp[0] = &topic
  104. return mc.refreshTopics(tmp)
  105. }