transporter.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "time"
  11. "github.com/coreos/go-raft"
  12. )
  // transporter is the layer raft nodes use to exchange requests with each
  // other over HTTP or HTTPS.
  type transporter struct {
  	// client performs every outgoing request.
  	client *http.Client

  	// timeout is the base wait for a response; waitResponse gives up after
  	// ten times this value.
  	timeout time.Duration
  }
  // transporterResponse bundles the two results of an HTTP client call so the
  // requesting goroutine can hand both over a single channel.
  type transporterResponse struct {
  	resp *http.Response // response returned by the client call, if any
  	err  error          // error returned by the client call, if any
  }
  23. // Create transporter using by raft server
  24. // Create http or https transporter based on
  25. // whether the user give the server cert and key
  26. func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter {
  27. t := transporter{}
  28. tr := &http.Transport{
  29. Dial: dialTimeout,
  30. }
  31. if scheme == "https" {
  32. tr.TLSClientConfig = &tlsConf
  33. tr.DisableCompression = true
  34. }
  35. t.client = &http.Client{Transport: tr}
  36. t.timeout = timeout
  37. return &t
  38. }
  39. // Dial with timeout
  40. func dialTimeout(network, addr string) (net.Conn, error) {
  41. return net.DialTimeout(network, addr, HTTPTimeout)
  42. }
  43. // Sends AppendEntries RPCs to a peer when the server is the leader.
  44. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  45. var aersp *raft.AppendEntriesResponse
  46. var b bytes.Buffer
  47. json.NewEncoder(&b).Encode(req)
  48. size := b.Len()
  49. r.serverStats.SendAppendReq(size)
  50. u, _ := nameToRaftURL(peer.Name)
  51. debugf("Send LogEntries to %s ", u)
  52. thisFollowerStats, ok := r.followersStats.Followers[peer.Name]
  53. if !ok { //this is the first time this follower has been seen
  54. thisFollowerStats = &raftFollowerStats{}
  55. thisFollowerStats.Latency.Minimum = 1 << 63
  56. r.followersStats.Followers[peer.Name] = thisFollowerStats
  57. }
  58. start := time.Now()
  59. resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  60. end := time.Now()
  61. if err != nil {
  62. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  63. if ok {
  64. thisFollowerStats.Fail()
  65. }
  66. } else {
  67. if ok {
  68. thisFollowerStats.Succ(end.Sub(start))
  69. }
  70. }
  71. if resp != nil {
  72. defer resp.Body.Close()
  73. aersp = &raft.AppendEntriesResponse{}
  74. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  75. return aersp
  76. }
  77. }
  78. return aersp
  79. }
  80. // Sends RequestVote RPCs to a peer when the server is the candidate.
  81. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  82. var rvrsp *raft.RequestVoteResponse
  83. var b bytes.Buffer
  84. json.NewEncoder(&b).Encode(req)
  85. u, _ := nameToRaftURL(peer.Name)
  86. debugf("Send Vote to %s", u)
  87. resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  88. if err != nil {
  89. debugf("Cannot send VoteRequest to %s : %s", u, err)
  90. }
  91. if resp != nil {
  92. defer resp.Body.Close()
  93. rvrsp := &raft.RequestVoteResponse{}
  94. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  95. return rvrsp
  96. }
  97. }
  98. return rvrsp
  99. }
  100. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  101. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  102. var aersp *raft.SnapshotResponse
  103. var b bytes.Buffer
  104. json.NewEncoder(&b).Encode(req)
  105. u, _ := nameToRaftURL(peer.Name)
  106. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  107. req.LastTerm, req.LastIndex)
  108. resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  109. if err != nil {
  110. debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  111. }
  112. if resp != nil {
  113. defer resp.Body.Close()
  114. aersp = &raft.SnapshotResponse{}
  115. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  116. return aersp
  117. }
  118. }
  119. return aersp
  120. }
  121. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  122. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  123. var aersp *raft.SnapshotRecoveryResponse
  124. var b bytes.Buffer
  125. json.NewEncoder(&b).Encode(req)
  126. u, _ := nameToRaftURL(peer.Name)
  127. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  128. req.LastTerm, req.LastIndex)
  129. resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  130. if err != nil {
  131. debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  132. }
  133. if resp != nil {
  134. defer resp.Body.Close()
  135. aersp = &raft.SnapshotRecoveryResponse{}
  136. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  137. return aersp
  138. }
  139. }
  140. return aersp
  141. }
  142. // Send server side POST request
  143. func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) {
  144. c := make(chan *transporterResponse, 1)
  145. go func() {
  146. tr := new(transporterResponse)
  147. tr.resp, tr.err = t.client.Post(path, "application/json", body)
  148. c <- tr
  149. }()
  150. return t.waitResponse(c)
  151. }
  152. // Send server side GET request
  153. func (t *transporter) Get(path string) (*http.Response, error) {
  154. c := make(chan *transporterResponse, 1)
  155. go func() {
  156. tr := new(transporterResponse)
  157. tr.resp, tr.err = t.client.Get(path)
  158. c <- tr
  159. }()
  160. return t.waitResponse(c)
  161. }
  162. func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
  163. timeoutChan := time.After(t.timeout * 10)
  164. select {
  165. case <-timeoutChan:
  166. return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout)
  167. case r := <-responseChan:
  168. return r.resp, r.err
  169. }
  170. // for complier
  171. return nil, nil
  172. }