transporter.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "time"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/raft"
  12. )
  13. // Transporter layer for communication between raft nodes
  14. type transporter struct {
  15. client *http.Client
  16. transport *http.Transport
  17. peerServer *PeerServer
  18. tranTimeout time.Duration
  19. }
  20. type dialer func(network, addr string) (net.Conn, error)
  21. // Create transporter using by raft server
  22. // Create http or https transporter based on
  23. // whether the user give the server cert and key
  24. func newTransporter(scheme string, tlsConf tls.Config, peerServer *PeerServer) *transporter {
  25. // names for each type of timeout, for the sake of clarity
  26. dialTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond
  27. responseHeaderTimeout := time.Duration(3 * peerServer.heartbeatTimeout + peerServer.electionTimeout) * time.Millisecond
  28. t := transporter{}
  29. t.tranTimeout = time.Duration(peerServer.heartbeatTimeout) * time.Millisecond
  30. tr := &http.Transport{
  31. Dial: dialWithTimeoutFactory(dialTimeout),
  32. ResponseHeaderTimeout: responseHeaderTimeout,
  33. }
  34. if scheme == "https" {
  35. tr.TLSClientConfig = &tlsConf
  36. tr.DisableCompression = true
  37. }
  38. t.client = &http.Client{Transport: tr}
  39. t.transport = tr
  40. t.peerServer = peerServer
  41. return &t
  42. }
  43. // factory function to return a dialer
  44. func dialWithTimeoutFactory( timeout time.Duration ) dialer {
  45. return func(network, addr string) (net.Conn, error) {
  46. return net.DialTimeout(network, addr, timeout)
  47. }
  48. }
  49. // Sends AppendEntries RPCs to a peer when the server is the leader.
  50. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  51. var b bytes.Buffer
  52. if _, err := req.Encode(&b); err != nil {
  53. log.Warn("transporter.ae.encoding.error:", err)
  54. return nil
  55. }
  56. size := b.Len()
  57. t.peerServer.serverStats.SendAppendReq(size)
  58. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  59. log.Debugf("Send LogEntries to %s ", u)
  60. thisFollowerStats, ok := t.peerServer.followersStats.Followers[peer.Name]
  61. if !ok { //this is the first time this follower has been seen
  62. thisFollowerStats = &raftFollowerStats{}
  63. thisFollowerStats.Latency.Minimum = 1 << 63
  64. t.peerServer.followersStats.Followers[peer.Name] = thisFollowerStats
  65. }
  66. start := time.Now()
  67. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  68. end := time.Now()
  69. if err != nil {
  70. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  71. if ok {
  72. thisFollowerStats.Fail()
  73. }
  74. return nil
  75. } else {
  76. if ok {
  77. thisFollowerStats.Succ(end.Sub(start))
  78. }
  79. }
  80. if resp != nil {
  81. defer resp.Body.Close()
  82. t.CancelWhenTimeout(httpRequest)
  83. aeresp := &raft.AppendEntriesResponse{}
  84. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  85. log.Warn("transporter.ae.decoding.error:", err)
  86. return nil
  87. }
  88. return aeresp
  89. }
  90. return nil
  91. }
  92. // Sends RequestVote RPCs to a peer when the server is the candidate.
  93. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  94. var b bytes.Buffer
  95. if _, err := req.Encode(&b); err != nil {
  96. log.Warn("transporter.vr.encoding.error:", err)
  97. return nil
  98. }
  99. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  100. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  101. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  102. if err != nil {
  103. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  104. }
  105. if resp != nil {
  106. defer resp.Body.Close()
  107. t.CancelWhenTimeout(httpRequest)
  108. rvrsp := &raft.RequestVoteResponse{}
  109. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  110. log.Warn("transporter.vr.decoding.error:", err)
  111. return nil
  112. }
  113. return rvrsp
  114. }
  115. return nil
  116. }
  117. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  118. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  119. var b bytes.Buffer
  120. if _, err := req.Encode(&b); err != nil {
  121. log.Warn("transporter.ss.encoding.error:", err)
  122. return nil
  123. }
  124. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  125. log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  126. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  127. if err != nil {
  128. log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
  129. }
  130. if resp != nil {
  131. defer resp.Body.Close()
  132. t.CancelWhenTimeout(httpRequest)
  133. ssrsp := &raft.SnapshotResponse{}
  134. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  135. log.Warn("transporter.ss.decoding.error:", err)
  136. return nil
  137. }
  138. return ssrsp
  139. }
  140. return nil
  141. }
  142. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  143. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  144. var b bytes.Buffer
  145. if _, err := req.Encode(&b); err != nil {
  146. log.Warn("transporter.ss.encoding.error:", err)
  147. return nil
  148. }
  149. u, _ := t.peerServer.registry.PeerURL(peer.Name)
  150. log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  151. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  152. if err != nil {
  153. log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  154. }
  155. if resp != nil {
  156. defer resp.Body.Close()
  157. t.CancelWhenTimeout(httpRequest)
  158. ssrrsp := &raft.SnapshotRecoveryResponse{}
  159. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  160. log.Warn("transporter.ssr.decoding.error:", err)
  161. return nil
  162. }
  163. return ssrrsp
  164. }
  165. return nil
  166. }
  167. // Send server side POST request
  168. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  169. req, _ := http.NewRequest("POST", urlStr, body)
  170. resp, err := t.client.Do(req)
  171. return resp, req, err
  172. }
  173. // Send server side GET request
  174. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  175. req, _ := http.NewRequest("GET", urlStr, nil)
  176. resp, err := t.client.Do(req)
  177. return resp, req, err
  178. }
  179. // Cancel the on fly HTTP transaction when timeout happens.
  180. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  181. go func() {
  182. time.Sleep(t.tranTimeout)
  183. t.transport.CancelRequest(req)
  184. }()
  185. }