brokerManager.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103
  1. package kafka
  2. import (
  3. "errors"
  4. "sync"
  5. )
  6. type brokerKey struct {
  7. topic string
  8. partition int32
  9. }
  10. type brokerManager struct {
  11. client *Client
  12. defaultBroker *broker
  13. leaders map[brokerKey]*broker
  14. leadersLock sync.RWMutex
  15. }
  16. func newBrokerManager(client *Client, host string, port int32) (bm *brokerManager, err error) {
  17. bm = new(brokerManager)
  18. bm.client = client
  19. bm.defaultBroker, err = newBroker(host, port)
  20. if err != nil {
  21. return nil, err
  22. }
  23. bm.leaders = make(map[brokerKey]*broker)
  24. err = bm.refreshAllTopics()
  25. if err != nil {
  26. return nil, err
  27. }
  28. return bm, nil
  29. }
  30. func (bm *brokerManager) lookupLeader(topic string, partition int32) *broker {
  31. bm.leadersLock.RLock()
  32. defer bm.leadersLock.RUnlock()
  33. return bm.leaders[brokerKey{topic, partition}]
  34. }
  35. func (bm *brokerManager) getDefault() *broker {
  36. if bm.defaultBroker == nil {
  37. bm.leadersLock.RLock()
  38. defer bm.leadersLock.RUnlock()
  39. for _, bm.defaultBroker = range bm.leaders {
  40. break
  41. }
  42. }
  43. return bm.defaultBroker
  44. }
  45. func (bm *brokerManager) refreshTopics(topics []*string) error {
  46. b := bm.getDefault()
  47. if b == nil {
  48. return errors.New("kafka: lost all broker connections")
  49. }
  50. responseChan, err := b.sendRequest(bm.client.id, REQUEST_METADATA, &metadataRequest{topics})
  51. if err != nil {
  52. // TODO
  53. }
  54. decoder := realDecoder{raw: <-responseChan}
  55. response := new(metadata)
  56. err = response.decode(&decoder)
  57. if err != nil {
  58. // how badly should we blow up here ?
  59. }
  60. bm.leadersLock.Lock()
  61. defer bm.leadersLock.Unlock()
  62. for i := range response.topics {
  63. topic := &response.topics[i]
  64. if topic.err != NO_ERROR {
  65. return topic.err
  66. }
  67. for j := range topic.partitions {
  68. partition := &topic.partitions[j]
  69. if partition.err != NO_ERROR {
  70. return partition.err
  71. }
  72. bm.leaders[brokerKey{*topic.name, partition.id}] = response.brokerById(partition.leader)
  73. }
  74. }
  75. return nil
  76. }
  77. func (bm *brokerManager) refreshTopic(topic string) error {
  78. tmp := make([]*string, 1)
  79. tmp[0] = &topic
  80. return bm.refreshTopics(tmp)
  81. }
  82. func (bm *brokerManager) refreshAllTopics() error {
  83. tmp := make([]*string, 0)
  84. return bm.refreshTopics(tmp)
  85. }