http_transporter.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package raft
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. )
  8. // Parts from this transporter were heavily influenced by Peter Bougon's
  9. // raft implementation: https://github.com/peterbourgon/raft
  10. //------------------------------------------------------------------------------
  11. //
  12. // Typedefs
  13. //
  14. //------------------------------------------------------------------------------
  15. // An HTTPTransporter is a default transport layer used to communicate between
  16. // multiple servers.
  17. type HTTPTransporter struct {
  18. DisableKeepAlives bool
  19. prefix string
  20. appendEntriesPath string
  21. requestVotePath string
  22. }
  23. type HTTPMuxer interface {
  24. HandleFunc(string, func(http.ResponseWriter, *http.Request))
  25. }
  26. //------------------------------------------------------------------------------
  27. //
  28. // Constructor
  29. //
  30. //------------------------------------------------------------------------------
  31. // Creates a new HTTP transporter with the given path prefix.
  32. func NewHTTPTransporter(prefix string) *HTTPTransporter {
  33. return &HTTPTransporter{
  34. DisableKeepAlives: false,
  35. prefix: prefix,
  36. appendEntriesPath: fmt.Sprintf("%s%s", prefix, "/appendEntries"),
  37. requestVotePath: fmt.Sprintf("%s%s", prefix, "/requestVote"),
  38. }
  39. }
  40. //------------------------------------------------------------------------------
  41. //
  42. // Accessors
  43. //
  44. //------------------------------------------------------------------------------
  45. // Retrieves the path prefix used by the transporter.
  46. func (t *HTTPTransporter) Prefix() string {
  47. return t.prefix
  48. }
  49. // Retrieves the AppendEntries path.
  50. func (t *HTTPTransporter) AppendEntriesPath() string {
  51. return t.appendEntriesPath
  52. }
  53. // Retrieves the RequestVote path.
  54. func (t *HTTPTransporter) RequestVotePath() string {
  55. return t.requestVotePath
  56. }
  57. //------------------------------------------------------------------------------
  58. //
  59. // Methods
  60. //
  61. //------------------------------------------------------------------------------
  62. //--------------------------------------
  63. // Installation
  64. //--------------------------------------
  65. // Applies Raft routes to an HTTP router for a given server.
  66. func (t *HTTPTransporter) Install(server *Server, mux HTTPMuxer) {
  67. mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
  68. mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
  69. }
  70. //--------------------------------------
  71. // Outgoing
  72. //--------------------------------------
  73. // Sends an AppendEntries RPC to a peer.
  74. func (t *HTTPTransporter) SendAppendEntriesRequest(server *Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  75. var b bytes.Buffer
  76. if _, err := req.encode(&b); err != nil {
  77. traceln("transporter.ae.encoding.error:", err)
  78. return nil
  79. }
  80. url := fmt.Sprintf("http://%s%s", peer.Name(), t.AppendEntriesPath())
  81. traceln(server.Name(), "POST", url)
  82. client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
  83. httpResp, err := client.Post(url, "application/protobuf", &b)
  84. if httpResp == nil || err != nil {
  85. traceln("transporter.ae.response.error:", err)
  86. return nil
  87. }
  88. defer httpResp.Body.Close()
  89. resp := &AppendEntriesResponse{}
  90. if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF {
  91. traceln("transporter.ae.decoding.error:", err)
  92. return nil
  93. }
  94. return resp
  95. }
  96. // Sends a RequestVote RPC to a peer.
  97. func (t *HTTPTransporter) SendVoteRequest(server *Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  98. var b bytes.Buffer
  99. if _, err := req.encode(&b); err != nil {
  100. traceln("transporter.rv.encoding.error:", err)
  101. return nil
  102. }
  103. url := fmt.Sprintf("http://%s%s", peer.Name(), t.RequestVotePath())
  104. traceln(server.Name(), "POST", url)
  105. client := &http.Client{Transport: &http.Transport{DisableKeepAlives: t.DisableKeepAlives}}
  106. httpResp, err := client.Post(url, "application/protobuf", &b)
  107. if httpResp == nil || err != nil {
  108. traceln("transporter.rv.response.error:", err)
  109. return nil
  110. }
  111. defer httpResp.Body.Close()
  112. resp := &RequestVoteResponse{}
  113. if _, err = resp.decode(httpResp.Body); err != nil && err != io.EOF {
  114. traceln("transporter.rv.decoding.error:", err)
  115. return nil
  116. }
  117. return resp
  118. }
  119. // Sends a SnapshotRequest RPC to a peer.
  120. func (t *HTTPTransporter) SendSnapshotRequest(server *Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
  121. return nil
  122. }
  123. // Sends a SnapshotRequest RPC to a peer.
  124. func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server *Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  125. return nil
  126. }
  127. //--------------------------------------
  128. // Incoming
  129. //--------------------------------------
  130. // Handles incoming AppendEntries requests.
  131. func (t *HTTPTransporter) appendEntriesHandler(server *Server) http.HandlerFunc {
  132. return func(w http.ResponseWriter, r *http.Request) {
  133. traceln(server.Name(), "RECV /appendEntries")
  134. req := &AppendEntriesRequest{}
  135. if _, err := req.decode(r.Body); err != nil {
  136. http.Error(w, "", http.StatusBadRequest)
  137. return
  138. }
  139. resp := server.AppendEntries(req)
  140. if _, err := resp.encode(w); err != nil {
  141. http.Error(w, "", http.StatusInternalServerError)
  142. return
  143. }
  144. }
  145. }
  146. // Handles incoming RequestVote requests.
  147. func (t *HTTPTransporter) requestVoteHandler(server *Server) http.HandlerFunc {
  148. return func(w http.ResponseWriter, r *http.Request) {
  149. traceln(server.Name(), "RECV /requestVote")
  150. req := &RequestVoteRequest{}
  151. if _, err := req.decode(r.Body); err != nil {
  152. http.Error(w, "", http.StatusBadRequest)
  153. return
  154. }
  155. resp := server.RequestVote(req)
  156. if _, err := resp.encode(w); err != nil {
  157. http.Error(w, "", http.StatusInternalServerError)
  158. return
  159. }
  160. }
  161. }