transporter.go 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package main
  2. import (
  3. "bytes"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/coreos/go-raft"
  7. "io"
  8. "net/http"
  9. )
  10. // Transporter layer for communication between raft nodes
  11. type transporter struct {
  12. client *http.Client
  13. // scheme
  14. scheme string
  15. }
  16. // Sends AppendEntries RPCs to a peer when the server is the leader.
  17. func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  18. var aersp *raft.AppendEntriesResponse
  19. var b bytes.Buffer
  20. json.NewEncoder(&b).Encode(req)
  21. u, _ := nameToRaftURL(peer.Name())
  22. debugf("Send LogEntries to %s ", u)
  23. resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  24. if err != nil {
  25. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  26. }
  27. if resp != nil {
  28. defer resp.Body.Close()
  29. aersp = &raft.AppendEntriesResponse{}
  30. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  31. return aersp
  32. }
  33. }
  34. return aersp
  35. }
  36. // Sends RequestVote RPCs to a peer when the server is the candidate.
  37. func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  38. var rvrsp *raft.RequestVoteResponse
  39. var b bytes.Buffer
  40. json.NewEncoder(&b).Encode(req)
  41. u, _ := nameToRaftURL(peer.Name())
  42. debugf("Send Vote to %s", u)
  43. resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  44. if err != nil {
  45. debugf("Cannot send VoteRequest to %s : %s", u, err)
  46. }
  47. if resp != nil {
  48. defer resp.Body.Close()
  49. rvrsp := &raft.RequestVoteResponse{}
  50. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  51. return rvrsp
  52. }
  53. }
  54. return rvrsp
  55. }
  56. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  57. func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  58. var aersp *raft.SnapshotResponse
  59. var b bytes.Buffer
  60. json.NewEncoder(&b).Encode(req)
  61. u, _ := nameToRaftURL(peer.Name())
  62. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  63. req.LastTerm, req.LastIndex)
  64. resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  65. if resp != nil {
  66. defer resp.Body.Close()
  67. aersp = &raft.SnapshotResponse{}
  68. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  69. return aersp
  70. }
  71. }
  72. return aersp
  73. }
  74. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  75. func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  76. var aersp *raft.SnapshotRecoveryResponse
  77. var b bytes.Buffer
  78. json.NewEncoder(&b).Encode(req)
  79. u, _ := nameToRaftURL(peer.Name())
  80. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  81. req.LastTerm, req.LastIndex)
  82. resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  83. if resp != nil {
  84. defer resp.Body.Close()
  85. aersp = &raft.SnapshotRecoveryResponse{}
  86. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  87. return aersp
  88. }
  89. }
  90. return aersp
  91. }
  92. // Send server side POST request
  93. func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
  94. resp, err := t.client.Post(path, "application/json", body)
  95. return resp, err
  96. }
  97. // Send server side GET request
  98. func (t transporter) Get(path string) (*http.Response, error) {
  99. resp, err := t.client.Get(path)
  100. return resp, err
  101. }