http_transporter.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. package raft
  2. import (
  3. "bytes"
  4. "fmt"
  5. "io"
  6. "net/http"
  7. "net/url"
  8. "path"
  9. "time"
  10. )
  11. // Parts from this transporter were heavily influenced by Peter Bougon's
  12. // raft implementation: https://github.com/peterbourgon/raft
  13. //------------------------------------------------------------------------------
  14. //
  15. // Typedefs
  16. //
  17. //------------------------------------------------------------------------------
  18. // An HTTPTransporter is a default transport layer used to communicate between
  19. // multiple servers.
  20. type HTTPTransporter struct {
  21. DisableKeepAlives bool
  22. prefix string
  23. appendEntriesPath string
  24. requestVotePath string
  25. snapshotPath string
  26. snapshotRecoveryPath string
  27. httpClient http.Client
  28. Transport *http.Transport
  29. }
  30. type HTTPMuxer interface {
  31. HandleFunc(string, func(http.ResponseWriter, *http.Request))
  32. }
  33. //------------------------------------------------------------------------------
  34. //
  35. // Constructor
  36. //
  37. //------------------------------------------------------------------------------
  38. // Creates a new HTTP transporter with the given path prefix.
  39. func NewHTTPTransporter(prefix string, timeout time.Duration) *HTTPTransporter {
  40. t := &HTTPTransporter{
  41. DisableKeepAlives: false,
  42. prefix: prefix,
  43. appendEntriesPath: joinPath(prefix, "/appendEntries"),
  44. requestVotePath: joinPath(prefix, "/requestVote"),
  45. snapshotPath: joinPath(prefix, "/snapshot"),
  46. snapshotRecoveryPath: joinPath(prefix, "/snapshotRecovery"),
  47. Transport: &http.Transport{DisableKeepAlives: false},
  48. }
  49. t.httpClient.Transport = t.Transport
  50. t.Transport.ResponseHeaderTimeout = timeout
  51. return t
  52. }
  53. //------------------------------------------------------------------------------
  54. //
  55. // Accessors
  56. //
  57. //------------------------------------------------------------------------------
  58. // Retrieves the path prefix used by the transporter.
  59. func (t *HTTPTransporter) Prefix() string {
  60. return t.prefix
  61. }
  62. // Retrieves the AppendEntries path.
  63. func (t *HTTPTransporter) AppendEntriesPath() string {
  64. return t.appendEntriesPath
  65. }
  66. // Retrieves the RequestVote path.
  67. func (t *HTTPTransporter) RequestVotePath() string {
  68. return t.requestVotePath
  69. }
  70. // Retrieves the Snapshot path.
  71. func (t *HTTPTransporter) SnapshotPath() string {
  72. return t.snapshotPath
  73. }
  74. // Retrieves the SnapshotRecovery path.
  75. func (t *HTTPTransporter) SnapshotRecoveryPath() string {
  76. return t.snapshotRecoveryPath
  77. }
  78. //------------------------------------------------------------------------------
  79. //
  80. // Methods
  81. //
  82. //------------------------------------------------------------------------------
  83. //--------------------------------------
  84. // Installation
  85. //--------------------------------------
  86. // Applies Raft routes to an HTTP router for a given server.
  87. func (t *HTTPTransporter) Install(server Server, mux HTTPMuxer) {
  88. mux.HandleFunc(t.AppendEntriesPath(), t.appendEntriesHandler(server))
  89. mux.HandleFunc(t.RequestVotePath(), t.requestVoteHandler(server))
  90. mux.HandleFunc(t.SnapshotPath(), t.snapshotHandler(server))
  91. mux.HandleFunc(t.SnapshotRecoveryPath(), t.snapshotRecoveryHandler(server))
  92. }
  93. //--------------------------------------
  94. // Outgoing
  95. //--------------------------------------
  96. // Sends an AppendEntries RPC to a peer.
  97. func (t *HTTPTransporter) SendAppendEntriesRequest(server Server, peer *Peer, req *AppendEntriesRequest) *AppendEntriesResponse {
  98. var b bytes.Buffer
  99. if _, err := req.Encode(&b); err != nil {
  100. traceln("transporter.ae.encoding.error:", err)
  101. return nil
  102. }
  103. url := joinPath(peer.ConnectionString, t.AppendEntriesPath())
  104. traceln(server.Name(), "POST", url)
  105. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  106. if httpResp == nil || err != nil {
  107. traceln("transporter.ae.response.error:", err)
  108. return nil
  109. }
  110. defer httpResp.Body.Close()
  111. resp := &AppendEntriesResponse{}
  112. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  113. traceln("transporter.ae.decoding.error:", err)
  114. return nil
  115. }
  116. return resp
  117. }
  118. // Sends a RequestVote RPC to a peer.
  119. func (t *HTTPTransporter) SendVoteRequest(server Server, peer *Peer, req *RequestVoteRequest) *RequestVoteResponse {
  120. var b bytes.Buffer
  121. if _, err := req.Encode(&b); err != nil {
  122. traceln("transporter.rv.encoding.error:", err)
  123. return nil
  124. }
  125. url := fmt.Sprintf("%s%s", peer.ConnectionString, t.RequestVotePath())
  126. traceln(server.Name(), "POST", url)
  127. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  128. if httpResp == nil || err != nil {
  129. traceln("transporter.rv.response.error:", err)
  130. return nil
  131. }
  132. defer httpResp.Body.Close()
  133. resp := &RequestVoteResponse{}
  134. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  135. traceln("transporter.rv.decoding.error:", err)
  136. return nil
  137. }
  138. return resp
  139. }
  140. func joinPath(connectionString, thePath string) string {
  141. u, err := url.Parse(connectionString)
  142. if err != nil {
  143. panic(err)
  144. }
  145. u.Path = path.Join(u.Path, thePath)
  146. return u.String()
  147. }
  148. // Sends a SnapshotRequest RPC to a peer.
  149. func (t *HTTPTransporter) SendSnapshotRequest(server Server, peer *Peer, req *SnapshotRequest) *SnapshotResponse {
  150. var b bytes.Buffer
  151. if _, err := req.Encode(&b); err != nil {
  152. traceln("transporter.rv.encoding.error:", err)
  153. return nil
  154. }
  155. url := joinPath(peer.ConnectionString, t.snapshotPath)
  156. traceln(server.Name(), "POST", url)
  157. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  158. if httpResp == nil || err != nil {
  159. traceln("transporter.rv.response.error:", err)
  160. return nil
  161. }
  162. defer httpResp.Body.Close()
  163. resp := &SnapshotResponse{}
  164. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  165. traceln("transporter.rv.decoding.error:", err)
  166. return nil
  167. }
  168. return resp
  169. }
  170. // Sends a SnapshotRequest RPC to a peer.
  171. func (t *HTTPTransporter) SendSnapshotRecoveryRequest(server Server, peer *Peer, req *SnapshotRecoveryRequest) *SnapshotRecoveryResponse {
  172. var b bytes.Buffer
  173. if _, err := req.Encode(&b); err != nil {
  174. traceln("transporter.rv.encoding.error:", err)
  175. return nil
  176. }
  177. url := joinPath(peer.ConnectionString, t.snapshotRecoveryPath)
  178. traceln(server.Name(), "POST", url)
  179. httpResp, err := t.httpClient.Post(url, "application/protobuf", &b)
  180. if httpResp == nil || err != nil {
  181. traceln("transporter.rv.response.error:", err)
  182. return nil
  183. }
  184. defer httpResp.Body.Close()
  185. resp := &SnapshotRecoveryResponse{}
  186. if _, err = resp.Decode(httpResp.Body); err != nil && err != io.EOF {
  187. traceln("transporter.rv.decoding.error:", err)
  188. return nil
  189. }
  190. return resp
  191. }
  192. //--------------------------------------
  193. // Incoming
  194. //--------------------------------------
  195. // Handles incoming AppendEntries requests.
  196. func (t *HTTPTransporter) appendEntriesHandler(server Server) http.HandlerFunc {
  197. return func(w http.ResponseWriter, r *http.Request) {
  198. traceln(server.Name(), "RECV /appendEntries")
  199. req := &AppendEntriesRequest{}
  200. if _, err := req.Decode(r.Body); err != nil {
  201. http.Error(w, "", http.StatusBadRequest)
  202. return
  203. }
  204. resp := server.AppendEntries(req)
  205. if resp == nil {
  206. http.Error(w, "Failed creating response.", http.StatusInternalServerError)
  207. return
  208. }
  209. if _, err := resp.Encode(w); err != nil {
  210. http.Error(w, "", http.StatusInternalServerError)
  211. return
  212. }
  213. }
  214. }
  215. // Handles incoming RequestVote requests.
  216. func (t *HTTPTransporter) requestVoteHandler(server Server) http.HandlerFunc {
  217. return func(w http.ResponseWriter, r *http.Request) {
  218. traceln(server.Name(), "RECV /requestVote")
  219. req := &RequestVoteRequest{}
  220. if _, err := req.Decode(r.Body); err != nil {
  221. http.Error(w, "", http.StatusBadRequest)
  222. return
  223. }
  224. resp := server.RequestVote(req)
  225. if resp == nil {
  226. http.Error(w, "Failed creating response.", http.StatusInternalServerError)
  227. return
  228. }
  229. if _, err := resp.Encode(w); err != nil {
  230. http.Error(w, "", http.StatusInternalServerError)
  231. return
  232. }
  233. }
  234. }
  235. // Handles incoming Snapshot requests.
  236. func (t *HTTPTransporter) snapshotHandler(server Server) http.HandlerFunc {
  237. return func(w http.ResponseWriter, r *http.Request) {
  238. traceln(server.Name(), "RECV /snapshot")
  239. req := &SnapshotRequest{}
  240. if _, err := req.Decode(r.Body); err != nil {
  241. http.Error(w, "", http.StatusBadRequest)
  242. return
  243. }
  244. resp := server.RequestSnapshot(req)
  245. if resp == nil {
  246. http.Error(w, "Failed creating response.", http.StatusInternalServerError)
  247. return
  248. }
  249. if _, err := resp.Encode(w); err != nil {
  250. http.Error(w, "", http.StatusInternalServerError)
  251. return
  252. }
  253. }
  254. }
  255. // Handles incoming SnapshotRecovery requests.
  256. func (t *HTTPTransporter) snapshotRecoveryHandler(server Server) http.HandlerFunc {
  257. return func(w http.ResponseWriter, r *http.Request) {
  258. traceln(server.Name(), "RECV /snapshotRecovery")
  259. req := &SnapshotRecoveryRequest{}
  260. if _, err := req.Decode(r.Body); err != nil {
  261. http.Error(w, "", http.StatusBadRequest)
  262. return
  263. }
  264. resp := server.SnapshotRecoveryRequest(req)
  265. if resp == nil {
  266. http.Error(w, "Failed creating response.", http.StatusInternalServerError)
  267. return
  268. }
  269. if _, err := resp.Encode(w); err != nil {
  270. http.Error(w, "", http.StatusInternalServerError)
  271. return
  272. }
  273. }
  274. }