http_transporter.go 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. package raft
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. )
  10. // Parts of this transporter were heavily influenced by Peter Bourgon's
  11. // raft implementation: https://github.com/peterbourgon/raft
  12. //------------------------------------------------------------------------------
  13. //
  14. // Typedefs
  15. //
  16. //------------------------------------------------------------------------------
  17. // An HTTPTransporter is a default transport layer used to communicate between
  18. // multiple servers.
  19. type HTTPTransporter struct {
  20. DisableKeepAlives bool
  21. prefix string
  22. appendEntriesPath string
  23. requestVotePath string
  24. snapshotPath string
  25. snapshotRecoveryPath string
  26. httpClient http.Client
  27. Transport *http.Transport
  28. }
  29. type HTTPMuxer interface {
  30. HandleFunc(string, func(http.ResponseWriter, *http.Request))
  31. }
  32. //------------------------------------------------------------------------------
  33. //
  34. // Constructor
  35. //
  36. //------------------------------------------------------------------------------
  37. // Creates a new HTTP transporter with the given path prefix.
  38. func NewHTTPTransporter(prefix string) *HTTPTransporter {
  39. t := &HTTPTransporter{
  40. DisableKeepAlives: false,
  41. prefix: prefix,
  42. appendEntriesPath: joinPath(prefix, "/appendEntries"),
  43. requestVotePath: joinPath(prefix, "/requestVote"),
  44. snapshotPath: joinPath(prefix, "/snapshot"),
  45. snapshotRecoveryPath: joinPath(prefix, "/snapshotRecovery"),
  46. Transport: &http.Transport{DisableKeepAlives: false},
  47. }
  48. t.httpClient.Transport = t.Transport
  49. return t
  50. }
  51. //------------------------------------------------------------------------------
  52. //
  53. // Accessors
  54. //
  55. //------------------------------------------------------------------------------
  56. // Retrieves the path prefix used by the transporter.
  57. func (t *HTTPTransporter) Prefix() string {
  58. return t.prefix
  59. }
  60. // Retrieves the AppendEntries path.
  61. func (t *HTTPTransporter) AppendEntriesPath() string {
  62. return t.appendEntriesPath
  63. }
  64. // Retrieves the RequestVote path.
  65. func (t *HTTPTransporter) RequestVotePath() string {
  66. return t.requestVotePath
  67. }
  68. // Retrieves the Snapshot path.
  69. func (t *HTTPTransporter) SnapshotPath() string {
  70. return t.snapshotPath
  71. }
  72. // Retrieves the SnapshotRecovery path.
  73. func (t *HTTPTransporter) SnapshotRecoveryPath() string {
  74. return t.snapshotRecoveryPath
  75. }
  76. //------------------------------------------------------------------------------
  77. //
  78. // Methods
  79. //
  80. //------------------------------------------------------------------------------
  81. //--------------------------------------
  82. // Installation
  83. //--------------------------------------
  84. // Applies Raft routes to an HTTP router for a given server.
  85. func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
  86. mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
  87. mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
  88. mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
  89. mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
  90. }
  91. //--------------------------------------
  92. // Outgoing
  93. //--------------------------------------
  94. // Sends an AppendEntries RPC to a peer.
  95. func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  96. var b bytes.Buffer
  97. if _, err := req.Encode(&b); err != nil {
  98. traceln("transporter.ae.encoding.error:", err)
  99. return nil
  100. }
  101. url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
  102. traceln(server.Name(), "POST", url)
  103. t.Transport.ResponseHeaderTimeout = server.ElectionTimeout()
  104. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  105. if httpResp == nil || err != nil {
  106. traceln("transporter.ae.response.error:", err)
  107. return nil
  108. }
  109. defer httpResp.Body.Close()
  110. resp := &AppendEntriesResponse{}
  111. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  112. traceln("transporter.ae.decoding.error:", err)
  113. return nil
  114. }
  115. return resp
  116. }
  117. // Sends a RequestVote RPC to a peer.
  118. func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  119. var b bytes.Buffer
  120. if _, err := req.Encode(&b); err != nil {
  121. traceln("transporter.rv.encoding.error:", err)
  122. return nil
  123. }
  124. url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
  125. traceln(server.Name(), "POST", url)
  126. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  127. if httpResp == nil || err != nil {
  128. traceln("transporter.rv.response.error:", err)
  129. return nil
  130. }
  131. defer httpResp.Body.Close()
  132. resp := &RequestVoteResponse{}
  133. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  134. traceln("transporter.rv.decoding.error:", err)
  135. return nil
  136. }
  137. return resp
  138. }
  139. func joinPath(connectionString, thePath string) string {
  140. u, err := url.Parse(connectionString)
  141. if err != nil {
  142. panic(err)
  143. }
  144. u.Path = path.Join(u.Path, thePath)
  145. return u.String()
  146. }
  147. // Sends a SnapshotRequest RPC to a peer.
  148. func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
  149. var b bytes.Buffer
  150. if _, err := req.Encode(&b); err != nil {
  151. traceln("transporter.rv.encoding.error:", err)
  152. return nil
  153. }
  154. url := joinPath(peer.ConnectionString, t.snapshotPath)
  155. traceln(server.Name(), "POST", url)
  156. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  157. if httpResp == nil || err != nil {
  158. traceln("transporter.rv.response.error:", err)
  159. return nil
  160. }
  161. defer httpResp.Body.Close()
  162. resp := &SnapshotResponse{}
  163. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  164. traceln("transporter.rv.decoding.error:", err)
  165. return nil
  166. }
  167. return resp
  168. }
  169. // Sends a SnapshotRequest RPC to a peer.
  170. func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  171. var b bytes.Buffer
  172. if _, err := req.Encode(&b); err != nil {
  173. traceln("transporter.rv.encoding.error:", err)
  174. return nil
  175. }
  176. url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
  177. traceln(server.Name(), "POST", url)
  178. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  179. if httpResp == nil || err != nil {
  180. traceln("transporter.rv.response.error:", err)
  181. return nil
  182. }
  183. defer httpResp.Body.Close()
  184. resp := &SnapshotRecoveryResponse{}
  185. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  186. traceln("transporter.rv.decoding.error:", err)
  187. return nil
  188. }
  189. return resp
  190. }
  191. //--------------------------------------
  192. // Incoming
  193. //--------------------------------------
  194. // Handles incoming AppendEntries requests.
  195. func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
  196. return func(w http.ResponseWriter, r *http.Request) {
  197. traceln(server.Name(), "RECV /appendEntries")
  198. req := &AppendEntriesRequest{}
  199. if _, err := req.Decode(r.Body); err != nil {
  200. http.Error(w, "", http.StatusBadRequest)
  201. return
  202. }
  203. resp := server.AppendEntries(req)
  204. if _, err := resp.Encode(w); err != nil {
  205. http.Error(w, "", http.StatusInternalServerError)
  206. return
  207. }
  208. }
  209. }
  210. // Handles incoming RequestVote requests.
  211. func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
  212. return func(w http.ResponseWriter, r *http.Request) {
  213. traceln(server.Name(), "RECV /requestVote")
  214. req := &RequestVoteRequest{}
  215. if _, err := req.Decode(r.Body); err != nil {
  216. http.Error(w, "", http.StatusBadRequest)
  217. return
  218. }
  219. resp := server.RequestVote(req)
  220. if _, err := resp.Encode(w); err != nil {
  221. http.Error(w, "", http.StatusInternalServerError)
  222. return
  223. }
  224. }
  225. }
  226. // Handles incoming Snapshot requests.
  227. func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
  228. return func(w http.ResponseWriter, r *http.Request) {
  229. traceln(server.Name(), "RECV /snapshot")
  230. req := &SnapshotRequest{}
  231. if _, err := req.Decode(r.Body); err != nil {
  232. http.Error(w, "", http.StatusBadRequest)
  233. return
  234. }
  235. resp := server.RequestSnapshot(req)
  236. if _, err := resp.Encode(w); err != nil {
  237. http.Error(w, "", http.StatusInternalServerError)
  238. return
  239. }
  240. }
  241. }
  242. // Handles incoming SnapshotRecovery requests.
  243. func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc {
  244. return func(w http.ResponseWriter, r *http.Request) {
  245. traceln(server.Name(), "RECV /snapshotRecovery")
  246. req := &SnapshotRecoveryRequest{}
  247. if _, err := req.Decode(r.Body); err != nil {
  248. http.Error(w, "", http.StatusBadRequest)
  249. return
  250. }
  251. resp := server.SnapshotRecoveryRequest(req)
  252. if _, err := resp.Encode(w); err != nil {
  253. http.Error(w, "", http.StatusInternalServerError)
  254. return
  255. }
  256. }
  257. }