metadata_cache.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package kafka
  2. import (
  3. "sort"
  4. "sync"
  5. )
// metadataCache is the client's in-memory picture of the cluster: the set of
// known brokers and, per topic, which broker id leads each partition. Both maps
// are guarded by a single lock.
type metadataCache struct {
	client  *Client                    // owning client; its id is sent with metadata requests
	brokers map[int32]*broker          // maps broker ids to brokers
	leaders map[string]map[int32]int32 // maps topics to partition ids to broker ids; leader() treats an id of -1 as "no leader"
	lock    sync.RWMutex               // protects access to the maps, only one since they're always accessed together
}
  12. func newMetadataCache(client *Client, host string, port int32) (*metadataCache, error) {
  13. mc := new(metadataCache)
  14. starter, err := newBroker(host, port)
  15. if err != nil {
  16. return nil, err
  17. }
  18. mc.client = client
  19. mc.brokers = make(map[int32]*broker)
  20. mc.leaders = make(map[string]map[int32]int32)
  21. mc.brokers[starter.id] = starter
  22. // do an initial fetch of all cluster metadata by specifing an empty list of topics
  23. err = mc.refreshTopics(make([]*string, 0))
  24. if err != nil {
  25. return nil, err
  26. }
  27. return mc, nil
  28. }
  29. func (mc *metadataCache) leader(topic string, partition_id int32) *broker {
  30. mc.lock.RLock()
  31. defer mc.lock.RUnlock()
  32. partitions := mc.leaders[topic]
  33. if partitions != nil {
  34. leader := partitions[partition_id]
  35. if leader == -1 {
  36. return nil
  37. } else {
  38. return mc.brokers[leader]
  39. }
  40. }
  41. return nil
  42. }
  43. func (mc *metadataCache) any() *broker {
  44. mc.lock.RLock()
  45. defer mc.lock.RUnlock()
  46. for _, broker := range mc.brokers {
  47. return broker
  48. }
  49. return nil
  50. }
  51. func (mc *metadataCache) partitions(topic string) []int32 {
  52. mc.lock.RLock()
  53. defer mc.lock.RUnlock()
  54. partitions := mc.leaders[topic]
  55. if partitions == nil {
  56. return nil
  57. }
  58. ret := make([]int32, len(partitions))
  59. for id, _ := range partitions {
  60. ret = append(ret, id)
  61. }
  62. sort.Sort(int32Slice(ret))
  63. return ret
  64. }
  65. func (mc *metadataCache) refreshTopics(topics []*string) error {
  66. broker := mc.any()
  67. if broker == nil {
  68. return OutOfBrokers{}
  69. }
  70. response := new(metadataResponse)
  71. err := broker.SendAndReceive(mc.client.id, &metadataRequest{topics}, response)
  72. if err != nil {
  73. return err
  74. }
  75. mc.lock.Lock()
  76. defer mc.lock.Unlock()
  77. for i := range response.brokers {
  78. broker := &response.brokers[i]
  79. mc.brokers[broker.id] = broker
  80. }
  81. for i := range response.topics {
  82. topic := &response.topics[i]
  83. if topic.err != NO_ERROR {
  84. return topic.err
  85. }
  86. mc.leaders[*topic.name] = make(map[int32]int32, len(topic.partitions))
  87. for j := range topic.partitions {
  88. partition := &topic.partitions[j]
  89. if partition.err != NO_ERROR {
  90. return partition.err
  91. }
  92. mc.leaders[*topic.name][partition.id] = partition.leader
  93. }
  94. }
  95. return nil
  96. }
  97. func (mc *metadataCache) refreshTopic(topic string) error {
  98. tmp := make([]*string, 1)
  99. tmp[0] = &topic
  100. return mc.refreshTopics(tmp)
  101. }