transporter.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "time"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/raft"
  12. )
  13. // Transporter layer for communication between raft nodes
  14. type transporter struct {
  15. client *http.Client
  16. transport *http.Transport
  17. peerServer *PeerServer
  18. }
  19. type dialer func(network, addr string) (net.Conn, error)
  20. // Create transporter using by raft server
  21. // Create http or https transporter based on
  22. // whether the user give the server cert and key
  23. func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
  24. // names for each type of timeout, for the sake of clarity
  25. dialTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
  26. responseHeaderTimeout := (3 * peerServer.HeartbeatTimeout) + peerServer.ElectionTimeout
  27. t := transporter{}
  28. tr := &http.Transport{
  29. Dial: func(network, addr string) (net.Conn, error) {
  30. return net.DialTimeout(network, addr, dialTimeout)
  31. },
  32. ResponseHeaderTimeout: responseHeaderTimeout,
  33. }
  34. if scheme == "https" {
  35. tr.TLSClientConfig = &tlsConf
  36. tr.DisableCompression = true
  37. }
  38. t.client = &http.Client{Transport: tr}
  39. t.transport = tr
  40. t.peerServer = peerServer
  41. return &t
  42. }
  43. // Sends AppendEntries RPCs to a peer when the server is the leader.
  44. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  45. var b bytes.Buffer
  46. if _, err := req.Encode(&b); err != nil {
  47. log.Warn("transporter.ae.encoding.error:", err)
  48. return nil
  49. }
  50. size := b.Len()
  51. t.peerServer.serverStats.SendAppendReq(size)
  52. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  53. log.Debugf("Send LogEntries to %s ", u)
  54. thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
  55. if !ok { //this is the first time this follower has been seen
  56. thisFollowerStats = &raftFollowerStats{}
  57. thisFollowerStats.Latency.Minimum = 1 << 63
  58. t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
  59. }
  60. start := time.Now()
  61. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  62. end := time.Now()
  63. if err != nil {
  64. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  65. if ok {
  66. thisFollowerStats.Fail()
  67. }
  68. return nil
  69. } else {
  70. if ok {
  71. thisFollowerStats.Succ(end.Sub(start))
  72. }
  73. }
  74. if resp != nil {
  75. defer resp.Body.Close()
  76. t.CancelWhenTimeout(httpRequest)
  77. aeresp := &raft.AppendEntriesResponse{}
  78. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  79. log.Warn("transporter.ae.decoding.error:", err)
  80. return nil
  81. }
  82. return aeresp
  83. }
  84. return nil
  85. }
  86. // Sends RequestVote RPCs to a peer when the server is the candidate.
  87. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  88. var b bytes.Buffer
  89. if _, err := req.Encode(&b); err != nil {
  90. log.Warn("transporter.vr.encoding.error:", err)
  91. return nil
  92. }
  93. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  94. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  95. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  96. if err != nil {
  97. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  98. }
  99. if resp != nil {
  100. defer resp.Body.Close()
  101. t.CancelWhenTimeout(httpRequest)
  102. rvrsp := &raft.RequestVoteResponse{}
  103. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  104. log.Warn("transporter.vr.decoding.error:", err)
  105. return nil
  106. }
  107. return rvrsp
  108. }
  109. return nil
  110. }
  111. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  112. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  113. var b bytes.Buffer
  114. if _, err := req.Encode(&b); err != nil {
  115. log.Warn("transporter.ss.encoding.error:", err)
  116. return nil
  117. }
  118. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  119. log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  120. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  121. if err != nil {
  122. log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
  123. }
  124. if resp != nil {
  125. defer resp.Body.Close()
  126. t.CancelWhenTimeout(httpRequest)
  127. ssrsp := &raft.SnapshotResponse{}
  128. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  129. log.Warn("transporter.ss.decoding.error:", err)
  130. return nil
  131. }
  132. return ssrsp
  133. }
  134. return nil
  135. }
  136. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  137. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  138. var b bytes.Buffer
  139. if _, err := req.Encode(&b); err != nil {
  140. log.Warn("transporter.ss.encoding.error:", err)
  141. return nil
  142. }
  143. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  144. log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  145. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  146. if err != nil {
  147. log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  148. }
  149. if resp != nil {
  150. defer resp.Body.Close()
  151. t.CancelWhenTimeout(httpRequest)
  152. ssrrsp := &raft.SnapshotRecoveryResponse{}
  153. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  154. log.Warn("transporter.ssr.decoding.error:", err)
  155. return nil
  156. }
  157. return ssrrsp
  158. }
  159. return nil
  160. }
  161. // Send server side POST request
  162. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  163. req, _ := http.NewRequest("POST", urlStr, body)
  164. resp, err := t.client.Do(req)
  165. return resp, req, err
  166. }
  167. // Send server side GET request
  168. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  169. req, _ := http.NewRequest("GET", urlStr, nil)
  170. resp, err := t.client.Do(req)
  171. return resp, req, err
  172. }
  173. // Cancel the on fly HTTP transaction when timeout happens.
  174. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  175. go func() {
  176. time.Sleep(t.peerServer.HeartbeatTimeout)
  177. t.transport.CancelRequest(req)
  178. }()
  179. }