transporter.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "time"
  11. "github.com/coreos/go-raft"
  12. )
  13. // Transporter layer for communication between raft nodes
  14. type transporter struct {
  15. client *http.Client
  16. timeout time.Duration
  17. }
  18. // response struct
  19. type transporterResponse struct {
  20. resp *http.Response
  21. err error
  22. }
  23. // Create transporter using by raft server
  24. // Create http or https transporter based on
  25. // whether the user give the server cert and key
  26. func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter {
  27. t := transporter{}
  28. tr := &http.Transport{
  29. Dial: dialTimeout,
  30. }
  31. if scheme == "https" {
  32. tr.TLSClientConfig = &tlsConf
  33. tr.DisableCompression = true
  34. }
  35. t.client = &http.Client{Transport: tr}
  36. t.timeout = timeout
  37. return &t
  38. }
  39. // Dial with timeout
  40. func dialTimeout(network, addr string) (net.Conn, error) {
  41. return net.DialTimeout(network, addr, HTTPTimeout)
  42. }
  43. // Sends AppendEntries RPCs to a peer when the server is the leader.
  44. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  45. var aersp *raft.AppendEntriesResponse
  46. var b bytes.Buffer
  47. json.NewEncoder(&b).Encode(req)
  48. size := b.Len()
  49. r.serverStats.SendAppendReq(size)
  50. u, _ := nameToRaftURL(peer.Name)
  51. debugf("Send LogEntries to %s ", u)
  52. thisPeerStats, ok := r.peersStats.Peers[peer.Name]
  53. start := time.Now()
  54. resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  55. end := time.Now()
  56. if err != nil {
  57. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  58. if ok {
  59. thisPeerStats.Fail()
  60. }
  61. } else {
  62. if ok {
  63. thisPeerStats.Succ(end.Sub(start))
  64. }
  65. }
  66. r.peersStats.Peers[peer.Name] = thisPeerStats
  67. if resp != nil {
  68. defer resp.Body.Close()
  69. aersp = &raft.AppendEntriesResponse{}
  70. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  71. return aersp
  72. }
  73. }
  74. return aersp
  75. }
  76. // Sends RequestVote RPCs to a peer when the server is the candidate.
  77. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  78. var rvrsp *raft.RequestVoteResponse
  79. var b bytes.Buffer
  80. json.NewEncoder(&b).Encode(req)
  81. u, _ := nameToRaftURL(peer.Name)
  82. debugf("Send Vote to %s", u)
  83. resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  84. if err != nil {
  85. debugf("Cannot send VoteRequest to %s : %s", u, err)
  86. }
  87. if resp != nil {
  88. defer resp.Body.Close()
  89. rvrsp := &raft.RequestVoteResponse{}
  90. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  91. return rvrsp
  92. }
  93. }
  94. return rvrsp
  95. }
  96. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  97. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  98. var aersp *raft.SnapshotResponse
  99. var b bytes.Buffer
  100. json.NewEncoder(&b).Encode(req)
  101. u, _ := nameToRaftURL(peer.Name)
  102. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  103. req.LastTerm, req.LastIndex)
  104. resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  105. if err != nil {
  106. debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  107. }
  108. if resp != nil {
  109. defer resp.Body.Close()
  110. aersp = &raft.SnapshotResponse{}
  111. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  112. return aersp
  113. }
  114. }
  115. return aersp
  116. }
  117. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  118. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  119. var aersp *raft.SnapshotRecoveryResponse
  120. var b bytes.Buffer
  121. json.NewEncoder(&b).Encode(req)
  122. u, _ := nameToRaftURL(peer.Name)
  123. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  124. req.LastTerm, req.LastIndex)
  125. resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  126. if err != nil {
  127. debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  128. }
  129. if resp != nil {
  130. defer resp.Body.Close()
  131. aersp = &raft.SnapshotRecoveryResponse{}
  132. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  133. return aersp
  134. }
  135. }
  136. return aersp
  137. }
  138. // Send server side POST request
  139. func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) {
  140. c := make(chan *transporterResponse, 1)
  141. go func() {
  142. tr := new(transporterResponse)
  143. tr.resp, tr.err = t.client.Post(path, "application/json", body)
  144. c <- tr
  145. }()
  146. return t.waitResponse(c)
  147. }
  148. // Send server side GET request
  149. func (t *transporter) Get(path string) (*http.Response, error) {
  150. c := make(chan *transporterResponse, 1)
  151. go func() {
  152. tr := new(transporterResponse)
  153. tr.resp, tr.err = t.client.Get(path)
  154. c <- tr
  155. }()
  156. return t.waitResponse(c)
  157. }
  158. func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
  159. timeoutChan := time.After(t.timeout)
  160. select {
  161. case <-timeoutChan:
  162. return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout)
  163. case r := <-responseChan:
  164. return r.resp, r.err
  165. }
  166. // for complier
  167. return nil, nil
  168. }