client_handlers.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. package main
  2. import (
  3. "net/http"
  4. "strconv"
  5. "time"
  6. )
  7. //-------------------------------------------------------------------
  8. // Handlers for etcd-store related requests received via the raft client port
  9. //-------------------------------------------------------------------
  10. // Multiplex GET/POST/DELETE request to corresponding handlers
  11. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  12. if req.Method == "GET" {
  13. GetHttpHandler(&w, req)
  14. } else if req.Method == "POST" {
  15. SetHttpHandler(&w, req)
  16. } else if req.Method == "DELETE" {
  17. DeleteHttpHandler(&w, req)
  18. } else {
  19. w.WriteHeader(http.StatusMethodNotAllowed)
  20. return
  21. }
  22. }
  23. //--------------------------------------
  24. // State sensitive handlers
  25. // Set/Delete will dispatch to leader
  26. //--------------------------------------
  27. // Set Command Handler
  28. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  29. key := req.URL.Path[len("/v1/keys/"):]
  30. debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
  31. command := &SetCommand{}
  32. command.Key = key
  33. command.Value = req.FormValue("value")
  34. strDuration := req.FormValue("ttl")
  35. var err error
  36. command.ExpireTime, err = durationToExpireTime(strDuration)
  37. if err != nil {
  38. warn("The given duration is not a number: %v", err)
  39. (*w).WriteHeader(http.StatusInternalServerError)
  40. }
  41. dispatch(command, w, req, true)
  42. }
  43. // TestAndSet handler
  44. func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
  45. key := req.URL.Path[len("/v1/testAndSet/"):]
  46. debug("[recv] POST http://%v/v1/testAndSet/%s", raftServer.Name(), key)
  47. command := &TestAndSetCommand{}
  48. command.Key = key
  49. command.PrevValue = req.FormValue("prevValue")
  50. command.Value = req.FormValue("value")
  51. strDuration := req.FormValue("ttl")
  52. var err error
  53. command.ExpireTime, err = durationToExpireTime(strDuration)
  54. if err != nil {
  55. warn("The given duration is not a number: %v", err)
  56. w.WriteHeader(http.StatusInternalServerError)
  57. }
  58. dispatch(command, &w, req, true)
  59. }
  60. // Delete Handler
  61. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  62. key := req.URL.Path[len("/v1/keys/"):]
  63. debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
  64. command := &DeleteCommand{}
  65. command.Key = key
  66. dispatch(command, w, req, true)
  67. }
  68. // Dispatch the command to leader
  69. func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
  70. if raftServer.State() == "leader" {
  71. if body, err := raftServer.Do(c); err != nil {
  72. warn("Commit failed %v", err)
  73. (*w).WriteHeader(http.StatusInternalServerError)
  74. return
  75. } else {
  76. (*w).WriteHeader(http.StatusOK)
  77. if body == nil {
  78. return
  79. }
  80. body, ok := body.([]byte)
  81. if !ok {
  82. panic("wrong type")
  83. }
  84. (*w).Write(body)
  85. return
  86. }
  87. } else {
  88. // current no leader
  89. if raftServer.Leader() == "" {
  90. (*w).WriteHeader(http.StatusInternalServerError)
  91. return
  92. }
  93. // tell the client where is the leader
  94. path := req.URL.Path
  95. var scheme string
  96. if scheme = req.URL.Scheme; scheme == "" {
  97. scheme = "http://"
  98. }
  99. var url string
  100. if client {
  101. url = scheme + raftTransporter.GetLeaderClientAddress() + path
  102. } else {
  103. url = scheme + raftServer.Leader() + path
  104. }
  105. debug("Redirect to %s", url)
  106. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  107. return
  108. }
  109. (*w).WriteHeader(http.StatusInternalServerError)
  110. return
  111. }
  112. //--------------------------------------
  113. // State non-sensitive handlers
  114. // will not dispatch to leader
  115. // TODO: add sensitive version for these
  116. // command?
  117. //--------------------------------------
  118. // Handler to return the current leader name
  119. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  120. w.WriteHeader(http.StatusOK)
  121. w.Write([]byte(raftServer.Leader()))
  122. }
  123. // Get Handler
  124. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  125. key := req.URL.Path[len("/v1/keys/"):]
  126. debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  127. command := &GetCommand{}
  128. command.Key = key
  129. if body, err := command.Apply(raftServer); err != nil {
  130. warn("raftd: Unable to write file: %v", err)
  131. (*w).WriteHeader(http.StatusInternalServerError)
  132. return
  133. } else {
  134. (*w).WriteHeader(http.StatusOK)
  135. body, ok := body.([]byte)
  136. if !ok {
  137. panic("wrong type")
  138. }
  139. (*w).Write(body)
  140. return
  141. }
  142. }
  143. // List Handler
  144. func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
  145. prefix := req.URL.Path[len("/v1/list/"):]
  146. debug("[recv] GET http://%v/v1/list/%s", raftServer.Name(), prefix)
  147. command := &ListCommand{}
  148. command.Prefix = prefix
  149. if body, err := command.Apply(raftServer); err != nil {
  150. warn("Unable to write file: %v", err)
  151. w.WriteHeader(http.StatusInternalServerError)
  152. return
  153. } else {
  154. w.WriteHeader(http.StatusOK)
  155. body, ok := body.([]byte)
  156. if !ok {
  157. panic("wrong type")
  158. }
  159. w.Write(body)
  160. return
  161. }
  162. }
  163. // Watch handler
  164. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  165. key := req.URL.Path[len("/v1/watch/"):]
  166. command := &WatchCommand{}
  167. command.Key = key
  168. if req.Method == "GET" {
  169. debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  170. command.SinceIndex = 0
  171. } else if req.Method == "POST" {
  172. // watch from a specific index
  173. debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  174. content := req.FormValue("index")
  175. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  176. if err != nil {
  177. w.WriteHeader(http.StatusBadRequest)
  178. }
  179. command.SinceIndex = sinceIndex
  180. } else {
  181. w.WriteHeader(http.StatusMethodNotAllowed)
  182. return
  183. }
  184. if body, err := command.Apply(raftServer); err != nil {
  185. warn("Unable to do watch command: %v", err)
  186. w.WriteHeader(http.StatusInternalServerError)
  187. return
  188. } else {
  189. w.WriteHeader(http.StatusOK)
  190. body, ok := body.([]byte)
  191. if !ok {
  192. panic("wrong type")
  193. }
  194. w.Write(body)
  195. return
  196. }
  197. }
  198. // Convert string duration to time format
  199. func durationToExpireTime(strDuration string) (time.Time, error) {
  200. if strDuration != "" {
  201. duration, err := strconv.Atoi(strDuration)
  202. if err != nil {
  203. return time.Unix(0, 0), err
  204. }
  205. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  206. } else {
  207. return time.Unix(0, 0), nil
  208. }
  209. }