client_handlers.go 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. package main
  2. import (
  3. "github.com/coreos/etcd/store"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. )
  8. //-------------------------------------------------------------------
  9. // Handlers for etcd-store related requests received via the raft client port
  10. //-------------------------------------------------------------------
  11. // Multiplex GET/POST/DELETE request to corresponding handlers
  12. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  13. if req.Method == "GET" {
  14. GetHttpHandler(&w, req)
  15. } else if req.Method == "POST" {
  16. SetHttpHandler(&w, req)
  17. } else if req.Method == "DELETE" {
  18. DeleteHttpHandler(&w, req)
  19. } else {
  20. w.WriteHeader(http.StatusMethodNotAllowed)
  21. return
  22. }
  23. }
  24. //--------------------------------------
  25. // State sensitive handlers
  26. // Set/Delete will dispatch to leader
  27. //--------------------------------------
  28. // Set Command Handler
  29. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  30. key := req.URL.Path[len("/v1/keys/"):]
  31. if store.CheckKeyword(key) {
  32. (*w).WriteHeader(http.StatusBadRequest)
  33. (*w).Write(newJsonError(400, "Set"))
  34. return
  35. }
  36. debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
  37. value := req.FormValue("value")
  38. if len(value) == 0 {
  39. (*w).WriteHeader(http.StatusBadRequest)
  40. (*w).Write(newJsonError(200, "Set"))
  41. return
  42. }
  43. prevValue := req.FormValue("prevValue")
  44. strDuration := req.FormValue("ttl")
  45. expireTime, err := durationToExpireTime(strDuration)
  46. if err != nil {
  47. (*w).WriteHeader(http.StatusBadRequest)
  48. (*w).Write(newJsonError(202, "Set"))
  49. return
  50. }
  51. if len(prevValue) != 0 {
  52. command := &TestAndSetCommand{}
  53. command.Key = key
  54. command.Value = value
  55. command.PrevValue = prevValue
  56. command.ExpireTime = expireTime
  57. dispatch(command, w, req, true)
  58. } else {
  59. command := &SetCommand{}
  60. command.Key = key
  61. command.Value = value
  62. command.ExpireTime = expireTime
  63. dispatch(command, w, req, true)
  64. }
  65. }
  66. // Delete Handler
  67. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  68. key := req.URL.Path[len("/v1/keys/"):]
  69. debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
  70. command := &DeleteCommand{}
  71. command.Key = key
  72. dispatch(command, w, req, true)
  73. }
  74. // Dispatch the command to leader
  75. func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
  76. if raftServer.State() == "leader" {
  77. if body, err := raftServer.Do(c); err != nil {
  78. if _, ok := err.(store.NotFoundError); ok {
  79. http.NotFound((*w), req)
  80. return
  81. }
  82. if _, ok := err.(store.TestFail); ok {
  83. (*w).WriteHeader(http.StatusBadRequest)
  84. (*w).Write(newJsonError(101, err.Error()))
  85. return
  86. }
  87. if _, ok := err.(store.NotFile); ok {
  88. (*w).WriteHeader(http.StatusBadRequest)
  89. (*w).Write(newJsonError(102, err.Error()))
  90. return
  91. }
  92. if err.Error() == errors[103] {
  93. (*w).WriteHeader(http.StatusBadRequest)
  94. (*w).Write(newJsonError(103, ""))
  95. return
  96. }
  97. (*w).WriteHeader(http.StatusInternalServerError)
  98. (*w).Write(newJsonError(300, err.Error()))
  99. return
  100. } else {
  101. if body == nil {
  102. http.NotFound((*w), req)
  103. } else {
  104. body, ok := body.([]byte)
  105. // this should not happen
  106. if !ok {
  107. panic("wrong type")
  108. }
  109. (*w).WriteHeader(http.StatusOK)
  110. (*w).Write(body)
  111. }
  112. return
  113. }
  114. } else {
  115. // current no leader
  116. if raftServer.Leader() == "" {
  117. (*w).WriteHeader(http.StatusInternalServerError)
  118. (*w).Write(newJsonError(300, ""))
  119. return
  120. }
  121. // tell the client where is the leader
  122. path := req.URL.Path
  123. var scheme string
  124. if scheme = req.URL.Scheme; scheme == "" {
  125. scheme = "http://"
  126. }
  127. var url string
  128. if client {
  129. clientAddr, _ := getClientAddr(raftServer.Leader())
  130. url = scheme + clientAddr + path
  131. } else {
  132. url = scheme + raftServer.Leader() + path
  133. }
  134. debug("Redirect to %s", url)
  135. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  136. return
  137. }
  138. (*w).WriteHeader(http.StatusInternalServerError)
  139. (*w).Write(newJsonError(300, ""))
  140. return
  141. }
  142. //--------------------------------------
  143. // State non-sensitive handlers
  144. // will not dispatch to leader
  145. // TODO: add state-sensitive versions of these
  146. // commands?
  147. //--------------------------------------
  148. // Handler to return the current leader name
  149. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  150. leader := raftServer.Leader()
  151. if leader != "" {
  152. w.WriteHeader(http.StatusOK)
  153. w.Write([]byte(raftServer.Leader()))
  154. } else {
  155. // not likely, but it may happen
  156. w.WriteHeader(http.StatusInternalServerError)
  157. w.Write(newJsonError(301, ""))
  158. }
  159. }
  160. // Handler to return all the known machines in the current cluster
  161. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  162. peers := raftServer.Peers()
  163. // Add itself to the machine list first
  164. // Since peer map does not contain the server itself
  165. machines, _ := getClientAddr(raftServer.Name())
  166. // Add all peers to the list and separate by comma
  167. // We do not use json here since we accept machines list
  168. // in the command line separate by comma.
  169. for peerName, _ := range peers {
  170. if addr, ok := getClientAddr(peerName); ok {
  171. machines = machines + "," + addr
  172. }
  173. }
  174. w.WriteHeader(http.StatusOK)
  175. w.Write([]byte(machines))
  176. }
  177. // Get Handler
  178. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  179. key := req.URL.Path[len("/v1/keys/"):]
  180. debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  181. command := &GetCommand{}
  182. command.Key = key
  183. if body, err := command.Apply(raftServer); err != nil {
  184. if _, ok := err.(store.NotFoundError); ok {
  185. http.NotFound((*w), req)
  186. return
  187. }
  188. (*w).WriteHeader(http.StatusInternalServerError)
  189. (*w).Write(newJsonError(300, ""))
  190. } else {
  191. body, ok := body.([]byte)
  192. if !ok {
  193. panic("wrong type")
  194. }
  195. (*w).WriteHeader(http.StatusOK)
  196. (*w).Write(body)
  197. }
  198. }
  199. // Watch handler
  200. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  201. key := req.URL.Path[len("/v1/watch/"):]
  202. command := &WatchCommand{}
  203. command.Key = key
  204. if req.Method == "GET" {
  205. debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  206. command.SinceIndex = 0
  207. } else if req.Method == "POST" {
  208. // watch from a specific index
  209. debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  210. content := req.FormValue("index")
  211. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  212. if err != nil {
  213. w.WriteHeader(http.StatusBadRequest)
  214. w.Write(newJsonError(203, "Watch From Index"))
  215. }
  216. command.SinceIndex = sinceIndex
  217. } else {
  218. w.WriteHeader(http.StatusMethodNotAllowed)
  219. return
  220. }
  221. if body, err := command.Apply(raftServer); err != nil {
  222. warn("Unable to do watch command: %v", err)
  223. w.WriteHeader(http.StatusInternalServerError)
  224. } else {
  225. w.WriteHeader(http.StatusOK)
  226. body, ok := body.([]byte)
  227. if !ok {
  228. panic("wrong type")
  229. }
  230. w.Write(body)
  231. }
  232. }
  233. // Convert string duration to time format
  234. func durationToExpireTime(strDuration string) (time.Time, error) {
  235. if strDuration != "" {
  236. duration, err := strconv.Atoi(strDuration)
  237. if err != nil {
  238. return time.Unix(0, 0), err
  239. }
  240. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  241. } else {
  242. return time.Unix(0, 0), nil
  243. }
  244. }