peer_hub.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. /*
  2. Copyright 2014 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdserver
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "net/http"
  20. "net/url"
  21. "path"
  22. "sync"
  23. "time"
  24. "github.com/coreos/etcd/raft"
  25. )
  26. var (
  27. errUnknownPeer = errors.New("unknown peer")
  28. )
  29. type peerGetter interface {
  30. peer(id int64) (*peer, error)
  31. }
  32. type peerHub struct {
  33. mu sync.RWMutex
  34. stopped bool
  35. seeds map[string]bool
  36. peers map[int64]*peer
  37. c *http.Client
  38. followersStats *raftFollowersStats
  39. serverStats *raftServerStats
  40. }
  41. func newPeerHub(c *http.Client, followersStats *raftFollowersStats) *peerHub {
  42. h := &peerHub{
  43. peers: make(map[int64]*peer),
  44. seeds: make(map[string]bool),
  45. c: c,
  46. followersStats: followersStats,
  47. }
  48. return h
  49. }
  50. func (h *peerHub) setServerStats(serverStats *raftServerStats) {
  51. h.serverStats = serverStats
  52. }
  53. func (h *peerHub) setSeeds(seeds []string) {
  54. for _, seed := range seeds {
  55. h.seeds[seed] = true
  56. }
  57. }
  58. func (h *peerHub) getSeeds() map[string]bool {
  59. h.mu.RLock()
  60. defer h.mu.RUnlock()
  61. s := make(map[string]bool)
  62. for k, v := range h.seeds {
  63. s[k] = v
  64. }
  65. return s
  66. }
  67. func (h *peerHub) stop() {
  68. h.mu.Lock()
  69. defer h.mu.Unlock()
  70. h.stopped = true
  71. for _, p := range h.peers {
  72. p.stop()
  73. }
  74. h.followersStats.Reset()
  75. // http.Transport needs some time to put used connections
  76. // into idle queues.
  77. time.Sleep(time.Millisecond)
  78. tr := h.c.Transport.(*http.Transport)
  79. tr.CloseIdleConnections()
  80. }
  81. func (h *peerHub) peer(id int64) (*peer, error) {
  82. h.mu.Lock()
  83. defer h.mu.Unlock()
  84. if h.stopped {
  85. return nil, fmt.Errorf("peerHub stopped")
  86. }
  87. if p, ok := h.peers[id]; ok {
  88. return p, nil
  89. }
  90. return nil, fmt.Errorf("peer %d not found", id)
  91. }
  92. func (h *peerHub) add(id int64, rawurl string) (*peer, error) {
  93. u, err := url.Parse(rawurl)
  94. if err != nil {
  95. return nil, err
  96. }
  97. u.Path = raftPrefix
  98. h.mu.Lock()
  99. defer h.mu.Unlock()
  100. if h.stopped {
  101. return nil, fmt.Errorf("peerHub stopped")
  102. }
  103. h.peers[id] = newPeer(u.String(), h.c, h.followersStats.Follower(fmt.Sprint(id)))
  104. return h.peers[id], nil
  105. }
  106. func (h *peerHub) send(msg raft.Message) error {
  107. if p, err := h.fetch(msg.To); err == nil {
  108. data, err := json.Marshal(msg)
  109. if err != nil {
  110. return err
  111. }
  112. if msg.IsMsgApp() {
  113. h.serverStats.SendAppendReq(len(data))
  114. }
  115. p.send(data)
  116. return nil
  117. }
  118. return errUnknownPeer
  119. }
  120. func (h *peerHub) fetch(nodeId int64) (*peer, error) {
  121. if p, err := h.peer(nodeId); err == nil {
  122. return p, nil
  123. }
  124. for seed := range h.seeds {
  125. if p, err := h.seedFetch(seed, nodeId); err == nil {
  126. return p, nil
  127. }
  128. }
  129. return nil, fmt.Errorf("cannot fetch the address of node %d", nodeId)
  130. }
  131. func (h *peerHub) seedFetch(seedurl string, id int64) (*peer, error) {
  132. u, err := url.Parse(seedurl)
  133. if err != nil {
  134. return nil, fmt.Errorf("cannot parse the url of the given seed")
  135. }
  136. u.Path = path.Join("/raft/cfg", fmt.Sprint(id))
  137. resp, err := h.c.Get(u.String())
  138. if err != nil {
  139. return nil, fmt.Errorf("cannot reach %v", u)
  140. }
  141. defer resp.Body.Close()
  142. if resp.StatusCode != http.StatusOK {
  143. return nil, fmt.Errorf("cannot find node %d via %s", id, seedurl)
  144. }
  145. b, err := ioutil.ReadAll(resp.Body)
  146. if err != nil {
  147. return nil, fmt.Errorf("cannot reach %v", u)
  148. }
  149. return h.add(id, string(b))
  150. }