peer_hub.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcd
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "net/url"
  21. "path"
  22. "sync"
  23. "github.com/coreos/etcd/raft"
  24. )
  25. var (
  26. errUnknownPeer = errors.New("unknown peer")
  27. )
  28. type peerGetter interface {
  29. peer(id int64) (*peer, error)
  30. }
  31. type peerHub struct {
  32. mu sync.RWMutex
  33. stopped bool
  34. seeds map[string]bool
  35. peers map[int64]*peer
  36. c *http.Client
  37. }
  38. func newPeerHub(c *http.Client) *peerHub {
  39. h := &peerHub{
  40. peers: make(map[int64]*peer),
  41. seeds: make(map[string]bool),
  42. c: c,
  43. }
  44. return h
  45. }
  46. func (h *peerHub) setSeeds(seeds []string) {
  47. for _, seed := range seeds {
  48. h.seeds[seed] = true
  49. }
  50. }
  51. func (h *peerHub) getSeeds() map[string]bool {
  52. h.mu.RLock()
  53. defer h.mu.RUnlock()
  54. s := make(map[string]bool)
  55. for k, v := range h.seeds {
  56. s[k] = v
  57. }
  58. return s
  59. }
  60. func (h *peerHub) stop() {
  61. h.mu.Lock()
  62. defer h.mu.Unlock()
  63. h.stopped = true
  64. for _, p := range h.peers {
  65. p.stop()
  66. }
  67. tr := h.c.Transport.(*http.Transport)
  68. tr.CloseIdleConnections()
  69. }
  70. func (h *peerHub) peer(id int64) (*peer, error) {
  71. h.mu.Lock()
  72. defer h.mu.Unlock()
  73. if h.stopped {
  74. return nil, fmt.Errorf("peerHub stopped")
  75. }
  76. if p, ok := h.peers[id]; ok {
  77. return p, nil
  78. }
  79. return nil, fmt.Errorf("peer %d not found", id)
  80. }
  81. func (h *peerHub) add(id int64, rawurl string) (*peer, error) {
  82. u, err := url.Parse(rawurl)
  83. if err != nil {
  84. return nil, err
  85. }
  86. u.Path = raftPrefix
  87. h.mu.Lock()
  88. defer h.mu.Unlock()
  89. if h.stopped {
  90. return nil, fmt.Errorf("peerHub stopped")
  91. }
  92. h.peers[id] = newPeer(u.String(), h.c)
  93. return h.peers[id], nil
  94. }
  95. func (h *peerHub) send(msg raft.Message) error {
  96. if p, err := h.fetch(msg.To); err == nil {
  97. data, err := json.Marshal(msg)
  98. if err != nil {
  99. return err
  100. }
  101. return p.send(data)
  102. }
  103. return errUnknownPeer
  104. }
  105. func (h *peerHub) fetch(nodeId int64) (*peer, error) {
  106. if p, err := h.peer(nodeId); err == nil {
  107. return p, nil
  108. }
  109. for seed := range h.seeds {
  110. if p, err := h.seedFetch(seed, nodeId); err == nil {
  111. return p, nil
  112. }
  113. }
  114. return nil, fmt.Errorf("cannot fetch the address of node %d", nodeId)
  115. }
  116. func (h *peerHub) seedFetch(seedurl string, id int64) (*peer, error) {
  117. u, err := url.Parse(seedurl)
  118. if err != nil {
  119. return nil, fmt.Errorf("cannot parse the url of the given seed")
  120. }
  121. u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
  122. resp, err := h.c.Get(u.String())
  123. if err != nil {
  124. return nil, fmt.Errorf("cannot reach %v", u)
  125. }
  126. defer resp.Body.Close()
  127. if resp.StatusCode != http.StatusOK {
  128. return nil, fmt.Errorf("cannot find node %d via %s", id, seedurl)
  129. }
  130. b, err := ioutil.ReadAll(resp.Body)
  131. if err != nil {
  132. return nil, fmt.Errorf("cannot reach %v", u)
  133. }
  134. return h.add(id, string(b))
  135. }