client_handlers.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. package main
  2. import (
  3. "github.com/coreos/etcd/store"
  4. "net/http"
  5. "strconv"
  6. "time"
  7. )
  8. //-------------------------------------------------------------------
  9. // Handlers to handle etcd-store related request via raft client port
  10. //-------------------------------------------------------------------
  11. // Multiplex GET/POST/DELETE request to corresponding handlers
  12. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  13. switch req.Method {
  14. case "GET":
  15. GetHttpHandler(&w, req)
  16. case "POST":
  17. SetHttpHandler(&w, req)
  18. case "DELETE":
  19. DeleteHttpHandler(&w, req)
  20. default:
  21. w.WriteHeader(http.StatusMethodNotAllowed)
  22. return
  23. }
  24. }
  25. //--------------------------------------
  26. // State sensitive handlers
  27. // Set/Delete will dispatch to leader
  28. //--------------------------------------
  29. // Set Command Handler
  30. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  31. key := req.URL.Path[len("/v1/keys/"):]
  32. if store.CheckKeyword(key) {
  33. (*w).WriteHeader(http.StatusBadRequest)
  34. (*w).Write(newJsonError(400, "Set"))
  35. return
  36. }
  37. debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
  38. value := req.FormValue("value")
  39. if len(value) == 0 {
  40. (*w).WriteHeader(http.StatusBadRequest)
  41. (*w).Write(newJsonError(200, "Set"))
  42. return
  43. }
  44. prevValue := req.FormValue("prevValue")
  45. strDuration := req.FormValue("ttl")
  46. expireTime, err := durationToExpireTime(strDuration)
  47. if err != nil {
  48. (*w).WriteHeader(http.StatusBadRequest)
  49. (*w).Write(newJsonError(202, "Set"))
  50. return
  51. }
  52. if len(prevValue) != 0 {
  53. command := &TestAndSetCommand{
  54. Key: key,
  55. Value: value,
  56. PrevValue: prevValue,
  57. ExpireTime: expireTime,
  58. }
  59. dispatch(command, w, req, true)
  60. } else {
  61. command := &SetCommand{
  62. Key: key,
  63. Value: value,
  64. ExpireTime: expireTime,
  65. }
  66. dispatch(command, w, req, true)
  67. }
  68. }
  69. // Delete Handler
  70. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  71. key := req.URL.Path[len("/v1/keys/"):]
  72. debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
  73. command := &DeleteCommand{
  74. Key: key,
  75. }
  76. dispatch(command, w, req, true)
  77. }
  78. // Dispatch the command to leader
  79. func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
  80. if raftServer.State() == "leader" {
  81. if body, err := raftServer.Do(c); err != nil {
  82. if _, ok := err.(store.NotFoundError); ok {
  83. (*w).WriteHeader(http.StatusNotFound)
  84. (*w).Write(newJsonError(100, err.Error()))
  85. return
  86. }
  87. if _, ok := err.(store.TestFail); ok {
  88. (*w).WriteHeader(http.StatusBadRequest)
  89. (*w).Write(newJsonError(101, err.Error()))
  90. return
  91. }
  92. if _, ok := err.(store.NotFile); ok {
  93. (*w).WriteHeader(http.StatusBadRequest)
  94. (*w).Write(newJsonError(102, err.Error()))
  95. return
  96. }
  97. if err.Error() == errors[103] {
  98. (*w).WriteHeader(http.StatusBadRequest)
  99. (*w).Write(newJsonError(103, ""))
  100. return
  101. }
  102. (*w).WriteHeader(http.StatusInternalServerError)
  103. (*w).Write(newJsonError(300, err.Error()))
  104. return
  105. } else {
  106. if body == nil {
  107. (*w).WriteHeader(http.StatusNotFound)
  108. (*w).Write(newJsonError(300, "Empty result from raft"))
  109. } else {
  110. body, ok := body.([]byte)
  111. // this should not happen
  112. if !ok {
  113. panic("wrong type")
  114. }
  115. (*w).WriteHeader(http.StatusOK)
  116. (*w).Write(body)
  117. }
  118. return
  119. }
  120. } else {
  121. // current no leader
  122. if raftServer.Leader() == "" {
  123. (*w).WriteHeader(http.StatusInternalServerError)
  124. (*w).Write(newJsonError(300, ""))
  125. return
  126. }
  127. // tell the client where is the leader
  128. path := req.URL.Path
  129. var scheme string
  130. if scheme = req.URL.Scheme; scheme == "" {
  131. scheme = "http://"
  132. }
  133. var url string
  134. if client {
  135. clientAddr, _ := getClientAddr(raftServer.Leader())
  136. url = scheme + clientAddr + path
  137. } else {
  138. url = scheme + raftServer.Leader() + path
  139. }
  140. debugf("Redirect to %s", url)
  141. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  142. return
  143. }
  144. (*w).WriteHeader(http.StatusInternalServerError)
  145. (*w).Write(newJsonError(300, ""))
  146. return
  147. }
  148. //--------------------------------------
  149. // State non-sensitive handlers
  150. // will not dispatch to leader
  151. // TODO: add sensitive version for these
  152. // command?
  153. //--------------------------------------
  154. // Handler to return the current leader name
  155. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  156. leader := raftServer.Leader()
  157. if leader != "" {
  158. w.WriteHeader(http.StatusOK)
  159. w.Write([]byte(raftServer.Leader()))
  160. } else {
  161. // not likely, but it may happen
  162. w.WriteHeader(http.StatusInternalServerError)
  163. w.Write(newJsonError(301, ""))
  164. }
  165. }
  166. // Handler to return all the known machines in the current cluster
  167. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  168. peers := raftServer.Peers()
  169. // Add itself to the machine list first
  170. // Since peer map does not contain the server itself
  171. machines, _ := getClientAddr(raftServer.Name())
  172. // Add all peers to the list and separate by comma
  173. // We do not use json here since we accept machines list
  174. // in the command line separate by comma.
  175. for peerName, _ := range peers {
  176. if addr, ok := getClientAddr(peerName); ok {
  177. machines = machines + "," + addr
  178. }
  179. }
  180. w.WriteHeader(http.StatusOK)
  181. w.Write([]byte(machines))
  182. }
  183. // Handler to return the current version of etcd
  184. func VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  185. w.WriteHeader(http.StatusOK)
  186. w.Write([]byte(releaseVersion))
  187. }
  188. // Handler to return the basic stats of etcd
  189. func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
  190. w.WriteHeader(http.StatusOK)
  191. w.Write(etcdStore.Stats())
  192. }
  193. // Get Handler
  194. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  195. key := req.URL.Path[len("/v1/keys/"):]
  196. debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  197. command := &GetCommand{
  198. Key: key,
  199. }
  200. if body, err := command.Apply(raftServer); err != nil {
  201. if _, ok := err.(store.NotFoundError); ok {
  202. (*w).WriteHeader(http.StatusNotFound)
  203. (*w).Write(newJsonError(100, err.Error()))
  204. return
  205. }
  206. (*w).WriteHeader(http.StatusInternalServerError)
  207. (*w).Write(newJsonError(300, ""))
  208. } else {
  209. body, ok := body.([]byte)
  210. if !ok {
  211. panic("wrong type")
  212. }
  213. (*w).WriteHeader(http.StatusOK)
  214. (*w).Write(body)
  215. }
  216. }
  217. // Watch handler
  218. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  219. key := req.URL.Path[len("/v1/watch/"):]
  220. command := &WatchCommand{
  221. Key: key,
  222. }
  223. if req.Method == "GET" {
  224. debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  225. command.SinceIndex = 0
  226. } else if req.Method == "POST" {
  227. // watch from a specific index
  228. debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  229. content := req.FormValue("index")
  230. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  231. if err != nil {
  232. w.WriteHeader(http.StatusBadRequest)
  233. w.Write(newJsonError(203, "Watch From Index"))
  234. }
  235. command.SinceIndex = sinceIndex
  236. } else {
  237. w.WriteHeader(http.StatusMethodNotAllowed)
  238. return
  239. }
  240. if body, err := command.Apply(raftServer); err != nil {
  241. w.WriteHeader(http.StatusInternalServerError)
  242. w.Write(newJsonError(500, key))
  243. } else {
  244. w.WriteHeader(http.StatusOK)
  245. body, ok := body.([]byte)
  246. if !ok {
  247. panic("wrong type")
  248. }
  249. w.Write(body)
  250. }
  251. }
  252. // Convert string duration to time format
  253. func durationToExpireTime(strDuration string) (time.Time, error) {
  254. if strDuration != "" {
  255. duration, err := strconv.Atoi(strDuration)
  256. if err != nil {
  257. return time.Unix(0, 0), err
  258. }
  259. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  260. } else {
  261. return time.Unix(0, 0), nil
  262. }
  263. }