transporter.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "time"
  11. "github.com/coreos/etcd/log"
  12. "github.com/coreos/go-raft"
  13. )
  14. // Timeout for setup internal raft http connection
  15. // This should not exceed 3 * RTT
  16. var dailTimeout = 3 * HeartbeatTimeout
  17. // Timeout for setup internal raft http connection + receive all post body
  18. // The raft server will not send back response header until it received all the
  19. // post body.
  20. // This should not exceed dailTimeout + electionTimeout
  21. var responseHeaderTimeout = 3*HeartbeatTimeout + ElectionTimeout
  22. // Timeout for receiving the response body from the server
  23. // This should not exceed heartbeatTimeout
  24. var tranTimeout = HeartbeatTimeout
  25. // Transporter layer for communication between raft nodes
  26. type transporter struct {
  27. client *http.Client
  28. transport *http.Transport
  29. peerServer *PeerServer
  30. }
  31. // Create transporter using by raft server
  32. // Create http or https transporter based on
  33. // whether the user give the server cert and key
  34. func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
  35. t := transporter{}
  36. tr := &http.Transport{
  37. Dial: dialWithTimeout,
  38. ResponseHeaderTimeout: responseHeaderTimeout,
  39. }
  40. if scheme == "https" {
  41. tr.TLSClientConfig = &tlsConf
  42. tr.DisableCompression = true
  43. }
  44. t.client = &http.Client{Transport: tr}
  45. t.transport = tr
  46. t.peerServer = peerServer
  47. return &t
  48. }
  49. // Dial with timeout
  50. func dialWithTimeout(network, addr string) (net.Conn, error) {
  51. return net.DialTimeout(network, addr, dailTimeout)
  52. }
  53. // Sends AppendEntries RPCs to a peer when the server is the leader.
  54. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  55. var aersp *raft.AppendEntriesResponse
  56. var b bytes.Buffer
  57. json.NewEncoder(&b).Encode(req)
  58. size := b.Len()
  59. t.peerServer.serverStats.SendAppendReq(size)
  60. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  61. log.Debugf("Send LogEntries to %s ", u)
  62. thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
  63. if !ok { //this is the first time this follower has been seen
  64. thisFollowerStats = &raftFollowerStats{}
  65. thisFollowerStats.Latency.Minimum = 1 << 63
  66. t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
  67. }
  68. start := time.Now()
  69. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  70. end := time.Now()
  71. if err != nil {
  72. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  73. if ok {
  74. thisFollowerStats.Fail()
  75. }
  76. } else {
  77. if ok {
  78. thisFollowerStats.Succ(end.Sub(start))
  79. }
  80. }
  81. if resp != nil {
  82. defer resp.Body.Close()
  83. t.CancelWhenTimeout(httpRequest)
  84. aersp = &raft.AppendEntriesResponse{}
  85. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  86. return aersp
  87. }
  88. }
  89. return aersp
  90. }
  91. // Sends RequestVote RPCs to a peer when the server is the candidate.
  92. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  93. var rvrsp *raft.RequestVoteResponse
  94. var b bytes.Buffer
  95. json.NewEncoder(&b).Encode(req)
  96. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  97. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  98. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  99. if err != nil {
  100. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  101. }
  102. if resp != nil {
  103. defer resp.Body.Close()
  104. t.CancelWhenTimeout(httpRequest)
  105. rvrsp := &raft.RequestVoteResponse{}
  106. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  107. return rvrsp
  108. }
  109. }
  110. return rvrsp
  111. }
  112. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  113. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  114. var aersp *raft.SnapshotResponse
  115. var b bytes.Buffer
  116. json.NewEncoder(&b).Encode(req)
  117. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  118. log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  119. req.LastTerm, req.LastIndex)
  120. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  121. if err != nil {
  122. log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  123. }
  124. if resp != nil {
  125. defer resp.Body.Close()
  126. t.CancelWhenTimeout(httpRequest)
  127. aersp = &raft.SnapshotResponse{}
  128. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  129. return aersp
  130. }
  131. }
  132. return aersp
  133. }
  134. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  135. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  136. var aersp *raft.SnapshotRecoveryResponse
  137. var b bytes.Buffer
  138. json.NewEncoder(&b).Encode(req)
  139. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  140. log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  141. req.LastTerm, req.LastIndex)
  142. resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  143. if err != nil {
  144. log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  145. }
  146. if resp != nil {
  147. defer resp.Body.Close()
  148. aersp = &raft.SnapshotRecoveryResponse{}
  149. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  150. return aersp
  151. }
  152. }
  153. return aersp
  154. }
  155. // Send server side POST request
  156. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  157. req, _ := http.NewRequest("POST", urlStr, body)
  158. resp, err := t.client.Do(req)
  159. return resp, req, err
  160. }
  161. // Send server side GET request
  162. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  163. req, _ := http.NewRequest("GET", urlStr, nil)
  164. resp, err := t.client.Do(req)
  165. return resp, req, err
  166. }
  167. // Cancel the on fly HTTP transaction when timeout happens.
  168. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  169. go func() {
  170. time.Sleep(tranTimeout)
  171. t.transport.CancelRequest(req)
  172. }()
  173. }