transport.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. package rafthttp
  2. import (
  3. "log"
  4. "net/http"
  5. "net/url"
  6. "path"
  7. "sync"
  8. "github.com/coreos/etcd/etcdserver/stats"
  9. "github.com/coreos/etcd/pkg/types"
  10. "github.com/coreos/etcd/raft/raftpb"
  11. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  12. )
  13. type Raft interface {
  14. Process(ctx context.Context, m raftpb.Message) error
  15. }
  16. type Transporter interface {
  17. Handler() http.Handler
  18. Send(m []raftpb.Message)
  19. AddPeer(id types.ID, urls []string)
  20. RemovePeer(id types.ID)
  21. UpdatePeer(id types.ID, urls []string)
  22. Stop()
  23. ShouldStopNotify() <-chan struct{}
  24. }
  25. type transport struct {
  26. roundTripper http.RoundTripper
  27. id types.ID
  28. clusterID types.ID
  29. raft Raft
  30. serverStats *stats.ServerStats
  31. leaderStats *stats.LeaderStats
  32. mu sync.RWMutex // protect the peer map
  33. peers map[types.ID]*peer // remote peers
  34. shouldstop chan struct{}
  35. }
  36. func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
  37. return &transport{
  38. roundTripper: rt,
  39. id: id,
  40. clusterID: cid,
  41. raft: r,
  42. serverStats: ss,
  43. leaderStats: ls,
  44. peers: make(map[types.ID]*peer),
  45. shouldstop: make(chan struct{}, 1),
  46. }
  47. }
  48. func (t *transport) Handler() http.Handler {
  49. h := NewHandler(t.raft, t.clusterID)
  50. sh := NewStreamHandler(t, t.id, t.clusterID)
  51. mux := http.NewServeMux()
  52. mux.Handle(RaftPrefix, h)
  53. mux.Handle(RaftStreamPrefix+"/", sh)
  54. return mux
  55. }
  56. func (t *transport) Peer(id types.ID) *peer {
  57. t.mu.RLock()
  58. defer t.mu.RUnlock()
  59. return t.peers[id]
  60. }
  61. func (t *transport) Send(msgs []raftpb.Message) {
  62. for _, m := range msgs {
  63. // intentionally dropped message
  64. if m.To == 0 {
  65. continue
  66. }
  67. to := types.ID(m.To)
  68. p, ok := t.peers[to]
  69. if !ok {
  70. log.Printf("etcdserver: send message to unknown receiver %s", to)
  71. continue
  72. }
  73. if m.Type == raftpb.MsgApp {
  74. t.serverStats.SendAppendReq(m.Size())
  75. }
  76. p.Send(m)
  77. }
  78. }
  79. func (t *transport) Stop() {
  80. for _, p := range t.peers {
  81. p.Stop()
  82. }
  83. if tr, ok := t.roundTripper.(*http.Transport); ok {
  84. tr.CloseIdleConnections()
  85. }
  86. }
  87. func (t *transport) ShouldStopNotify() <-chan struct{} {
  88. return t.shouldstop
  89. }
  90. func (t *transport) AddPeer(id types.ID, urls []string) {
  91. t.mu.Lock()
  92. defer t.mu.Unlock()
  93. if _, ok := t.peers[id]; ok {
  94. return
  95. }
  96. // TODO: considering how to switch between all available peer urls
  97. peerURL := urls[0]
  98. u, err := url.Parse(peerURL)
  99. if err != nil {
  100. log.Panicf("unexpect peer url %s", peerURL)
  101. }
  102. u.Path = path.Join(u.Path, RaftPrefix)
  103. fs := t.leaderStats.Follower(id.String())
  104. t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID,
  105. t.raft, fs, t.shouldstop)
  106. }
  107. func (t *transport) RemovePeer(id types.ID) {
  108. t.mu.Lock()
  109. defer t.mu.Unlock()
  110. t.peers[id].Stop()
  111. delete(t.peers, id)
  112. }
  113. func (t *transport) UpdatePeer(id types.ID, urls []string) {
  114. t.mu.Lock()
  115. defer t.mu.Unlock()
  116. // TODO: return error or just panic?
  117. if _, ok := t.peers[id]; !ok {
  118. return
  119. }
  120. peerURL := urls[0]
  121. u, err := url.Parse(peerURL)
  122. if err != nil {
  123. log.Panicf("unexpect peer url %s", peerURL)
  124. }
  125. u.Path = path.Join(u.Path, RaftPrefix)
  126. t.peers[id].Update(u.String())
  127. }
  // Pausable is implemented by values that can be paused and resumed; the
  // transport implements it as a testing aid.
  type Pausable interface {
  	// Pause suspends the implementer.
  	Pause()

  	// Resume undoes a previous Pause.
  	Resume()
  }
  132. // for testing
  133. func (t *transport) Pause() {
  134. for _, p := range t.peers {
  135. p.Pause()
  136. }
  137. }
  138. func (t *transport) Resume() {
  139. for _, p := range t.peers {
  140. p.Resume()
  141. }
  142. }