transporter.go 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "time"
  11. "github.com/coreos/go-raft"
  12. )
  13. // Timeout for setup internal raft http connection
  14. // This should not exceed 3 * RTT
  15. var dailTimeout = 3 * HeartbeatTimeout
  16. // Timeout for setup internal raft http connection + receive response header
  17. // This should not exceed 3 * RTT + RTT
  18. var responseHeaderTimeout = 4 * HeartbeatTimeout
  19. // Timeout for receiving the response body from the server
  20. // This should not exceed election timeout
  21. var tranTimeout = ElectionTimeout
  22. // Transporter layer for communication between raft nodes
  23. type transporter struct {
  24. client *http.Client
  25. transport *http.Transport
  26. }
  27. // Create transporter using by raft server
  28. // Create http or https transporter based on
  29. // whether the user give the server cert and key
  30. func newTransporter(scheme string, tlsConf tls.Config) *transporter {
  31. t := transporter{}
  32. tr := &http.Transport{
  33. Dial: dialWithTimeout,
  34. ResponseHeaderTimeout: responseHeaderTimeout,
  35. }
  36. if scheme == "https" {
  37. tr.TLSClientConfig = &tlsConf
  38. tr.DisableCompression = true
  39. }
  40. t.client = &http.Client{Transport: tr}
  41. t.transport = tr
  42. return &t
  43. }
  44. // Dial with timeout
  45. func dialWithTimeout(network, addr string) (net.Conn, error) {
  46. return net.DialTimeout(network, addr, dailTimeout)
  47. }
  48. // Sends AppendEntries RPCs to a peer when the server is the leader.
  49. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  50. var aersp *raft.AppendEntriesResponse
  51. var b bytes.Buffer
  52. json.NewEncoder(&b).Encode(req)
  53. size := b.Len()
  54. r.serverStats.SendAppendReq(size)
  55. u, _ := nameToRaftURL(peer.Name)
  56. debugf("Send LogEntries to %s ", u)
  57. thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
  58. if !ok { //this is the first time this follower has been seen
  59. thisFollowerStats = &raftFollowerStats{}
  60. thisFollowerStats.Latency.Minimum = 1 << 63
  61. r.followersStats.Followers[peer.Name] = thisFollowerStats
  62. }
  63. start := time.Now()
  64. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  65. end := time.Now()
  66. if err != nil {
  67. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  68. if ok {
  69. thisFollowerStats.Fail()
  70. }
  71. } else {
  72. if ok {
  73. thisFollowerStats.Succ(end.Sub(start))
  74. }
  75. }
  76. if resp != nil {
  77. defer resp.Body.Close()
  78. t.CancelWhenTimeout(httpRequest)
  79. aersp = &raft.AppendEntriesResponse{}
  80. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  81. return aersp
  82. }
  83. }
  84. return aersp
  85. }
  86. // Sends RequestVote RPCs to a peer when the server is the candidate.
  87. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  88. var rvrsp *raft.RequestVoteResponse
  89. var b bytes.Buffer
  90. json.NewEncoder(&b).Encode(req)
  91. u, _ := nameToRaftURL(peer.Name)
  92. debugf("Send Vote to %s", u)
  93. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  94. if err != nil {
  95. debugf("Cannot send VoteRequest to %s : %s", u, err)
  96. }
  97. if resp != nil {
  98. defer resp.Body.Close()
  99. t.CancelWhenTimeout(httpRequest)
  100. rvrsp := &raft.RequestVoteResponse{}
  101. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  102. return rvrsp
  103. }
  104. }
  105. return rvrsp
  106. }
  107. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  108. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  109. var aersp *raft.SnapshotResponse
  110. var b bytes.Buffer
  111. json.NewEncoder(&b).Encode(req)
  112. u, _ := nameToRaftURL(peer.Name)
  113. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  114. req.LastTerm, req.LastIndex)
  115. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  116. if err != nil {
  117. debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  118. }
  119. if resp != nil {
  120. defer resp.Body.Close()
  121. t.CancelWhenTimeout(httpRequest)
  122. aersp = &raft.SnapshotResponse{}
  123. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  124. return aersp
  125. }
  126. }
  127. return aersp
  128. }
  129. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  130. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  131. var aersp *raft.SnapshotRecoveryResponse
  132. var b bytes.Buffer
  133. json.NewEncoder(&b).Encode(req)
  134. u, _ := nameToRaftURL(peer.Name)
  135. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  136. req.LastTerm, req.LastIndex)
  137. resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  138. if err != nil {
  139. debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  140. }
  141. if resp != nil {
  142. defer resp.Body.Close()
  143. aersp = &raft.SnapshotRecoveryResponse{}
  144. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  145. return aersp
  146. }
  147. }
  148. return aersp
  149. }
  150. // Send server side POST request
  151. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  152. req, _ := http.NewRequest("POST", urlStr, body)
  153. resp, err := t.client.Do(req)
  154. return resp, req, err
  155. }
  156. // Send server side GET request
  157. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  158. req, _ := http.NewRequest("GET", urlStr, nil)
  159. resp, err := t.client.Do(req)
  160. return resp, req, err
  161. }
  162. // Cancel the on fly HTTP transaction when timeout happens
  163. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  164. go func() {
  165. time.Sleep(ElectionTimeout)
  166. t.transport.CancelRequest(req)
  167. }()
  168. }