client_handlers.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. package main
  2. import (
  3. "github.com/coreos/etcd/store"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. )
//-------------------------------------------------------------------
// Handlers for etcd-store requests received via the raft client port
//-------------------------------------------------------------------
  11. // Multiplex GET/POST/DELETE request to corresponding handlers
  12. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  13. if req.Method == "GET" {
  14. GetHttpHandler(&w, req)
  15. } else if req.Method == "POST" {
  16. SetHttpHandler(&w, req)
  17. } else if req.Method == "DELETE" {
  18. DeleteHttpHandler(&w, req)
  19. } else {
  20. w.WriteHeader(http.StatusMethodNotAllowed)
  21. return
  22. }
  23. }
//--------------------------------------
// State-sensitive handlers
// Set and Delete are dispatched to the leader
//--------------------------------------
  28. // Set Command Handler
  29. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  30. key := req.URL.Path[len("/v1/keys/"):]
  31. debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
  32. value := req.FormValue("value")
  33. if len(value) == 0 {
  34. (*w).WriteHeader(http.StatusBadRequest)
  35. (*w).Write(newJsonError(200, "Set"))
  36. return
  37. }
  38. prevValue := req.FormValue("prevValue")
  39. strDuration := req.FormValue("ttl")
  40. expireTime, err := durationToExpireTime(strDuration)
  41. if err != nil {
  42. (*w).WriteHeader(http.StatusBadRequest)
  43. (*w).Write(newJsonError(202, "Set"))
  44. }
  45. if len(prevValue) != 0 {
  46. command := &TestAndSetCommand{}
  47. command.Key = key
  48. command.Value = value
  49. command.PrevValue = prevValue
  50. command.ExpireTime = expireTime
  51. dispatch(command, w, req, true)
  52. } else {
  53. command := &SetCommand{}
  54. command.Key = key
  55. command.Value = value
  56. command.ExpireTime = expireTime
  57. dispatch(command, w, req, true)
  58. }
  59. }
  60. // Delete Handler
  61. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  62. key := req.URL.Path[len("/v1/keys/"):]
  63. debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
  64. command := &DeleteCommand{}
  65. command.Key = key
  66. dispatch(command, w, req, true)
  67. }
  68. // Dispatch the command to leader
  69. func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
  70. if raftServer.State() == "leader" {
  71. if body, err := raftServer.Do(c); err != nil {
  72. if _, ok := err.(store.NotFoundError); ok {
  73. http.NotFound((*w), req)
  74. return
  75. }
  76. if _, ok := err.(store.TestFail); ok {
  77. (*w).WriteHeader(http.StatusBadRequest)
  78. (*w).Write(newJsonError(101, err.Error()))
  79. return
  80. }
  81. if _, ok := err.(store.NotFile); ok {
  82. (*w).WriteHeader(http.StatusBadRequest)
  83. (*w).Write(newJsonError(102, err.Error()))
  84. return
  85. }
  86. (*w).WriteHeader(http.StatusInternalServerError)
  87. (*w).Write(newJsonError(300, "No Leader"))
  88. return
  89. } else {
  90. if body == nil {
  91. http.NotFound((*w), req)
  92. } else {
  93. body, ok := body.([]byte)
  94. if !ok {
  95. panic("wrong type")
  96. }
  97. (*w).WriteHeader(http.StatusOK)
  98. (*w).Write(body)
  99. }
  100. return
  101. }
  102. } else {
  103. // current no leader
  104. if raftServer.Leader() == "" {
  105. (*w).WriteHeader(http.StatusInternalServerError)
  106. (*w).Write(newJsonError(300, ""))
  107. return
  108. }
  109. // tell the client where is the leader
  110. path := req.URL.Path
  111. var scheme string
  112. if scheme = req.URL.Scheme; scheme == "" {
  113. scheme = "http://"
  114. }
  115. var url string
  116. if client {
  117. clientAddr, _ := getClientAddr(raftServer.Leader())
  118. url = scheme + clientAddr + path
  119. } else {
  120. url = scheme + raftServer.Leader() + path
  121. }
  122. debug("Redirect to %s", url)
  123. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  124. return
  125. }
  126. (*w).WriteHeader(http.StatusInternalServerError)
  127. (*w).Write(newJsonError(300, ""))
  128. return
  129. }
//--------------------------------------
// State-insensitive handlers
// These are not dispatched to the leader
// TODO: add state-sensitive versions of
// these commands?
//--------------------------------------
  136. // Handler to return the current leader name
  137. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  138. leader := raftServer.Leader()
  139. if leader != "" {
  140. w.WriteHeader(http.StatusOK)
  141. w.Write([]byte(raftServer.Leader()))
  142. } else {
  143. // not likely, but it may happen
  144. w.WriteHeader(http.StatusInternalServerError)
  145. w.Write(newJsonError(301, ""))
  146. }
  147. }
  148. // Handler to return all the known machines in the current cluster
  149. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  150. peers := raftServer.Peers()
  151. // Add itself to the machine list first
  152. // Since peer map does not contain the server itself
  153. machines, _ := getClientAddr(raftServer.Name())
  154. // Add all peers to the list and separate by comma
  155. // We do not use json here since we accept machines list
  156. // in the command line separate by comma.
  157. for peerName, _ := range peers {
  158. if addr, ok := getClientAddr(peerName); ok {
  159. machines = machines + "," + addr
  160. }
  161. }
  162. w.WriteHeader(http.StatusOK)
  163. w.Write([]byte(machines))
  164. }
  165. // Get Handler
  166. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  167. key := req.URL.Path[len("/v1/keys/"):]
  168. debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  169. command := &GetCommand{}
  170. command.Key = key
  171. if body, err := command.Apply(raftServer); err != nil {
  172. if _, ok := err.(store.NotFoundError); ok {
  173. http.NotFound((*w), req)
  174. return
  175. }
  176. (*w).WriteHeader(http.StatusInternalServerError)
  177. (*w).Write(newJsonError(300, ""))
  178. return
  179. } else {
  180. body, ok := body.([]byte)
  181. if !ok {
  182. panic("wrong type")
  183. }
  184. (*w).WriteHeader(http.StatusOK)
  185. (*w).Write(body)
  186. return
  187. }
  188. }
  189. // Watch handler
  190. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  191. key := req.URL.Path[len("/v1/watch/"):]
  192. command := &WatchCommand{}
  193. command.Key = key
  194. if req.Method == "GET" {
  195. debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  196. command.SinceIndex = 0
  197. } else if req.Method == "POST" {
  198. // watch from a specific index
  199. debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  200. content := req.FormValue("index")
  201. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  202. if err != nil {
  203. w.WriteHeader(http.StatusBadRequest)
  204. w.Write(newJsonError(203, "Watch From Index"))
  205. }
  206. command.SinceIndex = sinceIndex
  207. } else {
  208. w.WriteHeader(http.StatusMethodNotAllowed)
  209. return
  210. }
  211. if body, err := command.Apply(raftServer); err != nil {
  212. warn("Unable to do watch command: %v", err)
  213. w.WriteHeader(http.StatusInternalServerError)
  214. return
  215. } else {
  216. w.WriteHeader(http.StatusOK)
  217. body, ok := body.([]byte)
  218. if !ok {
  219. panic("wrong type")
  220. }
  221. w.Write(body)
  222. return
  223. }
  224. }
  225. // Convert string duration to time format
  226. func durationToExpireTime(strDuration string) (time.Time, error) {
  227. if strDuration != "" {
  228. duration, err := strconv.Atoi(strDuration)
  229. if err != nil {
  230. return time.Unix(0, 0), err
  231. }
  232. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  233. } else {
  234. return time.Unix(0, 0), nil
  235. }
  236. }