transporter.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "time"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/etcd/third_party/github.com/goraft/raft"
  12. httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
  13. )
  // snapshotTimeout is the request and read/write limit applied to the
  // dedicated snapshot transport built in NewTransporter.
  const snapshotTimeout = 120 * time.Second
  17. // Transporter layer for communication between raft nodes
  18. type transporter struct {
  19. requestTimeout time.Duration
  20. followersStats *raftFollowersStats
  21. serverStats *raftServerStats
  22. registry *Registry
  23. client *http.Client
  24. transport *httpclient.Transport
  25. snapshotClient *http.Client
  26. snapshotTransport *httpclient.Transport
  27. }
  // dialer is the function shape used to open a network connection to addr
  // on the given network.
  type dialer func(network string, addr string) (net.Conn, error)
  29. // Create transporter using by raft server
  30. // Create http or https transporter based on
  31. // whether the user give the server cert and key
  32. func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
  33. tr := &httpclient.Transport{
  34. ResponseHeaderTimeout: responseHeaderTimeout,
  35. // This is a workaround for Transport.CancelRequest doesn't work on
  36. // HTTPS connections blocked. The patch for it is in progress,
  37. // and would be available in Go1.3
  38. // More: https://codereview.appspot.com/69280043/
  39. ConnectTimeout: dialTimeout,
  40. RequestTimeout: dialTimeout + responseHeaderTimeout,
  41. ReadWriteTimeout: responseHeaderTimeout,
  42. }
  43. // Sending snapshot might take a long time so we use a different HTTP transporter
  44. // Timeout is set to 120s (Around 100MB if the bandwidth is 10Mbits/s)
  45. // This timeout is not calculated by heartbeat time.
  46. // TODO(xiangl1) we can actually calculate the max bandwidth if we know
  47. // average RTT.
  48. // It should be equal to (TCP max window size/RTT).
  49. sTr := &httpclient.Transport{
  50. ConnectTimeout: dialTimeout,
  51. RequestTimeout: snapshotTimeout,
  52. ReadWriteTimeout: snapshotTimeout,
  53. }
  54. t := transporter{
  55. client: &http.Client{Transport: tr},
  56. transport: tr,
  57. snapshotClient: &http.Client{Transport: sTr},
  58. snapshotTransport: sTr,
  59. requestTimeout: requestTimeout,
  60. followersStats: followersStats,
  61. serverStats: serverStats,
  62. registry: registry,
  63. }
  64. return &t
  65. }
  66. func (t *transporter) SetTLSConfig(tlsConf tls.Config) {
  67. t.transport.TLSClientConfig = &tlsConf
  68. t.transport.DisableCompression = true
  69. t.snapshotTransport.TLSClientConfig = &tlsConf
  70. t.snapshotTransport.DisableCompression = true
  71. }
  72. // Sends AppendEntries RPCs to a peer when the server is the leader.
  73. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  74. var b bytes.Buffer
  75. if _, err := req.Encode(&b); err != nil {
  76. log.Warn("transporter.ae.encoding.error:", err)
  77. return nil
  78. }
  79. size := b.Len()
  80. t.serverStats.SendAppendReq(size)
  81. u, _ := t.registry.PeerURL(peer.Name)
  82. log.Debugf("Send LogEntries to %s ", u)
  83. thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
  84. if !ok { //this is the first time this follower has been seen
  85. thisFollowerStats = &raftFollowerStats{}
  86. thisFollowerStats.Latency.Minimum = 1 << 63
  87. t.followersStats.Followers[peer.Name] = thisFollowerStats
  88. }
  89. start := time.Now()
  90. resp, _, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  91. end := time.Now()
  92. if err != nil {
  93. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  94. if ok {
  95. thisFollowerStats.Fail()
  96. }
  97. return nil
  98. } else {
  99. if ok {
  100. thisFollowerStats.Succ(end.Sub(start))
  101. }
  102. }
  103. if resp != nil {
  104. defer resp.Body.Close()
  105. aeresp := &raft.AppendEntriesResponse{}
  106. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  107. log.Warn("transporter.ae.decoding.error:", err)
  108. return nil
  109. }
  110. return aeresp
  111. }
  112. return nil
  113. }
  114. // Sends RequestVote RPCs to a peer when the server is the candidate.
  115. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  116. var b bytes.Buffer
  117. if _, err := req.Encode(&b); err != nil {
  118. log.Warn("transporter.vr.encoding.error:", err)
  119. return nil
  120. }
  121. u, _ := t.registry.PeerURL(peer.Name)
  122. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  123. resp, _, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  124. if err != nil {
  125. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  126. }
  127. if resp != nil {
  128. defer resp.Body.Close()
  129. rvrsp := &raft.RequestVoteResponse{}
  130. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  131. log.Warn("transporter.vr.decoding.error:", err)
  132. return nil
  133. }
  134. return rvrsp
  135. }
  136. return nil
  137. }
  138. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  139. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  140. var b bytes.Buffer
  141. if _, err := req.Encode(&b); err != nil {
  142. log.Warn("transporter.ss.encoding.error:", err)
  143. return nil
  144. }
  145. u, _ := t.registry.PeerURL(peer.Name)
  146. log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  147. resp, _, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  148. if err != nil {
  149. log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
  150. }
  151. if resp != nil {
  152. defer resp.Body.Close()
  153. ssrsp := &raft.SnapshotResponse{}
  154. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  155. log.Warn("transporter.ss.decoding.error:", err)
  156. return nil
  157. }
  158. return ssrsp
  159. }
  160. return nil
  161. }
  162. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  163. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  164. var b bytes.Buffer
  165. if _, err := req.Encode(&b); err != nil {
  166. log.Warn("transporter.ss.encoding.error:", err)
  167. return nil
  168. }
  169. u, _ := t.registry.PeerURL(peer.Name)
  170. log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  171. resp, err := t.PostSnapshot(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  172. if err != nil {
  173. log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  174. }
  175. if resp != nil {
  176. defer resp.Body.Close()
  177. ssrrsp := &raft.SnapshotRecoveryResponse{}
  178. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  179. log.Warn("transporter.ssr.decoding.error:", err)
  180. return nil
  181. }
  182. return ssrrsp
  183. }
  184. return nil
  185. }
  186. // Send server side POST request
  187. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  188. req, _ := http.NewRequest("POST", urlStr, body)
  189. resp, err := t.client.Do(req)
  190. return resp, req, err
  191. }
  192. // Send server side GET request
  193. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  194. req, _ := http.NewRequest("GET", urlStr, nil)
  195. resp, err := t.client.Do(req)
  196. return resp, req, err
  197. }
  198. // Send server side PUT request
  199. func (t *transporter) Put(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  200. req, _ := http.NewRequest("PUT", urlStr, body)
  201. resp, err := t.client.Do(req)
  202. return resp, req, err
  203. }
  204. // PostSnapshot posts a json format snapshot to the given url
  205. // The underlying HTTP transport has a minute level timeout
  206. func (t *transporter) PostSnapshot(url string, body io.Reader) (*http.Response, error) {
  207. return t.snapshotClient.Post(url, "application/json", body)
  208. }