transport.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157
  1. package rafthttp
  2. import (
  3. "log"
  4. "net/http"
  5. "net/url"
  6. "path"
  7. "sync"
  8. "github.com/coreos/etcd/etcdserver/stats"
  9. "github.com/coreos/etcd/pkg/types"
  10. "github.com/coreos/etcd/raft/raftpb"
  11. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  12. )
  13. type Raft interface {
  14. Process(ctx context.Context, m raftpb.Message) error
  15. }
  16. type Transporter interface {
  17. Handler() http.Handler
  18. Send(m []raftpb.Message)
  19. AddPeer(id types.ID, urls []string)
  20. RemovePeer(id types.ID)
  21. UpdatePeer(id types.ID, urls []string)
  22. Stop()
  23. }
  24. type transport struct {
  25. roundTripper http.RoundTripper
  26. id types.ID
  27. clusterID types.ID
  28. raft Raft
  29. serverStats *stats.ServerStats
  30. leaderStats *stats.LeaderStats
  31. mu sync.RWMutex // protect the peer map
  32. peers map[types.ID]*peer // remote peers
  33. errorc chan error
  34. }
  35. func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
  36. return &transport{
  37. roundTripper: rt,
  38. id: id,
  39. clusterID: cid,
  40. raft: r,
  41. serverStats: ss,
  42. leaderStats: ls,
  43. peers: make(map[types.ID]*peer),
  44. errorc: errorc,
  45. }
  46. }
  47. func (t *transport) Handler() http.Handler {
  48. h := NewHandler(t.raft, t.clusterID)
  49. sh := NewStreamHandler(t, t.id, t.clusterID)
  50. mux := http.NewServeMux()
  51. mux.Handle(RaftPrefix, h)
  52. mux.Handle(RaftStreamPrefix+"/", sh)
  53. return mux
  54. }
  55. func (t *transport) Peer(id types.ID) *peer {
  56. t.mu.RLock()
  57. defer t.mu.RUnlock()
  58. return t.peers[id]
  59. }
  60. func (t *transport) Send(msgs []raftpb.Message) {
  61. for _, m := range msgs {
  62. // intentionally dropped message
  63. if m.To == 0 {
  64. continue
  65. }
  66. to := types.ID(m.To)
  67. p, ok := t.peers[to]
  68. if !ok {
  69. log.Printf("etcdserver: send message to unknown receiver %s", to)
  70. continue
  71. }
  72. if m.Type == raftpb.MsgApp {
  73. t.serverStats.SendAppendReq(m.Size())
  74. }
  75. p.Send(m)
  76. }
  77. }
  78. func (t *transport) Stop() {
  79. for _, p := range t.peers {
  80. p.Stop()
  81. }
  82. if tr, ok := t.roundTripper.(*http.Transport); ok {
  83. tr.CloseIdleConnections()
  84. }
  85. }
  86. func (t *transport) AddPeer(id types.ID, urls []string) {
  87. t.mu.Lock()
  88. defer t.mu.Unlock()
  89. if _, ok := t.peers[id]; ok {
  90. return
  91. }
  92. // TODO: considering how to switch between all available peer urls
  93. peerURL := urls[0]
  94. u, err := url.Parse(peerURL)
  95. if err != nil {
  96. log.Panicf("unexpect peer url %s", peerURL)
  97. }
  98. u.Path = path.Join(u.Path, RaftPrefix)
  99. fs := t.leaderStats.Follower(id.String())
  100. t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID, t.raft, fs, t.errorc)
  101. }
  102. func (t *transport) RemovePeer(id types.ID) {
  103. t.mu.Lock()
  104. defer t.mu.Unlock()
  105. t.peers[id].Stop()
  106. delete(t.peers, id)
  107. }
  108. func (t *transport) UpdatePeer(id types.ID, urls []string) {
  109. t.mu.Lock()
  110. defer t.mu.Unlock()
  111. // TODO: return error or just panic?
  112. if _, ok := t.peers[id]; !ok {
  113. return
  114. }
  115. peerURL := urls[0]
  116. u, err := url.Parse(peerURL)
  117. if err != nil {
  118. log.Panicf("unexpect peer url %s", peerURL)
  119. }
  120. u.Path = path.Join(u.Path, RaftPrefix)
  121. t.peers[id].Update(u.String())
  122. }
  // Pausable is implemented by types that can be paused and later resumed.
  // In this file transport implements it by forwarding both calls to every
  // registered peer.
  type Pausable interface {
  	// Pause suspends the implementation until Resume is called.
  	// NOTE(review): exact effect depends on the implementation — confirm
  	// against peer.Pause.
  	Pause()

  	// Resume undoes a previous Pause.
  	Resume()
  }
  127. // for testing
  128. func (t *transport) Pause() {
  129. for _, p := range t.peers {
  130. p.Pause()
  131. }
  132. }
  133. func (t *transport) Resume() {
  134. for _, p := range t.peers {
  135. p.Resume()
  136. }
  137. }