peer_hub.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package etcd
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "net/http"
  8. "net/url"
  9. "path"
  10. "sync"
  11. "github.com/coreos/etcd/raft"
  12. )
  13. var (
  14. errUnknownPeer = errors.New("unknown peer")
  15. )
  16. type peerGetter interface {
  17. peer(id int64) (*peer, error)
  18. }
  19. type peerHub struct {
  20. mu sync.RWMutex
  21. stopped bool
  22. seeds map[string]bool
  23. peers map[int64]*peer
  24. c *http.Client
  25. }
  26. func newPeerHub(seeds []string, c *http.Client) *peerHub {
  27. h := &peerHub{
  28. peers: make(map[int64]*peer),
  29. seeds: make(map[string]bool),
  30. c: c,
  31. }
  32. for _, seed := range seeds {
  33. h.seeds[seed] = true
  34. }
  35. return h
  36. }
  37. func (h *peerHub) stop() {
  38. h.mu.Lock()
  39. defer h.mu.Unlock()
  40. h.stopped = true
  41. for _, p := range h.peers {
  42. p.stop()
  43. }
  44. tr := h.c.Transport.(*http.Transport)
  45. tr.CloseIdleConnections()
  46. }
  47. func (h *peerHub) peer(id int64) (*peer, error) {
  48. h.mu.Lock()
  49. defer h.mu.Unlock()
  50. if h.stopped {
  51. return nil, fmt.Errorf("peerHub stopped")
  52. }
  53. if p, ok := h.peers[id]; ok {
  54. return p, nil
  55. }
  56. return nil, fmt.Errorf("peer %d not found", id)
  57. }
  58. func (h *peerHub) add(id int64, rawurl string) (*peer, error) {
  59. u, err := url.Parse(rawurl)
  60. if err != nil {
  61. return nil, err
  62. }
  63. u.Path = raftPrefix
  64. h.mu.Lock()
  65. defer h.mu.Unlock()
  66. if h.stopped {
  67. return nil, fmt.Errorf("peerHub stopped")
  68. }
  69. h.peers[id] = newPeer(u.String(), h.c)
  70. return h.peers[id], nil
  71. }
  72. func (h *peerHub) send(msg raft.Message) error {
  73. if p, err := h.fetch(msg.To); err == nil {
  74. data, err := json.Marshal(msg)
  75. if err != nil {
  76. return err
  77. }
  78. return p.send(data)
  79. }
  80. return errUnknownPeer
  81. }
  82. func (h *peerHub) fetch(nodeId int64) (*peer, error) {
  83. if p, err := h.peer(nodeId); err == nil {
  84. return p, nil
  85. }
  86. for seed := range h.seeds {
  87. if p, err := h.seedFetch(seed, nodeId); err == nil {
  88. return p, nil
  89. }
  90. }
  91. return nil, fmt.Errorf("cannot fetch the address of node %d", nodeId)
  92. }
  93. func (h *peerHub) seedFetch(seedurl string, id int64) (*peer, error) {
  94. u, err := url.Parse(seedurl)
  95. if err != nil {
  96. return nil, fmt.Errorf("cannot parse the url of the given seed")
  97. }
  98. u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
  99. resp, err := h.c.Get(u.String())
  100. if err != nil {
  101. return nil, fmt.Errorf("cannot reach %v", u)
  102. }
  103. defer resp.Body.Close()
  104. if resp.StatusCode != http.StatusOK {
  105. return nil, fmt.Errorf("cannot find node %d via %s", id, seedurl)
  106. }
  107. b, err := ioutil.ReadAll(resp.Body)
  108. if err != nil {
  109. return nil, fmt.Errorf("cannot reach %v", u)
  110. }
  111. return h.add(id, string(b))
  112. }