client_handlers.go 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. package main
  2. import (
  3. "fmt"
  4. "github.com/coreos/etcd/store"
  5. "net/http"
  6. "strconv"
  7. "time"
  8. )
  9. //-------------------------------------------------------------------
// Handlers for etcd-store related requests arriving via the raft client port
  11. //-------------------------------------------------------------------
  12. // Multiplex GET/POST/DELETE request to corresponding handlers
  13. func Multiplexer(w http.ResponseWriter, req *http.Request) {
  14. switch req.Method {
  15. case "GET":
  16. GetHttpHandler(&w, req)
  17. case "POST":
  18. SetHttpHandler(&w, req)
  19. case "DELETE":
  20. DeleteHttpHandler(&w, req)
  21. default:
  22. w.WriteHeader(http.StatusMethodNotAllowed)
  23. return
  24. }
  25. }
  26. //--------------------------------------
  27. // State sensitive handlers
  28. // Set/Delete will dispatch to leader
  29. //--------------------------------------
  30. // Set Command Handler
  31. func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  32. key := req.URL.Path[len("/v1/keys/"):]
  33. if store.CheckKeyword(key) {
  34. (*w).WriteHeader(http.StatusBadRequest)
  35. (*w).Write(newJsonError(400, "Set"))
  36. return
  37. }
  38. debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key)
  39. value := req.FormValue("value")
  40. if len(value) == 0 {
  41. (*w).WriteHeader(http.StatusBadRequest)
  42. (*w).Write(newJsonError(200, "Set"))
  43. return
  44. }
  45. prevValue := req.FormValue("prevValue")
  46. strDuration := req.FormValue("ttl")
  47. expireTime, err := durationToExpireTime(strDuration)
  48. if err != nil {
  49. (*w).WriteHeader(http.StatusBadRequest)
  50. (*w).Write(newJsonError(202, "Set"))
  51. return
  52. }
  53. if len(prevValue) != 0 {
  54. command := &TestAndSetCommand{
  55. Key: key,
  56. Value: value,
  57. PrevValue: prevValue,
  58. ExpireTime: expireTime,
  59. }
  60. dispatch(command, w, req, true)
  61. } else {
  62. command := &SetCommand{
  63. Key: key,
  64. Value: value,
  65. ExpireTime: expireTime,
  66. }
  67. dispatch(command, w, req, true)
  68. }
  69. }
  70. // Delete Handler
  71. func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
  72. key := req.URL.Path[len("/v1/keys/"):]
  73. debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key)
  74. command := &DeleteCommand{
  75. Key: key,
  76. }
  77. dispatch(command, w, req, true)
  78. }
  79. // Dispatch the command to leader
  80. func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
  81. if raftServer.State() == "leader" {
  82. if body, err := raftServer.Do(c); err != nil {
  83. if _, ok := err.(store.NotFoundError); ok {
  84. (*w).WriteHeader(http.StatusNotFound)
  85. (*w).Write(newJsonError(100, err.Error()))
  86. return
  87. }
  88. if _, ok := err.(store.TestFail); ok {
  89. (*w).WriteHeader(http.StatusBadRequest)
  90. (*w).Write(newJsonError(101, err.Error()))
  91. return
  92. }
  93. if _, ok := err.(store.NotFile); ok {
  94. (*w).WriteHeader(http.StatusBadRequest)
  95. (*w).Write(newJsonError(102, err.Error()))
  96. return
  97. }
  98. if err.Error() == errors[103] {
  99. (*w).WriteHeader(http.StatusBadRequest)
  100. (*w).Write(newJsonError(103, ""))
  101. return
  102. }
  103. (*w).WriteHeader(http.StatusInternalServerError)
  104. (*w).Write(newJsonError(300, err.Error()))
  105. return
  106. } else {
  107. if body == nil {
  108. (*w).WriteHeader(http.StatusNotFound)
  109. (*w).Write(newJsonError(300, "Empty result from raft"))
  110. } else {
  111. body, ok := body.([]byte)
  112. // this should not happen
  113. if !ok {
  114. panic("wrong type")
  115. }
  116. (*w).WriteHeader(http.StatusOK)
  117. (*w).Write(body)
  118. }
  119. return
  120. }
  121. } else {
  122. // current no leader
  123. if raftServer.Leader() == "" {
  124. (*w).WriteHeader(http.StatusInternalServerError)
  125. (*w).Write(newJsonError(300, ""))
  126. return
  127. }
  128. // tell the client where is the leader
  129. path := req.URL.Path
  130. var scheme string
  131. if scheme = req.URL.Scheme; scheme == "" {
  132. scheme = "http://"
  133. }
  134. var url string
  135. if client {
  136. clientAddr, _ := getClientAddr(raftServer.Leader())
  137. url = clientAddr + path
  138. } else {
  139. url = raftServer.Leader() + path
  140. }
  141. debugf("Redirect to %s", url)
  142. http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
  143. return
  144. }
  145. (*w).WriteHeader(http.StatusInternalServerError)
  146. (*w).Write(newJsonError(300, ""))
  147. return
  148. }
  149. //--------------------------------------
  150. // State non-sensitive handlers
  151. // will not dispatch to leader
// TODO: add state-sensitive versions of these
// commands?
  154. //--------------------------------------
  155. // Handler to return the current leader's raft address
  156. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
  157. leader := raftServer.Leader()
  158. if leader != "" {
  159. w.WriteHeader(http.StatusOK)
  160. raftURL, _ := nameToRaftURL(leader)
  161. w.Write([]byte(raftURL))
  162. } else {
  163. // not likely, but it may happen
  164. w.WriteHeader(http.StatusInternalServerError)
  165. w.Write(newJsonError(301, ""))
  166. }
  167. }
  168. // Handler to return all the known machines in the current cluster
  169. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
  170. peers := raftServer.Peers()
  171. // Add itself to the machine list first
  172. // Since peer map does not contain the server itself
  173. machines, _ := getClientAddr(raftServer.Name())
  174. // Add all peers to the list and separate by comma
  175. // We do not use json here since we accept machines list
  176. // in the command line separate by comma.
  177. for peerName, _ := range peers {
  178. if addr, ok := getClientAddr(peerName); ok {
  179. machines = machines + "," + addr
  180. }
  181. }
  182. w.WriteHeader(http.StatusOK)
  183. w.Write([]byte(machines))
  184. }
  185. // Handler to return the current version of etcd
  186. func VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
  187. w.WriteHeader(http.StatusOK)
  188. w.Write([]byte(fmt.Sprintf("etcd %s", releaseVersion)))
  189. }
  190. // Handler to return the basic stats of etcd
  191. func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
  192. w.WriteHeader(http.StatusOK)
  193. w.Write(etcdStore.Stats())
  194. }
  195. // Get Handler
  196. func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
  197. key := req.URL.Path[len("/v1/keys/"):]
  198. debugf("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
  199. command := &GetCommand{
  200. Key: key,
  201. }
  202. if body, err := command.Apply(raftServer); err != nil {
  203. if _, ok := err.(store.NotFoundError); ok {
  204. (*w).WriteHeader(http.StatusNotFound)
  205. (*w).Write(newJsonError(100, err.Error()))
  206. return
  207. }
  208. (*w).WriteHeader(http.StatusInternalServerError)
  209. (*w).Write(newJsonError(300, ""))
  210. } else {
  211. body, ok := body.([]byte)
  212. if !ok {
  213. panic("wrong type")
  214. }
  215. (*w).WriteHeader(http.StatusOK)
  216. (*w).Write(body)
  217. }
  218. }
  219. // Watch handler
  220. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
  221. key := req.URL.Path[len("/v1/watch/"):]
  222. command := &WatchCommand{
  223. Key: key,
  224. }
  225. if req.Method == "GET" {
  226. debugf("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
  227. command.SinceIndex = 0
  228. } else if req.Method == "POST" {
  229. // watch from a specific index
  230. debugf("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
  231. content := req.FormValue("index")
  232. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  233. if err != nil {
  234. w.WriteHeader(http.StatusBadRequest)
  235. w.Write(newJsonError(203, "Watch From Index"))
  236. }
  237. command.SinceIndex = sinceIndex
  238. } else {
  239. w.WriteHeader(http.StatusMethodNotAllowed)
  240. return
  241. }
  242. if body, err := command.Apply(raftServer); err != nil {
  243. w.WriteHeader(http.StatusInternalServerError)
  244. w.Write(newJsonError(500, key))
  245. } else {
  246. w.WriteHeader(http.StatusOK)
  247. body, ok := body.([]byte)
  248. if !ok {
  249. panic("wrong type")
  250. }
  251. w.Write(body)
  252. }
  253. }
  254. // TestHandler
  255. func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
  256. testType := req.URL.Path[len("/test/"):]
  257. if testType == "speed" {
  258. directSet()
  259. w.WriteHeader(http.StatusOK)
  260. w.Write([]byte("speed test success"))
  261. return
  262. }
  263. w.WriteHeader(http.StatusBadRequest)
  264. }
  265. // Convert string duration to time format
  266. func durationToExpireTime(strDuration string) (time.Time, error) {
  267. if strDuration != "" {
  268. duration, err := strconv.Atoi(strDuration)
  269. if err != nil {
  270. return time.Unix(0, 0), err
  271. }
  272. return time.Now().Add(time.Second * (time.Duration)(duration)), nil
  273. } else {
  274. return time.Unix(0, 0), nil
  275. }
  276. }