transporter.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "io"
  8. "net"
  9. "net/http"
  10. "time"
  11. "github.com/coreos/go-raft"
  12. )
  13. // Transporter layer for communication between raft nodes
  14. type transporter struct {
  15. client *http.Client
  16. }
  17. // Create transporter using by raft server
  18. // Create http or https transporter based on
  19. // whether the user give the server cert and key
  20. func newTransporter(scheme string, tlsConf tls.Config) transporter {
  21. t := transporter{}
  22. tr := &http.Transport{
  23. Dial: dialTimeout,
  24. }
  25. if scheme == "https" {
  26. tr.TLSClientConfig = &tlsConf
  27. tr.DisableCompression = true
  28. }
  29. t.client = &http.Client{Transport: tr}
  30. return t
  31. }
  32. // Dial with timeout
  33. func dialTimeout(network, addr string) (net.Conn, error) {
  34. return net.DialTimeout(network, addr, HTTPTimeout)
  35. }
  36. // Sends AppendEntries RPCs to a peer when the server is the leader.
  37. func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  38. var aersp *raft.AppendEntriesResponse
  39. var b bytes.Buffer
  40. r.serverStats.SendAppendReq()
  41. json.NewEncoder(&b).Encode(req)
  42. u, _ := nameToRaftURL(peer.Name)
  43. debugf("Send LogEntries to %s ", u)
  44. thisPeerStats, ok := r.peersStats[peer.Name]
  45. start := time.Now()
  46. resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  47. end := time.Now()
  48. if err != nil {
  49. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  50. if ok {
  51. thisPeerStats.Fail()
  52. }
  53. } else {
  54. if ok {
  55. thisPeerStats.Succ(end.Sub(start))
  56. }
  57. }
  58. r.peersStats[peer.Name] = thisPeerStats
  59. if resp != nil {
  60. defer resp.Body.Close()
  61. aersp = &raft.AppendEntriesResponse{}
  62. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  63. return aersp
  64. }
  65. }
  66. return aersp
  67. }
  68. // Sends RequestVote RPCs to a peer when the server is the candidate.
  69. func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  70. var rvrsp *raft.RequestVoteResponse
  71. var b bytes.Buffer
  72. json.NewEncoder(&b).Encode(req)
  73. u, _ := nameToRaftURL(peer.Name)
  74. debugf("Send Vote to %s", u)
  75. resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  76. if err != nil {
  77. debugf("Cannot send VoteRequest to %s : %s", u, err)
  78. }
  79. if resp != nil {
  80. defer resp.Body.Close()
  81. rvrsp := &raft.RequestVoteResponse{}
  82. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  83. return rvrsp
  84. }
  85. }
  86. return rvrsp
  87. }
  88. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  89. func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  90. var aersp *raft.SnapshotResponse
  91. var b bytes.Buffer
  92. json.NewEncoder(&b).Encode(req)
  93. u, _ := nameToRaftURL(peer.Name)
  94. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  95. req.LastTerm, req.LastIndex)
  96. resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  97. if err != nil {
  98. debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  99. }
  100. if resp != nil {
  101. defer resp.Body.Close()
  102. aersp = &raft.SnapshotResponse{}
  103. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  104. return aersp
  105. }
  106. }
  107. return aersp
  108. }
  109. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  110. func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  111. var aersp *raft.SnapshotRecoveryResponse
  112. var b bytes.Buffer
  113. json.NewEncoder(&b).Encode(req)
  114. u, _ := nameToRaftURL(peer.Name)
  115. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  116. req.LastTerm, req.LastIndex)
  117. resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  118. if err != nil {
  119. debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  120. }
  121. if resp != nil {
  122. defer resp.Body.Close()
  123. aersp = &raft.SnapshotRecoveryResponse{}
  124. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  125. return aersp
  126. }
  127. }
  128. return aersp
  129. }
  130. // Send server side POST request
  131. func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
  132. return t.client.Post(path, "application/json", body)
  133. }
  134. // Send server side GET request
  135. func (t transporter) Get(path string) (*http.Response, error) {
  136. return t.client.Get(path)
  137. }