transporter.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package server
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "fmt"
  6. "io"
  7. "net"
  8. "net/http"
  9. "time"
  10. "github.com/coreos/etcd/third_party/github.com/coreos/raft"
  11. httpclient "github.com/coreos/etcd/third_party/github.com/mreiferson/go-httpclient"
  12. "github.com/coreos/etcd/log"
  13. )
// transporter is the layer raft nodes use to communicate with each other
// over HTTP.
type transporter struct {
	// requestTimeout is stored from the constructor argument of the same
	// name; the methods visible in this file do not read it.
	requestTimeout time.Duration
	// followersStats holds the per-follower statistics looked up and updated
	// by SendAppendEntriesRequest.
	followersStats *raftFollowersStats
	// serverStats is told the encoded size of every append-entries request.
	serverStats *raftServerStats
	// registry resolves a peer name to the URL that requests are posted to.
	registry *Registry
	// client issues the HTTP requests; NewTransporter builds it on top of
	// transport.
	client *http.Client
	// transport is kept alongside client so SetTLSConfig can adjust it.
	transport *httpclient.Transport
}
// dialer is the signature of a function that takes a network and an address
// and returns a net.Conn or an error.
// NOTE(review): nothing in the visible part of this file references dialer;
// confirm it is still needed.
type dialer func(network, addr string) (net.Conn, error)
  24. // Create transporter using by raft server
  25. // Create http or https transporter based on
  26. // whether the user give the server cert and key
  27. func NewTransporter(followersStats *raftFollowersStats, serverStats *raftServerStats, registry *Registry, dialTimeout, requestTimeout, responseHeaderTimeout time.Duration) *transporter {
  28. tr := &httpclient.Transport{
  29. ResponseHeaderTimeout: responseHeaderTimeout,
  30. // This is a workaround for Transport.CancelRequest doesn't work on
  31. // HTTPS connections blocked. The patch for it is in progress,
  32. // and would be available in Go1.3
  33. // More: https://codereview.appspot.com/69280043/
  34. ConnectTimeout: dialTimeout,
  35. RequestTimeout: dialTimeout + responseHeaderTimeout,
  36. ReadWriteTimeout: responseHeaderTimeout,
  37. }
  38. t := transporter{
  39. client: &http.Client{Transport: tr},
  40. transport: tr,
  41. requestTimeout: requestTimeout,
  42. followersStats: followersStats,
  43. serverStats: serverStats,
  44. registry: registry,
  45. }
  46. return &t
  47. }
  48. func (t *transporter) SetTLSConfig(tlsConf tls.Config) {
  49. t.transport.TLSClientConfig = &tlsConf
  50. t.transport.DisableCompression = true
  51. }
  52. // Sends AppendEntries RPCs to a peer when the server is the leader.
  53. func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  54. var b bytes.Buffer
  55. if _, err := req.Encode(&b); err != nil {
  56. log.Warn("transporter.ae.encoding.error:", err)
  57. return nil
  58. }
  59. size := b.Len()
  60. t.serverStats.SendAppendReq(size)
  61. u, _ := t.registry.PeerURL(peer.Name)
  62. log.Debugf("Send LogEntries to %s ", u)
  63. thisFollowerStats, ok := t.followersStats.Followers[peer.Name]
  64. if !ok { //this is the first time this follower has been seen
  65. thisFollowerStats = &raftFollowerStats{}
  66. thisFollowerStats.Latency.Minimum = 1 << 63
  67. t.followersStats.Followers[peer.Name] = thisFollowerStats
  68. }
  69. start := time.Now()
  70. resp, _, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  71. end := time.Now()
  72. if err != nil {
  73. log.Debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  74. if ok {
  75. thisFollowerStats.Fail()
  76. }
  77. return nil
  78. } else {
  79. if ok {
  80. thisFollowerStats.Succ(end.Sub(start))
  81. }
  82. }
  83. if resp != nil {
  84. defer resp.Body.Close()
  85. aeresp := &raft.AppendEntriesResponse{}
  86. if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
  87. log.Warn("transporter.ae.decoding.error:", err)
  88. return nil
  89. }
  90. return aeresp
  91. }
  92. return nil
  93. }
  94. // Sends RequestVote RPCs to a peer when the server is the candidate.
  95. func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  96. var b bytes.Buffer
  97. if _, err := req.Encode(&b); err != nil {
  98. log.Warn("transporter.vr.encoding.error:", err)
  99. return nil
  100. }
  101. u, _ := t.registry.PeerURL(peer.Name)
  102. log.Debugf("Send Vote from %s to %s", server.Name(), u)
  103. resp, _, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  104. if err != nil {
  105. log.Debugf("Cannot send VoteRequest to %s : %s", u, err)
  106. }
  107. if resp != nil {
  108. defer resp.Body.Close()
  109. rvrsp := &raft.RequestVoteResponse{}
  110. if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
  111. log.Warn("transporter.vr.decoding.error:", err)
  112. return nil
  113. }
  114. return rvrsp
  115. }
  116. return nil
  117. }
  118. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  119. func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  120. var b bytes.Buffer
  121. if _, err := req.Encode(&b); err != nil {
  122. log.Warn("transporter.ss.encoding.error:", err)
  123. return nil
  124. }
  125. u, _ := t.registry.PeerURL(peer.Name)
  126. log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
  127. resp, _, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  128. if err != nil {
  129. log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
  130. }
  131. if resp != nil {
  132. defer resp.Body.Close()
  133. ssrsp := &raft.SnapshotResponse{}
  134. if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
  135. log.Warn("transporter.ss.decoding.error:", err)
  136. return nil
  137. }
  138. return ssrsp
  139. }
  140. return nil
  141. }
  142. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  143. func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  144. var b bytes.Buffer
  145. if _, err := req.Encode(&b); err != nil {
  146. log.Warn("transporter.ss.encoding.error:", err)
  147. return nil
  148. }
  149. u, _ := t.registry.PeerURL(peer.Name)
  150. log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
  151. resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  152. if err != nil {
  153. log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
  154. }
  155. if resp != nil {
  156. defer resp.Body.Close()
  157. ssrrsp := &raft.SnapshotRecoveryResponse{}
  158. if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
  159. log.Warn("transporter.ssr.decoding.error:", err)
  160. return nil
  161. }
  162. return ssrrsp
  163. }
  164. return nil
  165. }
  166. // Send server side POST request
  167. func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
  168. req, _ := http.NewRequest("POST", urlStr, body)
  169. resp, err := t.client.Do(req)
  170. return resp, req, err
  171. }
  172. // Send server side GET request
  173. func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
  174. req, _ := http.NewRequest("GET", urlStr, nil)
  175. resp, err := t.client.Do(req)
  176. return resp, req, err
  177. }