transporter.go 6.4 KB


  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "bytes"
  16. "crypto/tls"
  17. "fmt"
  18. "io"
  19. "net"
  20. "net/http"
  21. "time"
  22. "github.com/coreos/go-raft"
  23. )
  24. // Timeout for setup internal raft http connection
  25. // This should not exceed 3 * RTT
  26. var dailTimeout = 3 * HeartbeatTimeout
  27. // Timeout for setup internal raft http connection + receive response header
  28. // This should not exceed 3 * RTT + RTT
  29. var responseHeaderTimeout = 4 * HeartbeatTimeout
  30. // Timeout for receiving the response body from the server
  31. // This should not exceed election timeout
  32. var tranTimeout = ElectionTimeout
  // transporter is the HTTP transport layer used for communication between
  // raft nodes.
  type transporter struct {
  	// client issues the requests built by Post and Get.
  	client *http.Client
  	// transport is the *http.Transport that newTransporter also installs
  	// in client; CancelWhenTimeout uses it to cancel in-flight requests.
  	transport *http.Transport
  }
  38. // Create transporter using by raft server
  39. // Create http or https transporter based on
  40. // whether the user give the server cert and key
  41. func newTransporter(scheme string, tlsConf tls.Config) *transporter {
  42. t := transporter{}
  43. tr := &http.Transport{
  44. Dial: dialWithTimeout,
  45. ResponseHeaderTimeout: responseHeaderTimeout,
  46. }
  47. if scheme == "https" {
  48. tr.TLSClientConfig = &tlsConf
  49. tr.DisableCompression = true
  50. }
  51. t.client = &http.Client{Transport: tr}
  52. t.transport = tr
  53. return &t
  54. }
  55. // Dial with timeout
  56. func dialWithTimeout(network, addr string) (net.Conn, error) {
  57. return net.DialTimeout(network, addr, dailTimeout)
  58. }
  59. // Sends AppendEntries RPCs to a peer when the server is the leader.
  60. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  61. var b bytes.Buffer
  62. if _, err := req.Encode(&b); err != nil {
  63. warn("transporter.ae.encoding.error:", err)
  64. return nil
  65. }
  66. size := b.Len()
  67. r.serverStats.SendAppendReq(size)
  68. u, _ := nameToRaftURL(peer.Name)
  69. debugf("Send LogEntries to %s ", u)
  70. thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
  71. if !ok { //this is the first time this follower has been seen
  72. thisFollowerStats = &raftFollowerStats{}
  73. thisFollowerStats.Latency.Minimum = 1 << 63
  74. r.followersStats.Followers[peer.Name] = thisFollowerStats
  75. }
  76. start := time.Now()
  77. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  78. end := time.Now()
  79. if err != nil {
  80. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  81. if ok {
  82. thisFollowerStats.Fail()
  83. }
  84. return nil
  85. } else {
  86. if ok {
  87. thisFollowerStats.Succ(end.Sub(start))
  88. }
  89. }
  90. if resp != nil {
  91. defer resp.Body.Close()
  92. t.CancelWhenTimeout(httpRequest)
  93. aeresp := &raft.AppendEntriesResponse{}
  94. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  95. warn("transporter.ae.decoding.error:", err)
  96. return nil
  97. }
  98. return aeresp
  99. }
  100. return nil
  101. }
  102. // Sends RequestVote RPCs to a peer when the server is the candidate.
  103. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  104. var b bytes.Buffer
  105. if _, err := req.Encode(&b); err != nil {
  106. warn("transporter.vr.encoding.error:", err)
  107. return nil
  108. }
  109. u, _ := nameToRaftURL(peer.Name)
  110. debugf("Send Vote from %s to %s", server.Name(), u)
  111. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  112. if err != nil {
  113. debugf("Cannot send VoteRequest to %s : %s", u, err)
  114. }
  115. if resp != nil {
  116. defer resp.Body.Close()
  117. t.CancelWhenTimeout(httpRequest)
  118. rvrsp := &raft.RequestVoteResponse{}
  119. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  120. warn("transporter.vr.decoding.error:", err)
  121. return nil
  122. }
  123. return rvrsp
  124. }
  125. return nil
  126. }
  127. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  128. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  129. var b bytes.Buffer
  130. if _, err := req.Encode(&b); err != nil {
  131. warn("transporter.ss.encoding.error:", err)
  132. return nil
  133. }
  134. u, _ := nameToRaftURL(peer.Name)
  135. debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  136. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  137. if err != nil {
  138. debugf("Cannot send Snapshot Request to %s : %s", u, err)
  139. }
  140. if resp != nil {
  141. defer resp.Body.Close()
  142. t.CancelWhenTimeout(httpRequest)
  143. ssrsp := &raft.SnapshotResponse{}
  144. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  145. warn("transporter.ss.decoding.error:", err)
  146. return nil
  147. }
  148. return ssrsp
  149. }
  150. return nil
  151. }
  152. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  153. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  154. var b bytes.Buffer
  155. if _, err := req.Encode(&b); err != nil {
  156. warn("transporter.ss.encoding.error:", err)
  157. return nil
  158. }
  159. u, _ := nameToRaftURL(peer.Name)
  160. debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  161. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  162. if err != nil {
  163. debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  164. }
  165. if resp != nil {
  166. defer resp.Body.Close()
  167. t.CancelWhenTimeout(httpRequest)
  168. ssrrsp := &raft.SnapshotRecoveryResponse{}
  169. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  170. warn("transporter.ssr.decoding.error:", err)
  171. return nil
  172. }
  173. return ssrrsp
  174. }
  175. return nil
  176. }
  177. // Send server side POST request
  178. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  179. req, _ := http.NewRequest("POST", urlStr, body)
  180. resp, err := t.client.Do(req)
  181. return resp, req, err
  182. }
  183. // Send server side GET request
  184. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  185. req, _ := http.NewRequest("GET", urlStr, nil)
  186. resp, err := t.client.Do(req)
  187. return resp, req, err
  188. }
  189. // Cancel the on fly HTTP transaction when timeout happens.
  190. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  191. go func() {
  192. time.Sleep(tranTimeout)
  193. t.transport.CancelRequest(req)
  194. }()
  195. }