transporter.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "time"
  10. "github.com/coreos/etcd/log"
  11. "github.com/coreos/raft"
  12. )
  13. // Transporter layer for communication between raft nodes
  14. type transporter struct {
  15. requestTimeout time.Duration
  16. followersStats *raftFollowersStats
  17. serverStats *raftServerStats
  18. registry *Registry
  19. client *http.Client
  20. transport *http.Transport
  21. }
  22. type dialer func(network, addr string) (net.Conn, error)
  23. // Create transporter using by raft server
  24. // Create http or https transporter based on
  25. // whether the user give the server cert and key
  26. func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
  27. tr := &http.Transport{
  28. Dial: func(network, addr string) (net.Conn, error) {
  29. return net.DialTimeout(network, addr, dialTimeout)
  30. },
  31. ResponseHeaderTimeout: responseHeaderTimeout,
  32. }
  33. t := transporter{
  34. client: &http.Client{Transport: tr},
  35. transport: tr,
  36. requestTimeout: requestTimeout,
  37. followersStats: followersStats,
  38. serverStats: serverStats,
  39. registry: registry,
  40. }
  41. return &t
  42. }
  43. func (t *transporter) SetTLSConfig(tlsConf tls.Config) {
  44. t.transport.TLSClientConfig = &tlsConf
  45. t.transport.DisableCompression = true
  46. }
  47. // Sends AppendEntries RPCs to a peer when the server is the leader.
  48. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  49. var b bytes.Buffer
  50. if _, err := req.Encode(&b); err != nil {
  51. log.Warn("transporter.ae.encoding.error:", err)
  52. return nil
  53. }
  54. size := b.Len()
  55. t.serverStats.SendAppendReq(size)
  56. u, _ := t.registry.PeerURL(peer.Name)
  57. log.Debugf("Send LogEntries to %s ", u)
  58. thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
  59. if !ok { //this is the first time this follower has been seen
  60. thisFollowerStats = &raftFollowerStats{}
  61. thisFollowerStats.Latency.Minimum = 1 << 63
  62. t.followersStats.Followers[peer.Name] = thisFollowerStats
  63. }
  64. start := time.Now()
  65. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  66. end := time.Now()
  67. if err != nil {
  68. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  69. if ok {
  70. thisFollowerStats.Fail()
  71. }
  72. return nil
  73. } else {
  74. if ok {
  75. thisFollowerStats.Succ(end.Sub(start))
  76. }
  77. }
  78. if resp != nil {
  79. defer resp.Body.Close()
  80. t.CancelWhenTimeout(httpRequest)
  81. aeresp := &raft.AppendEntriesResponse{}
  82. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  83. log.Warn("transporter.ae.decoding.error:", err)
  84. return nil
  85. }
  86. return aeresp
  87. }
  88. return nil
  89. }
  90. // Sends RequestVote RPCs to a peer when the server is the candidate.
  91. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  92. var b bytes.Buffer
  93. if _, err := req.Encode(&b); err != nil {
  94. log.Warn("transporter.vr.encoding.error:", err)
  95. return nil
  96. }
  97. u, _ := t.registry.PeerURL(peer.Name)
  98. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  99. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  100. if err != nil {
  101. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  102. }
  103. if resp != nil {
  104. defer resp.Body.Close()
  105. t.CancelWhenTimeout(httpRequest)
  106. rvrsp := &raft.RequestVoteResponse{}
  107. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  108. log.Warn("transporter.vr.decoding.error:", err)
  109. return nil
  110. }
  111. return rvrsp
  112. }
  113. return nil
  114. }
  115. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  116. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  117. var b bytes.Buffer
  118. if _, err := req.Encode(&b); err != nil {
  119. log.Warn("transporter.ss.encoding.error:", err)
  120. return nil
  121. }
  122. u, _ := t.registry.PeerURL(peer.Name)
  123. log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  124. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  125. if err != nil {
  126. log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
  127. }
  128. if resp != nil {
  129. defer resp.Body.Close()
  130. t.CancelWhenTimeout(httpRequest)
  131. ssrsp := &raft.SnapshotResponse{}
  132. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  133. log.Warn("transporter.ss.decoding.error:", err)
  134. return nil
  135. }
  136. return ssrsp
  137. }
  138. return nil
  139. }
  140. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  141. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  142. var b bytes.Buffer
  143. if _, err := req.Encode(&b); err != nil {
  144. log.Warn("transporter.ss.encoding.error:", err)
  145. return nil
  146. }
  147. u, _ := t.registry.PeerURL(peer.Name)
  148. log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  149. resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  150. if err != nil {
  151. log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  152. }
  153. if resp != nil {
  154. defer resp.Body.Close()
  155. t.CancelWhenTimeout(httpRequest)
  156. ssrrsp := &raft.SnapshotRecoveryResponse{}
  157. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  158. log.Warn("transporter.ssr.decoding.error:", err)
  159. return nil
  160. }
  161. return ssrrsp
  162. }
  163. return nil
  164. }
  165. // Send server side POST request
  166. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  167. req, _ := http.NewRequest("POST", urlStr, body)
  168. resp, err := t.client.Do(req)
  169. return resp, req, err
  170. }
  171. // Send server side GET request
  172. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  173. req, _ := http.NewRequest("GET", urlStr, nil)
  174. resp, err := t.client.Do(req)
  175. return resp, req, err
  176. }
  177. // Cancel the on fly HTTP transaction when timeout happens.
  178. func (t *transporter) CancelWhenTimeout(req *http.Request) {
  179. go func() {
  180. time.Sleep(t.requestTimeout)
  181. t.transport.CancelRequest(req)
  182. }()
  183. }