client_handlers.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. package main
  2. import (
  3. "github.com/coreos/etcd/store"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. )
  8. //-------------------------------------------------------------------
  9. // Handlers to handle etcd-store related request via raft client port
  10. //-------------------------------------------------------------------
  11. // Multiplex GET/POST/DELETE request to corresponding handlers
  12. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  13. if req.Method == "GET" {
  14. GetHttpHandler(&w, req)
  15. } else if req.Method == "POST" {
  16. SetHttpHandler(&w, req)
  17. } else if req.Method == "DELETE" {
  18. DeleteHttpHandler(&w, req)
  19. } else {
  20. w.WriteHeader(http.StatusMethodNotAllowed)
  21. return
  22. }
  23. }
  24. //--------------------------------------
  25. // State sensitive handlers
  26. // Set/Delete will dispatch to leader
  27. //--------------------------------------
  28. // Set Command Handler
  29. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  30. key := req.URL.Path[len("/v1/keys/"):]
  31. debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
  32. value := req.FormValue("value")
  33. if len(value) == 0 {
  34. (*w).WriteHeader(http.StatusBadRequest)
  35. (*w).Write(newJsonError(200, "Set"))
  36. return
  37. }
  38. prevValue := req.FormValue("prevValue")
  39. strDuration := req.FormValue("ttl")
  40. expireTime, err := durationToExpireTime(strDuration)
  41. if err != nil {
  42. (*w).WriteHeader(http.StatusBadRequest)
  43. (*w).Write(newJsonError(202, "Set"))
  44. }
  45. if len(prevValue) != 0 {
  46. command := &TestAndSetCommand{}
  47. command.Key = key
  48. command.Value = value
  49. command.PrevValue = prevValue
  50. command.ExpireTime = expireTime
  51. dispatch(command, w, req, true)
  52. } else {
  53. command := &SetCommand{}
  54. command.Key = key
  55. command.Value = value
  56. command.ExpireTime = expireTime
  57. dispatch(command, w, req, true)
  58. }
  59. }
  60. // Delete Handler
  61. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  62. key := req.URL.Path[len("/v1/keys/"):]
  63. debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
  64. command := &DeleteCommand{}
  65. command.Key = key
  66. dispatch(command, w, req, true)
  67. }
  68. // Dispatch the command to leader
  69. func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
  70. if raftServer.State() == "leader" {
  71. if body, err := raftServer.Do(c); err != nil {
  72. if _, ok := err.(store.NotFoundError); ok {
  73. http.NotFound((*w), req)
  74. return
  75. }
  76. if _, ok := err.(store.TestFail); ok {
  77. (*w).WriteHeader(http.StatusBadRequest)
  78. (*w).Write(newJsonError(101, err.Error()))
  79. return
  80. }
  81. if _, ok := err.(store.NotFile); ok {
  82. (*w).WriteHeader(http.StatusBadRequest)
  83. (*w).Write(newJsonError(102, err.Error()))
  84. return
  85. }
  86. (*w).WriteHeader(http.StatusInternalServerError)
  87. (*w).Write(newJsonError(300, err.Error()))
  88. return
  89. } else {
  90. if body == nil {
  91. http.NotFound((*w), req)
  92. } else {
  93. body, ok := body.([]byte)
  94. // this should not happen
  95. if !ok {
  96. panic("wrong type")
  97. }
  98. (*w).WriteHeader(http.StatusOK)
  99. (*w).Write(body)
  100. }
  101. return
  102. }
  103. } else {
  104. // current no leader
  105. if raftServer.Leader() == "" {
  106. (*w).WriteHeader(http.StatusInternalServerError)
  107. (*w).Write(newJsonError(300, ""))
  108. return
  109. }
  110. // tell the client where is the leader
  111. path := req.URL.Path
  112. var scheme string
  113. if scheme = req.URL.Scheme; scheme == "" {
  114. scheme = "http://"
  115. }
  116. var url string
  117. if client {
  118. clientAddr, _ := getClientAddr(raftServer.Leader())
  119. url = scheme + clientAddr + path
  120. } else {
  121. url = scheme + raftServer.Leader() + path
  122. }
  123. debug("Redirect to %s", url)
  124. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  125. return
  126. }
  127. (*w).WriteHeader(http.StatusInternalServerError)
  128. (*w).Write(newJsonError(300, ""))
  129. return
  130. }
  131. //--------------------------------------
  132. // State non-sensitive handlers
  133. // will not dispatch to leader
  134. // TODO: add sensitive version for these
  135. // command?
  136. //--------------------------------------
  137. // Handler to return the current leader name
  138. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  139. leader := raftServer.Leader()
  140. if leader != "" {
  141. w.WriteHeader(http.StatusOK)
  142. w.Write([]byte(raftServer.Leader()))
  143. } else {
  144. // not likely, but it may happen
  145. w.WriteHeader(http.StatusInternalServerError)
  146. w.Write(newJsonError(301, ""))
  147. }
  148. }
  149. // Handler to return all the known machines in the current cluster
  150. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  151. peers := raftServer.Peers()
  152. // Add itself to the machine list first
  153. // Since peer map does not contain the server itself
  154. machines, _ := getClientAddr(raftServer.Name())
  155. // Add all peers to the list and separate by comma
  156. // We do not use json here since we accept machines list
  157. // in the command line separate by comma.
  158. for peerName, _ := range peers {
  159. if addr, ok := getClientAddr(peerName); ok {
  160. machines = machines + "," + addr
  161. }
  162. }
  163. w.WriteHeader(http.StatusOK)
  164. w.Write([]byte(machines))
  165. }
  166. // Get Handler
  167. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  168. key := req.URL.Path[len("/v1/keys/"):]
  169. debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  170. command := &GetCommand{}
  171. command.Key = key
  172. if body, err := command.Apply(raftServer); err != nil {
  173. if _, ok := err.(store.NotFoundError); ok {
  174. http.NotFound((*w), req)
  175. return
  176. }
  177. (*w).WriteHeader(http.StatusInternalServerError)
  178. (*w).Write(newJsonError(300, ""))
  179. return
  180. } else {
  181. body, ok := body.([]byte)
  182. if !ok {
  183. panic("wrong type")
  184. }
  185. (*w).WriteHeader(http.StatusOK)
  186. (*w).Write(body)
  187. return
  188. }
  189. }
  190. // Watch handler
  191. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  192. key := req.URL.Path[len("/v1/watch/"):]
  193. command := &WatchCommand{}
  194. command.Key = key
  195. if req.Method == "GET" {
  196. debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  197. command.SinceIndex = 0
  198. } else if req.Method == "POST" {
  199. // watch from a specific index
  200. debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  201. content := req.FormValue("index")
  202. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  203. if err != nil {
  204. w.WriteHeader(http.StatusBadRequest)
  205. w.Write(newJsonError(203, "Watch From Index"))
  206. }
  207. command.SinceIndex = sinceIndex
  208. } else {
  209. w.WriteHeader(http.StatusMethodNotAllowed)
  210. return
  211. }
  212. if body, err := command.Apply(raftServer); err != nil {
  213. warn("Unable to do watch command: %v", err)
  214. w.WriteHeader(http.StatusInternalServerError)
  215. return
  216. } else {
  217. w.WriteHeader(http.StatusOK)
  218. body, ok := body.([]byte)
  219. if !ok {
  220. panic("wrong type")
  221. }
  222. w.Write(body)
  223. return
  224. }
  225. }
  226. // Convert string duration to time format
  227. func durationToExpireTime(strDuration string) (time.Time, error) {
  228. if strDuration != "" {
  229. duration, err := strconv.Atoi(strDuration)
  230. if err != nil {
  231. return time.Unix(0, 0), err
  232. }
  233. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  234. } else {
  235. return time.Unix(0, 0), nil
  236. }
  237. }