client_handlers.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273
  1. package main
  2. import (
  3. "net/http"
  4. "strconv"
  5. "time"
  6. )
  7. //-------------------------------------------------------------------
  8. // Handlers for etcd-store related requests received via the raft client port
  9. //-------------------------------------------------------------------
  10. // Multiplex GET/POST/DELETE request to corresponding handlers
  11. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  12. if req.Method == "GET" {
  13. GetHttpHandler(&w, req)
  14. } else if req.Method == "POST" {
  15. SetHttpHandler(&w, req)
  16. } else if req.Method == "DELETE" {
  17. DeleteHttpHandler(&w, req)
  18. } else {
  19. w.WriteHeader(http.StatusMethodNotAllowed)
  20. return
  21. }
  22. }
  23. //--------------------------------------
  24. // State sensitive handlers
  25. // Set/Delete will dispatch to leader
  26. //--------------------------------------
  27. // Set Command Handler
  28. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  29. key := req.URL.Path[len("/v1/keys/"):]
  30. debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
  31. command := &SetCommand{}
  32. command.Key = key
  33. command.Value = req.FormValue("value")
  34. strDuration := req.FormValue("ttl")
  35. var err error
  36. command.ExpireTime, err = durationToExpireTime(strDuration)
  37. if err != nil {
  38. warn("The given duration is not a number: %v", err)
  39. (*w).WriteHeader(http.StatusInternalServerError)
  40. }
  41. dispatch(command, w, req)
  42. }
  43. // TestAndSet handler
  44. func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
  45. key := req.URL.Path[len("/v1/testAndSet/"):]
  46. debug("[recv] POST http://%v/v1/testAndSet/%s", raftServer.Name(), key)
  47. command := &TestAndSetCommand{}
  48. command.Key = key
  49. command.PrevValue = req.FormValue("prevValue")
  50. command.Value = req.FormValue("value")
  51. strDuration := req.FormValue("ttl")
  52. var err error
  53. command.ExpireTime, err = durationToExpireTime(strDuration)
  54. if err != nil {
  55. warn("The given duration is not a number: %v", err)
  56. w.WriteHeader(http.StatusInternalServerError)
  57. }
  58. dispatch(command, &w, req)
  59. }
  60. // Delete Handler
  61. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  62. key := req.URL.Path[len("/v1/keys/"):]
  63. debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
  64. command := &DeleteCommand{}
  65. command.Key = key
  66. dispatch(command, w, req)
  67. }
  68. // Dispatch the command to leader
  69. func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
  70. if raftServer.State() == "leader" {
  71. if body, err := raftServer.Do(c); err != nil {
  72. warn("Commit failed %v", err)
  73. (*w).WriteHeader(http.StatusInternalServerError)
  74. return
  75. } else {
  76. (*w).WriteHeader(http.StatusOK)
  77. if body == nil {
  78. return
  79. }
  80. body, ok := body.([]byte)
  81. if !ok {
  82. panic("wrong type")
  83. }
  84. (*w).Write(body)
  85. return
  86. }
  87. } else {
  88. // current no leader
  89. if raftServer.Leader() == "" {
  90. (*w).WriteHeader(http.StatusInternalServerError)
  91. return
  92. }
  93. // tell the client where is the leader
  94. path := req.URL.Path
  95. var scheme string
  96. if scheme = req.URL.Scheme; scheme == "" {
  97. scheme = "http://"
  98. }
  99. url := scheme + raftTransporter.GetLeaderClientAddress() + path
  100. debug("Redirect to %s", url)
  101. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  102. return
  103. }
  104. (*w).WriteHeader(http.StatusInternalServerError)
  105. return
  106. }
  107. //--------------------------------------
  108. // State non-sensitive handlers
  109. // will not dispatch to leader
  110. // TODO: add sensitive version for these
  111. // command?
  112. //--------------------------------------
  113. // Handler to return the current leader name
  114. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  115. w.WriteHeader(http.StatusOK)
  116. w.Write([]byte(raftServer.Leader()))
  117. }
  118. // Get Handler
  119. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  120. key := req.URL.Path[len("/v1/keys/"):]
  121. debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  122. command := &GetCommand{}
  123. command.Key = key
  124. if body, err := command.Apply(raftServer); err != nil {
  125. warn("raftd: Unable to write file: %v", err)
  126. (*w).WriteHeader(http.StatusInternalServerError)
  127. return
  128. } else {
  129. (*w).WriteHeader(http.StatusOK)
  130. body, ok := body.([]byte)
  131. if !ok {
  132. panic("wrong type")
  133. }
  134. (*w).Write(body)
  135. return
  136. }
  137. }
  138. // List Handler
  139. func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
  140. prefix := req.URL.Path[len("/v1/list/"):]
  141. debug("[recv] GET http://%v/v1/list/%s", raftServer.Name(), prefix)
  142. command := &ListCommand{}
  143. command.Prefix = prefix
  144. if body, err := command.Apply(raftServer); err != nil {
  145. warn("Unable to write file: %v", err)
  146. w.WriteHeader(http.StatusInternalServerError)
  147. return
  148. } else {
  149. w.WriteHeader(http.StatusOK)
  150. body, ok := body.([]byte)
  151. if !ok {
  152. panic("wrong type")
  153. }
  154. w.Write(body)
  155. return
  156. }
  157. }
  158. // Watch handler
  159. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  160. key := req.URL.Path[len("/v1/watch/"):]
  161. command := &WatchCommand{}
  162. command.Key = key
  163. if req.Method == "GET" {
  164. debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  165. command.SinceIndex = 0
  166. } else if req.Method == "POST" {
  167. // watch from a specific index
  168. debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  169. content := req.FormValue("index")
  170. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  171. if err != nil {
  172. w.WriteHeader(http.StatusBadRequest)
  173. }
  174. command.SinceIndex = sinceIndex
  175. } else {
  176. w.WriteHeader(http.StatusMethodNotAllowed)
  177. return
  178. }
  179. if body, err := command.Apply(raftServer); err != nil {
  180. warn("Unable to do watch command: %v", err)
  181. w.WriteHeader(http.StatusInternalServerError)
  182. return
  183. } else {
  184. w.WriteHeader(http.StatusOK)
  185. body, ok := body.([]byte)
  186. if !ok {
  187. panic("wrong type")
  188. }
  189. w.Write(body)
  190. return
  191. }
  192. }
  193. // Convert string duration to time format
  194. func durationToExpireTime(strDuration string) (time.Time, error) {
  195. if strDuration != "" {
  196. duration, err := strconv.Atoi(strDuration)
  197. if err != nil {
  198. return time.Unix(0, 0), err
  199. }
  200. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  201. } else {
  202. return time.Unix(0, 0), nil
  203. }
  204. }