handlers.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package main
  2. import (
  3. "encoding/json"
  4. "github.com/xiangli-cmu/go-raft"
  5. "net/http"
  6. //"fmt"
  7. "io/ioutil"
  8. //"bytes"
  9. "strconv"
  10. //"strings"
  11. "time"
  12. )
  13. //--------------------------------------
  14. // Internal HTTP Handlers via server port
  15. //--------------------------------------
  16. // Get all the current logs
  17. func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
  18. debug("[recv] GET http://%v/log", server.Name())
  19. w.Header().Set("Content-Type", "application/json")
  20. w.WriteHeader(http.StatusOK)
  21. json.NewEncoder(w).Encode(server.LogEntries())
  22. }
  23. func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
  24. rvreq := &raft.RequestVoteRequest{}
  25. err := decodeJsonRequest(req, rvreq)
  26. if err == nil {
  27. debug("[recv] POST http://%v/vote [%s]", server.Name(), rvreq.CandidateName)
  28. if resp, _ := server.RequestVote(rvreq); resp != nil {
  29. w.WriteHeader(http.StatusOK)
  30. json.NewEncoder(w).Encode(resp)
  31. return
  32. }
  33. }
  34. warn("[vote] ERROR: %v", err)
  35. w.WriteHeader(http.StatusInternalServerError)
  36. }
  37. func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
  38. aereq := &raft.AppendEntriesRequest{}
  39. err := decodeJsonRequest(req, aereq)
  40. if err == nil {
  41. debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
  42. if resp, _ := server.AppendEntries(aereq); resp != nil {
  43. w.WriteHeader(http.StatusOK)
  44. json.NewEncoder(w).Encode(resp)
  45. if !resp.Success {
  46. debug("[Append Entry] Step back")
  47. }
  48. return
  49. }
  50. }
  51. warn("[append] ERROR: %v", err)
  52. w.WriteHeader(http.StatusInternalServerError)
  53. }
  54. func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
  55. aereq := &raft.SnapshotRequest{}
  56. err := decodeJsonRequest(req, aereq)
  57. if err == nil {
  58. debug("[recv] POST http://%s/snapshot/ ", server.Name())
  59. if resp, _ := server.SnapshotRecovery(aereq); resp != nil {
  60. w.WriteHeader(http.StatusOK)
  61. json.NewEncoder(w).Encode(resp)
  62. return
  63. }
  64. }
  65. warn("[snapshot] ERROR: %v", err)
  66. w.WriteHeader(http.StatusInternalServerError)
  67. }
  68. func clientHttpHandler(w http.ResponseWriter, req *http.Request) {
  69. debug("[recv] Get http://%v/client/ ", server.Name())
  70. w.WriteHeader(http.StatusOK)
  71. client := address + ":" + strconv.Itoa(clientPort)
  72. w.Write([]byte(client))
  73. }
  74. func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
  75. command := &JoinCommand{}
  76. if err := decodeJsonRequest(req, command); err == nil {
  77. debug("Receive Join Request from %s", command.Name)
  78. excute(command, &w, req)
  79. } else {
  80. w.WriteHeader(http.StatusInternalServerError)
  81. return
  82. }
  83. }
  84. //--------------------------------------
  85. // external HTTP Handlers via client port
  86. //--------------------------------------
  87. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  88. if req.Method == "GET" {
  89. GetHttpHandler(&w, req)
  90. } else if req.Method == "POST" {
  91. SetHttpHandler(&w, req)
  92. } else if req.Method == "DELETE" {
  93. DeleteHttpHandler(&w, req)
  94. } else {
  95. w.WriteHeader(http.StatusMethodNotAllowed)
  96. return
  97. }
  98. }
  99. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  100. key := req.URL.Path[len("/v1/keys/"):]
  101. debug("[recv] POST http://%v/v1/keys/%s", server.Name(), key)
  102. command := &SetCommand{}
  103. command.Key = key
  104. command.Value = req.FormValue("value")
  105. strDuration := req.FormValue("ttl")
  106. if strDuration != "" {
  107. duration, err := strconv.Atoi(strDuration)
  108. if err != nil {
  109. warn("raftd: Bad duration: %v", err)
  110. (*w).WriteHeader(http.StatusInternalServerError)
  111. return
  112. }
  113. command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
  114. } else {
  115. command.ExpireTime = time.Unix(0, 0)
  116. }
  117. excute(command, w, req)
  118. }
  119. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  120. key := req.URL.Path[len("/v1/keys/"):]
  121. debug("[recv] DELETE http://%v/v1/keys/%s", server.Name(), key)
  122. command := &DeleteCommand{}
  123. command.Key = key
  124. excute(command, w, req)
  125. }
  126. func excute(c Command, w *http.ResponseWriter, req *http.Request) {
  127. if server.State() == "leader" {
  128. if body, err := server.Do(c); err != nil {
  129. warn("Commit failed %v", err)
  130. (*w).WriteHeader(http.StatusInternalServerError)
  131. return
  132. } else {
  133. (*w).WriteHeader(http.StatusOK)
  134. if body == nil {
  135. return
  136. }
  137. body, ok := body.([]byte)
  138. if !ok {
  139. panic("wrong type")
  140. }
  141. (*w).Write(body)
  142. return
  143. }
  144. } else {
  145. // current no leader
  146. if server.Leader() == "" {
  147. (*w).WriteHeader(http.StatusInternalServerError)
  148. return
  149. }
  150. // tell the client where is the leader
  151. debug("Redirect to the leader %s", server.Leader())
  152. path := req.URL.Path
  153. var scheme string
  154. if scheme = req.URL.Scheme; scheme == "" {
  155. scheme = "http://"
  156. }
  157. url := scheme + leaderClient() + path
  158. debug("redirect to %s", url)
  159. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  160. return
  161. }
  162. (*w).WriteHeader(http.StatusInternalServerError)
  163. return
  164. }
  165. func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
  166. w.WriteHeader(http.StatusOK)
  167. w.Write([]byte(server.Leader()))
  168. }
  169. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  170. key := req.URL.Path[len("/v1/keys/"):]
  171. debug("[recv] GET http://%v/v1/keys/%s", server.Name(), key)
  172. command := &GetCommand{}
  173. command.Key = key
  174. if body, err := command.Apply(server); err != nil {
  175. warn("raftd: Unable to write file: %v", err)
  176. (*w).WriteHeader(http.StatusInternalServerError)
  177. return
  178. } else {
  179. (*w).WriteHeader(http.StatusOK)
  180. body, ok := body.([]byte)
  181. if !ok {
  182. panic("wrong type")
  183. }
  184. (*w).Write(body)
  185. return
  186. }
  187. }
  188. func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
  189. prefix := req.URL.Path[len("/v1/list/"):]
  190. debug("[recv] GET http://%v/v1/list/%s", server.Name(), prefix)
  191. command := &ListCommand{}
  192. command.Prefix = prefix
  193. if body, err := command.Apply(server); err != nil {
  194. warn("raftd: Unable to write file: %v", err)
  195. w.WriteHeader(http.StatusInternalServerError)
  196. return
  197. } else {
  198. w.WriteHeader(http.StatusOK)
  199. body, ok := body.([]byte)
  200. if !ok {
  201. panic("wrong type")
  202. }
  203. w.Write(body)
  204. return
  205. }
  206. }
  207. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  208. key := req.URL.Path[len("/v1/watch/"):]
  209. command := &WatchCommand{}
  210. command.Key = key
  211. if req.Method == "GET" {
  212. debug("[recv] GET http://%v/watch/%s", server.Name(), key)
  213. command.SinceIndex = 0
  214. } else if req.Method == "POST" {
  215. debug("[recv] POST http://%v/watch/%s", server.Name(), key)
  216. content, err := ioutil.ReadAll(req.Body)
  217. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  218. if err != nil {
  219. w.WriteHeader(http.StatusBadRequest)
  220. }
  221. command.SinceIndex = sinceIndex
  222. } else {
  223. w.WriteHeader(http.StatusMethodNotAllowed)
  224. return
  225. }
  226. if body, err := command.Apply(server); err != nil {
  227. warn("raftd: Unable to write file: %v", err)
  228. w.WriteHeader(http.StatusInternalServerError)
  229. return
  230. } else {
  231. w.WriteHeader(http.StatusOK)
  232. body, ok := body.([]byte)
  233. if !ok {
  234. panic("wrong type")
  235. }
  236. w.Write(body)
  237. return
  238. }
  239. }