transporter.go 4.8 KB


  1. package main
  2. import (
  3. "bytes"
  4. "crypto/tls"
  5. "encoding/json"
  6. "fmt"
  7. "github.com/coreos/go-raft"
  8. "io"
  9. "net"
  10. "net/http"
  11. "time"
  12. )
  // transporter is the layer raft nodes use to talk to each other over HTTP.
  // It couples the HTTP client that issues requests with the longest time a
  // caller is willing to wait for a reply.
  type transporter struct {
  	// client performs the outgoing HTTP requests.
  	client *http.Client
  	// timeout bounds how long waitResponse waits for a reply.
  	timeout time.Duration
  }
  // transporterResponse bundles the two results of an HTTP call so that they
  // can be passed over a channel as a single value.
  type transporterResponse struct {
  	// resp is the response returned by the HTTP client.
  	resp *http.Response
  	// err is the error returned by the HTTP client.
  	err error
  }
  23. // Create transporter using by raft server
  24. // Create http or https transporter based on
  25. // whether the user give the server cert and key
  26. func newTransporter(scheme string, tlsConf tls.Config, timeout time.Duration) *transporter {
  27. t := transporter{}
  28. tr := &http.Transport{
  29. Dial: dialTimeout,
  30. }
  31. if scheme == "https" {
  32. tr.TLSClientConfig = &tlsConf
  33. tr.DisableCompression = true
  34. }
  35. t.client = &http.Client{Transport: tr}
  36. t.timeout = timeout
  37. return &t
  38. }
  39. // Dial with timeout
  40. func dialTimeout(network, addr string) (net.Conn, error) {
  41. return net.DialTimeout(network, addr, HTTPTimeout)
  42. }
  43. // Sends AppendEntries RPCs to a peer when the server is the leader.
  44. func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
  45. var aersp *raft.AppendEntriesResponse
  46. var b bytes.Buffer
  47. json.NewEncoder(&b).Encode(req)
  48. u, _ := nameToRaftURL(peer.Name)
  49. debugf("Send LogEntries to %s ", u)
  50. resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
  51. if err != nil {
  52. debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
  53. }
  54. if resp != nil {
  55. defer resp.Body.Close()
  56. aersp = &raft.AppendEntriesResponse{}
  57. if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  58. return aersp
  59. }
  60. }
  61. return aersp
  62. }
  63. // Sends RequestVote RPCs to a peer when the server is the candidate.
  64. func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
  65. var rvrsp *raft.RequestVoteResponse
  66. var b bytes.Buffer
  67. json.NewEncoder(&b).Encode(req)
  68. u, _ := nameToRaftURL(peer.Name)
  69. debugf("Send Vote to %s", u)
  70. resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
  71. if err != nil {
  72. debugf("Cannot send VoteRequest to %s : %s", u, err)
  73. }
  74. if resp != nil {
  75. defer resp.Body.Close()
  76. rvrsp := &raft.RequestVoteResponse{}
  77. if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
  78. return rvrsp
  79. }
  80. }
  81. return rvrsp
  82. }
  83. // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
  84. func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
  85. var aersp *raft.SnapshotResponse
  86. var b bytes.Buffer
  87. json.NewEncoder(&b).Encode(req)
  88. u, _ := nameToRaftURL(peer.Name)
  89. debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
  90. req.LastTerm, req.LastIndex)
  91. resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
  92. if err != nil {
  93. debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
  94. }
  95. if resp != nil {
  96. defer resp.Body.Close()
  97. aersp = &raft.SnapshotResponse{}
  98. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  99. return aersp
  100. }
  101. }
  102. return aersp
  103. }
  104. // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
  105. func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
  106. var aersp *raft.SnapshotRecoveryResponse
  107. var b bytes.Buffer
  108. json.NewEncoder(&b).Encode(req)
  109. u, _ := nameToRaftURL(peer.Name)
  110. debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
  111. req.LastTerm, req.LastIndex)
  112. resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
  113. if err != nil {
  114. debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
  115. }
  116. if resp != nil {
  117. defer resp.Body.Close()
  118. aersp = &raft.SnapshotRecoveryResponse{}
  119. if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
  120. return aersp
  121. }
  122. }
  123. return aersp
  124. }
  125. // Send server side POST request
  126. func (t *transporter) Post(path string, body io.Reader) (*http.Response, error) {
  127. c := make(chan *transporterResponse, 1)
  128. go func() {
  129. tr := new(transporterResponse)
  130. tr.resp, tr.err = t.client.Post(path, "application/json", body)
  131. c <- tr
  132. }()
  133. return t.waitResponse(c)
  134. }
  135. // Send server side GET request
  136. func (t *transporter) Get(path string) (*http.Response, error) {
  137. c := make(chan *transporterResponse, 1)
  138. go func() {
  139. tr := new(transporterResponse)
  140. tr.resp, tr.err = t.client.Get(path)
  141. c <- tr
  142. }()
  143. return t.waitResponse(c)
  144. }
  145. func (t *transporter) waitResponse(responseChan chan *transporterResponse) (*http.Response, error) {
  146. timeoutChan := time.After(t.timeout)
  147. select {
  148. case <-timeoutChan:
  149. return nil, fmt.Errorf("Wait Response Timeout: %v", t.timeout)
  150. case r := <-responseChan:
  151. return r.resp, r.err
  152. }
  153. // for complier
  154. return nil, nil
  155. }