transporter.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "time"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  12. httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
  13. )
  14. const (
  15. snapshotTimeout = time.Second * 120
  16. )
  17. // Transporter layer for communication between raft nodes
  18. type transporter struct {
  19. followersStats *raftFollowersStats
  20. serverStats *raftServerStats
  21. registry *Registry
  22. client *http.Client
  23. transport *httpclient.Transport
  24. snapshotClient *http.Client
  25. snapshotTransport *httpclient.Transport
  26. }
  27. type dialer func(network, addr string) (net.Conn, error)
  28. // Create transporter using by raft server
  29. // Create http or https transporter based on
  30. // whether the user give the server cert and key
  31. func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
  32. tr := &httpclient.Transport{
  33. ResponseHeaderTimeout: responseHeaderTimeout,
  34. // This is a workaround for Transport.CancelRequest doesn't work on
  35. // HTTPS connections blocked. The patch for it is in progress,
  36. // and would be available in Go1.3
  37. // More: https://codereview.appspot.com/69280043/
  38. ConnectTimeout: dialTimeout,
  39. RequestTimeout: requestTimeout,
  40. }
  41. // Sending snapshot might take a long time so we use a different HTTP transporter
  42. // Timeout is set to 120s (Around 100MB if the bandwidth is 10Mbits/s)
  43. // This timeout is not calculated by heartbeat time.
  44. // TODO(xiangl1) we can actually calculate the max bandwidth if we know
  45. // average RTT.
  46. // It should be equal to (TCP max window size/RTT).
  47. sTr := &httpclient.Transport{
  48. ConnectTimeout: dialTimeout,
  49. RequestTimeout: snapshotTimeout,
  50. }
  51. t := transporter{
  52. client: &http.Client{Transport: tr},
  53. transport: tr,
  54. snapshotClient: &http.Client{Transport: sTr},
  55. snapshotTransport: sTr,
  56. followersStats: followersStats,
  57. serverStats: serverStats,
  58. registry: registry,
  59. }
  60. return &t
  61. }
  62. func (t *transporter) SetTLSConfig(tlsConf tls.Config) {
  63. t.transport.TLSClientConfig = &tlsConf
  64. t.transport.DisableCompression = true
  65. t.snapshotTransport.TLSClientConfig = &tlsConf
  66. t.snapshotTransport.DisableCompression = true
  67. }
  68. // Sends AppendEntries RPCs to a peer when the server is the leader.
  69. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  70. var b bytes.Buffer
  71. if _, err := req.Encode(&b); err != nil {
  72. log.Warn("transporter.ae.encoding.error:", err)
  73. return nil
  74. }
  75. size := b.Len()
  76. t.serverStats.SendAppendReq(size)
  77. u, _ := t.registry.PeerURL(peer.Name)
  78. log.Debugf("Send LogEntries to %s ", u)
  79. thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
  80. if !ok { //this is the first time this follower has been seen
  81. thisFollowerStats = &raftFollowerStats{}
  82. thisFollowerStats.Latency.Minimum = 1 << 63
  83. t.followersStats.Followers[peer.Name] = thisFollowerStats
  84. }
  85. start := time.Now()
  86. resp, _, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  87. end := time.Now()
  88. if err != nil {
  89. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  90. if ok {
  91. thisFollowerStats.Fail()
  92. }
  93. return nil
  94. } else {
  95. if ok {
  96. thisFollowerStats.Succ(end.Sub(start))
  97. }
  98. }
  99. if resp != nil {
  100. defer resp.Body.Close()
  101. aeresp := &raft.AppendEntriesResponse{}
  102. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  103. log.Warn("transporter.ae.decoding.error:", err)
  104. return nil
  105. }
  106. return aeresp
  107. }
  108. return nil
  109. }
  110. // Sends RequestVote RPCs to a peer when the server is the candidate.
  111. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  112. var b bytes.Buffer
  113. if _, err := req.Encode(&b); err != nil {
  114. log.Warn("transporter.vr.encoding.error:", err)
  115. return nil
  116. }
  117. u, _ := t.registry.PeerURL(peer.Name)
  118. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  119. resp, _, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  120. if err != nil {
  121. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  122. }
  123. if resp != nil {
  124. defer resp.Body.Close()
  125. rvrsp := &raft.RequestVoteResponse{}
  126. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  127. log.Warn("transporter.vr.decoding.error:", err)
  128. return nil
  129. }
  130. return rvrsp
  131. }
  132. return nil
  133. }
  134. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  135. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  136. var b bytes.Buffer
  137. if _, err := req.Encode(&b); err != nil {
  138. log.Warn("transporter.ss.encoding.error:", err)
  139. return nil
  140. }
  141. u, _ := t.registry.PeerURL(peer.Name)
  142. log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  143. resp, _, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  144. if err != nil {
  145. log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
  146. }
  147. if resp != nil {
  148. defer resp.Body.Close()
  149. ssrsp := &raft.SnapshotResponse{}
  150. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  151. log.Warn("transporter.ss.decoding.error:", err)
  152. return nil
  153. }
  154. return ssrsp
  155. }
  156. return nil
  157. }
  158. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  159. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  160. var b bytes.Buffer
  161. if _, err := req.Encode(&b); err != nil {
  162. log.Warn("transporter.ss.encoding.error:", err)
  163. return nil
  164. }
  165. u, _ := t.registry.PeerURL(peer.Name)
  166. log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  167. resp, err := t.PostSnapshot(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  168. if err != nil {
  169. log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  170. }
  171. if resp != nil {
  172. defer resp.Body.Close()
  173. ssrrsp := &raft.SnapshotRecoveryResponse{}
  174. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  175. log.Warn("transporter.ssr.decoding.error:", err)
  176. return nil
  177. }
  178. return ssrrsp
  179. }
  180. return nil
  181. }
  182. // Send server side POST request
  183. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  184. req, _ := http.NewRequest("POST", urlStr, body)
  185. resp, err := t.client.Do(req)
  186. return resp, req, err
  187. }
  188. // Send server side GET request
  189. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  190. req, _ := http.NewRequest("GET", urlStr, nil)
  191. resp, err := t.client.Do(req)
  192. return resp, req, err
  193. }
  194. // Send server side PUT request
  195. func (t *transporter) Put(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  196. req, _ := http.NewRequest("PUT", urlStr, body)
  197. resp, err := t.client.Do(req)
  198. return resp, req, err
  199. }
  200. // PostSnapshot posts a json format snapshot to the given url
  201. // The underlying HTTP transport has a minute level timeout
  202. func (t *transporter) PostSnapshot(url string, body io.Reader) (*http.Response, error) {
  203. return t.snapshotClient.Post(url, "application/json", body)
  204. }