peer_hub.go 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. package etcd
  2. import (
  3. "fmt"
  4. "io/ioutil"
  5. "net/http"
  6. "net/url"
  7. "path"
  8. "sync"
  9. )
  10. type peerGetter interface {
  11. peer(id int64) (*peer, error)
  12. }
  13. type peerHub struct {
  14. mu sync.RWMutex
  15. peers map[int64]*peer
  16. c *http.Client
  17. }
  18. func newPeerHub(c *http.Client) *peerHub {
  19. h := &peerHub{
  20. peers: make(map[int64]*peer),
  21. c: c,
  22. }
  23. return h
  24. }
  25. func (h *peerHub) stop() {
  26. for _, p := range h.peers {
  27. p.stop()
  28. }
  29. tr := h.c.Transport.(*http.Transport)
  30. tr.CloseIdleConnections()
  31. }
  32. func (h *peerHub) peer(id int64) (*peer, error) {
  33. h.mu.Lock()
  34. defer h.mu.Unlock()
  35. if p, ok := h.peers[id]; ok {
  36. return p, nil
  37. }
  38. return nil, fmt.Errorf("peer %d not found", id)
  39. }
  40. func (h *peerHub) fetch(seedurl string, id int64) error {
  41. if _, err := h.peer(id); err == nil {
  42. return nil
  43. }
  44. u, err := url.Parse(seedurl)
  45. if err != nil {
  46. return fmt.Errorf("cannot parse the url of the given seed")
  47. }
  48. u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
  49. resp, err := h.c.Get(u.String())
  50. if err != nil {
  51. return fmt.Errorf("cannot reach %v", u)
  52. }
  53. defer resp.Body.Close()
  54. if resp.StatusCode != http.StatusOK {
  55. return fmt.Errorf("cannot find node %d via %s", id, seedurl)
  56. }
  57. b, err := ioutil.ReadAll(resp.Body)
  58. if err != nil {
  59. return fmt.Errorf("cannot reach %v", u)
  60. }
  61. if err := h.add(id, string(b)); err != nil {
  62. return fmt.Errorf("cannot parse the url of node %d: %v", id, err)
  63. }
  64. return nil
  65. }
  66. func (h *peerHub) add(id int64, rawurl string) error {
  67. u, err := url.Parse(rawurl)
  68. if err != nil {
  69. return err
  70. }
  71. u.Path = raftPrefix
  72. h.mu.Lock()
  73. defer h.mu.Unlock()
  74. h.peers[id] = newPeer(u.String(), h.c)
  75. return nil
  76. }
  77. func (h *peerHub) send(nodeId int64, data []byte) error {
  78. h.mu.RLock()
  79. p := h.peers[nodeId]
  80. h.mu.RUnlock()
  81. if p == nil {
  82. return errUnknownNode
  83. }
  84. return p.send(data)
  85. }