transport.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. // Copyright 2015 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package rafthttp
  15. import (
  16. "log"
  17. "net/http"
  18. "net/url"
  19. "path"
  20. "sync"
  21. "github.com/coreos/etcd/etcdserver/stats"
  22. "github.com/coreos/etcd/pkg/types"
  23. "github.com/coreos/etcd/raft/raftpb"
  24. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  25. )
  26. type Raft interface {
  27. Process(ctx context.Context, m raftpb.Message) error
  28. }
  29. type Transporter interface {
  30. Handler() http.Handler
  31. Send(m []raftpb.Message)
  32. AddPeer(id types.ID, urls []string)
  33. RemovePeer(id types.ID)
  34. UpdatePeer(id types.ID, urls []string)
  35. Stop()
  36. }
  37. type transport struct {
  38. roundTripper http.RoundTripper
  39. id types.ID
  40. clusterID types.ID
  41. raft Raft
  42. serverStats *stats.ServerStats
  43. leaderStats *stats.LeaderStats
  44. mu sync.RWMutex // protect the peer map
  45. peers map[types.ID]*peer // remote peers
  46. errorc chan error
  47. }
  48. func NewTransporter(rt http.RoundTripper, id, cid types.ID, r Raft, errorc chan error, ss *stats.ServerStats, ls *stats.LeaderStats) Transporter {
  49. return &transport{
  50. roundTripper: rt,
  51. id: id,
  52. clusterID: cid,
  53. raft: r,
  54. serverStats: ss,
  55. leaderStats: ls,
  56. peers: make(map[types.ID]*peer),
  57. errorc: errorc,
  58. }
  59. }
  60. func (t *transport) Handler() http.Handler {
  61. h := NewHandler(t.raft, t.clusterID)
  62. sh := NewStreamHandler(t, t.id, t.clusterID)
  63. mux := http.NewServeMux()
  64. mux.Handle(RaftPrefix, h)
  65. mux.Handle(RaftStreamPrefix+"/", sh)
  66. return mux
  67. }
  68. func (t *transport) Peer(id types.ID) *peer {
  69. t.mu.RLock()
  70. defer t.mu.RUnlock()
  71. return t.peers[id]
  72. }
  73. func (t *transport) Send(msgs []raftpb.Message) {
  74. for _, m := range msgs {
  75. // intentionally dropped message
  76. if m.To == 0 {
  77. continue
  78. }
  79. to := types.ID(m.To)
  80. p, ok := t.peers[to]
  81. if !ok {
  82. log.Printf("etcdserver: send message to unknown receiver %s", to)
  83. continue
  84. }
  85. if m.Type == raftpb.MsgApp {
  86. t.serverStats.SendAppendReq(m.Size())
  87. }
  88. p.Send(m)
  89. }
  90. }
  91. func (t *transport) Stop() {
  92. for _, p := range t.peers {
  93. p.Stop()
  94. }
  95. if tr, ok := t.roundTripper.(*http.Transport); ok {
  96. tr.CloseIdleConnections()
  97. }
  98. }
  99. func (t *transport) AddPeer(id types.ID, urls []string) {
  100. t.mu.Lock()
  101. defer t.mu.Unlock()
  102. if _, ok := t.peers[id]; ok {
  103. return
  104. }
  105. // TODO: considering how to switch between all available peer urls
  106. peerURL := urls[0]
  107. u, err := url.Parse(peerURL)
  108. if err != nil {
  109. log.Panicf("unexpect peer url %s", peerURL)
  110. }
  111. u.Path = path.Join(u.Path, RaftPrefix)
  112. fs := t.leaderStats.Follower(id.String())
  113. t.peers[id] = NewPeer(t.roundTripper, u.String(), id, t.clusterID, t.raft, fs, t.errorc)
  114. }
  115. func (t *transport) RemovePeer(id types.ID) {
  116. t.mu.Lock()
  117. defer t.mu.Unlock()
  118. t.peers[id].Stop()
  119. delete(t.peers, id)
  120. }
  121. func (t *transport) UpdatePeer(id types.ID, urls []string) {
  122. t.mu.Lock()
  123. defer t.mu.Unlock()
  124. // TODO: return error or just panic?
  125. if _, ok := t.peers[id]; !ok {
  126. return
  127. }
  128. peerURL := urls[0]
  129. u, err := url.Parse(peerURL)
  130. if err != nil {
  131. log.Panicf("unexpect peer url %s", peerURL)
  132. }
  133. u.Path = path.Join(u.Path, RaftPrefix)
  134. t.peers[id].Update(u.String())
  135. }
  // Pausable is implemented by types that can be paused and resumed. The
  // transport in this file implements it by pausing/resuming all of its peers;
  // that implementation is marked as being for testing.
  type Pausable interface {
  	// Pause suspends the implementer until Resume is called.
  	Pause()
  	// Resume undoes a previous Pause.
  	Resume()
  }
  140. // for testing
  141. func (t *transport) Pause() {
  142. for _, p := range t.peers {
  143. p.Pause()
  144. }
  145. }
  146. func (t *transport) Resume() {
  147. for _, p := range t.peers {
  148. p.Resume()
  149. }
  150. }