peer_hub.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. package etcd
  2. import (
  3. "errors"
  4. "fmt"
  5. "io/ioutil"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. "sync"
  10. )
  11. var (
  12. errUnknownPeer = errors.New("unknown peer")
  13. )
  14. type peerGetter interface {
  15. peer(id int64) (*peer, error)
  16. }
  17. type peerHub struct {
  18. mu sync.RWMutex
  19. seeds map[string]bool
  20. peers map[int64]*peer
  21. c *http.Client
  22. }
  23. func newPeerHub(seeds []string, c *http.Client) *peerHub {
  24. h := &peerHub{
  25. peers: make(map[int64]*peer),
  26. seeds: make(map[string]bool),
  27. c: c,
  28. }
  29. for _, seed := range seeds {
  30. h.seeds[seed] = true
  31. }
  32. return h
  33. }
  34. func (h *peerHub) stop() {
  35. for _, p := range h.peers {
  36. p.stop()
  37. }
  38. tr := h.c.Transport.(*http.Transport)
  39. tr.CloseIdleConnections()
  40. }
  41. func (h *peerHub) peer(id int64) (*peer, error) {
  42. h.mu.Lock()
  43. defer h.mu.Unlock()
  44. if p, ok := h.peers[id]; ok {
  45. return p, nil
  46. }
  47. return nil, fmt.Errorf("peer %d not found", id)
  48. }
  49. func (h *peerHub) add(id int64, rawurl string) error {
  50. u, err := url.Parse(rawurl)
  51. if err != nil {
  52. return err
  53. }
  54. u.Path = raftPrefix
  55. h.mu.Lock()
  56. defer h.mu.Unlock()
  57. h.peers[id] = newPeer(u.String(), h.c)
  58. return nil
  59. }
  60. func (h *peerHub) send(nodeId int64, data []byte) error {
  61. h.mu.RLock()
  62. p := h.peers[nodeId]
  63. h.mu.RUnlock()
  64. if p == nil {
  65. err := h.fetch(nodeId)
  66. if err != nil {
  67. return errUnknownPeer
  68. }
  69. }
  70. h.mu.RLock()
  71. p = h.peers[nodeId]
  72. h.mu.RUnlock()
  73. return p.send(data)
  74. }
  75. func (h *peerHub) fetch(nodeId int64) error {
  76. for seed := range h.seeds {
  77. if err := h.seedFetch(seed, nodeId); err == nil {
  78. return nil
  79. }
  80. }
  81. return fmt.Errorf("cannot fetch the address of node %d", nodeId)
  82. }
  83. func (h *peerHub) seedFetch(seedurl string, id int64) error {
  84. if _, err := h.peer(id); err == nil {
  85. return nil
  86. }
  87. u, err := url.Parse(seedurl)
  88. if err != nil {
  89. return fmt.Errorf("cannot parse the url of the given seed")
  90. }
  91. u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
  92. resp, err := h.c.Get(u.String())
  93. if err != nil {
  94. return fmt.Errorf("cannot reach %v", u)
  95. }
  96. defer resp.Body.Close()
  97. if resp.StatusCode != http.StatusOK {
  98. return fmt.Errorf("cannot find node %d via %s", id, seedurl)
  99. }
  100. b, err := ioutil.ReadAll(resp.Body)
  101. if err != nil {
  102. return fmt.Errorf("cannot reach %v", u)
  103. }
  104. if err := h.add(id, string(b)); err != nil {
  105. return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
  106. }
  107. return nil
  108. }