transporter.go 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "encoding/json"
  18. "fmt"
  19. "io"
  20. "net"
  21. "net/http"
  22. "time"
  23. "github.com/coreos/go-raft"
  24. )
  25. // Timeout for setup internal raft http connection
  26. // This should not exceed 3 * RTT
  27. var dailTimeout = 3 * HeartbeatTimeout
  28. // Timeout for setup internal raft http connection + receive response header
  29. // This should not exceed 3 * RTT + RTT
  30. var responseHeaderTimeout = 4 * HeartbeatTimeout
  31. // Timeout for receiving the response body from the server
  32. // This should not exceed election timeout
  33. var tranTimeout = ElectionTimeout
  34. // Transporter layer for communication between raft nodes
  35. type transporter struct {
  36. client *http.Client
  37. transport *http.Transport
  38. }
  39. // Create transporter using by raft server
  40. // Create http or https transporter based on
  41. // whether the user give the server cert and key
  42. func newTransporter(scheme string, tlsConf tls.Config) *transporter {
  43. t := transporter{}
  44. tr := &http.Transport{
  45. Dial: dialWithTimeout,
  46. ResponseHeaderTimeout: responseHeaderTimeout,
  47. }
  48. if scheme == "https" {
  49. tr.TLSClientConfig = &tlsConf
  50. tr.DisableCompression = true
  51. }
  52. t.client = &http.Client{Transport: tr}
  53. t.transport = tr
  54. return &t
  55. }
  56. // Dial with timeout
  57. func dialWithTimeout(network, addr string) (net.Conn, error) {
  58. return net.DialTimeout(network, addr, dailTimeout)
  59. }
  60. // Sends AppendEntries RPCs to a peer when the server is the leader.
  61. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  62. var aersp *raft.AppendEntriesResponse
  63. var b bytes.Buffer
  64. json.NewEncoder(&b).Encode(req)
  65. size := b.Len()
  66. r.serverStats.SendAppendReq(size)
  67. u, _ := nameToRaftURL(peer.Name)
  68. debugf("Send LogEntries to %s ", u)
  69. thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
  70. if !ok { //this is the first time this follower has been seen
  71. thisFollowerStats = &raftFollowerStats{}
  72. thisFollowerStats.Latency.Minimum = 1 << 63
  73. r.followersStats.Followers[peer.Name] = thisFollowerStats
  74. }
  75. start := time.Now()
  76. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  77. end := time.Now()
  78. if err != nil {
  79. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  80. if ok {
  81. thisFollowerStats.Fail()
  82. }
  83. } else {
  84. if ok {
  85. thisFollowerStats.Succ(end.Sub(start))
  86. }
  87. }
  88. if resp != nil {
  89. defer resp.Body.Close()
  90. t.CancelWhenTimeout(httpRequest)
  91. aersp = &raft.AppendEntriesResponse{}
  92. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  93. return aersp
  94. }
  95. }
  96. return aersp
  97. }
  98. // Sends RequestVote RPCs to a peer when the server is the candidate.
  99. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  100. var rvrsp *raft.RequestVoteResponse
  101. var b bytes.Buffer
  102. json.NewEncoder(&b).Encode(req)
  103. u, _ := nameToRaftURL(peer.Name)
  104. debugf("Send Vote to %s", u)
  105. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  106. if err != nil {
  107. debugf("Cannot send VoteRequest to %s : %s", u, err)
  108. }
  109. if resp != nil {
  110. defer resp.Body.Close()
  111. t.CancelWhenTimeout(httpRequest)
  112. rvrsp := &raft.RequestVoteResponse{}
  113. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  114. return rvrsp
  115. }
  116. }
  117. return rvrsp
  118. }
  119. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  120. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  121. var aersp *raft.SnapshotResponse
  122. var b bytes.Buffer
  123. json.NewEncoder(&b).Encode(req)
  124. u, _ := nameToRaftURL(peer.Name)
  125. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  126. req.LastTerm, req.LastIndex)
  127. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  128. if err != nil {
  129. debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  130. }
  131. if resp != nil {
  132. defer resp.Body.Close()
  133. t.CancelWhenTimeout(httpRequest)
  134. aersp = &raft.SnapshotResponse{}
  135. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  136. return aersp
  137. }
  138. }
  139. return aersp
  140. }
  141. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  142. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  143. var aersp *raft.SnapshotRecoveryResponse
  144. var b bytes.Buffer
  145. json.NewEncoder(&b).Encode(req)
  146. u, _ := nameToRaftURL(peer.Name)
  147. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  148. req.LastTerm, req.LastIndex)
  149. resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  150. if err != nil {
  151. debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  152. }
  153. if resp != nil {
  154. defer resp.Body.Close()
  155. aersp = &raft.SnapshotRecoveryResponse{}
  156. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  157. return aersp
  158. }
  159. }
  160. return aersp
  161. }
  162. // Send server side POST request
  163. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  164. req, _ := http.NewRequest("POST", urlStr, body)
  165. resp, err := t.client.Do(req)
  166. return resp, req, err
  167. }
  168. // Send server side GET request
  169. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  170. req, _ := http.NewRequest("GET", urlStr, nil)
  171. resp, err := t.client.Do(req)
  172. return resp, req, err
  173. }
  174. // Cancel the on fly HTTP transaction when timeout happens
  175. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  176. go func() {
  177. time.Sleep(ElectionTimeout)
  178. t.transport.CancelRequest(req)
  179. }()
  180. }