etcd_handlers.go 8.0 KB


  1. /*
  2. Copyright 2013 CoreOS Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package main
  14. import (
  15. "fmt"
  16. "net/http"
  17. "strconv"
  18. "strings"
  19. etcdErr "github.com/coreos/etcd/error"
  20. "github.com/coreos/etcd/store"
  21. "github.com/coreos/etcd/mod"
  22. "github.com/coreos/go-raft"
  23. )
  24. //-------------------------------------------------------------------
  25. // Handlers to handle etcd-store related request via etcd url
  26. //-------------------------------------------------------------------
  27. func NewEtcdMuxer() *http.ServeMux {
  28. // external commands
  29. etcdMux := http.NewServeMux()
  30. etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
  31. etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
  32. etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
  33. etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
  34. etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
  35. etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
  36. etcdMux.HandleFunc("/test/", TestHttpHandler)
  37. // TODO: Use a mux in 0.2 that can handle this
  38. etcdMux.Handle("/etcd/mod/dashboard/", *mod.ServeMux)
  39. return etcdMux
  40. }
  41. type errorHandler func(http.ResponseWriter, *http.Request) error
  42. // addCorsHeader parses the request Origin header and loops through the user
  43. // provided allowed origins and sets the Access-Control-Allow-Origin header if
  44. // there is a match.
  45. func addCorsHeader(w http.ResponseWriter, r *http.Request) {
  46. val, ok := corsList["*"]
  47. if val && ok {
  48. w.Header().Add("Access-Control-Allow-Origin", "*")
  49. return
  50. }
  51. requestOrigin := r.Header.Get("Origin")
  52. val, ok = corsList[requestOrigin]
  53. if val && ok {
  54. w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
  55. return
  56. }
  57. }
  58. func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  59. addCorsHeader(w, r)
  60. if e := fn(w, r); e != nil {
  61. if etcdErr, ok := e.(etcdErr.Error); ok {
  62. debug("Return error: ", etcdErr.Error())
  63. etcdErr.Write(w)
  64. } else {
  65. http.Error(w, e.Error(), http.StatusInternalServerError)
  66. }
  67. }
  68. }
  69. // Multiplex GET/POST/DELETE request to corresponding handlers
  70. func Multiplexer(w http.ResponseWriter, req *http.Request) error {
  71. switch req.Method {
  72. case "GET":
  73. return GetHttpHandler(w, req)
  74. case "POST":
  75. return SetHttpHandler(w, req)
  76. case "PUT":
  77. return SetHttpHandler(w, req)
  78. case "DELETE":
  79. return DeleteHttpHandler(w, req)
  80. default:
  81. w.WriteHeader(http.StatusMethodNotAllowed)
  82. return nil
  83. }
  84. }
  85. //--------------------------------------
  86. // State sensitive handlers
  87. // Set/Delete will dispatch to leader
  88. //--------------------------------------
  89. // Set Command Handler
  90. func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  91. key := req.URL.Path[len("/v1/keys/"):]
  92. if store.CheckKeyword(key) {
  93. return etcdErr.NewError(400, "Set")
  94. }
  95. debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  96. req.ParseForm()
  97. value := req.Form.Get("value")
  98. if len(value) == 0 {
  99. return etcdErr.NewError(200, "Set")
  100. }
  101. strDuration := req.Form.Get("ttl")
  102. expireTime, err := durationToExpireTime(strDuration)
  103. if err != nil {
  104. return etcdErr.NewError(202, "Set")
  105. }
  106. if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
  107. command := &TestAndSetCommand{
  108. Key: key,
  109. Value: value,
  110. PrevValue: prevValueArr[0],
  111. ExpireTime: expireTime,
  112. }
  113. return dispatch(command, w, req, true)
  114. } else {
  115. command := &SetCommand{
  116. Key: key,
  117. Value: value,
  118. ExpireTime: expireTime,
  119. }
  120. return dispatch(command, w, req, true)
  121. }
  122. }
  123. // Delete Handler
  124. func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
  125. key := req.URL.Path[len("/v1/keys/"):]
  126. debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  127. command := &DeleteCommand{
  128. Key: key,
  129. }
  130. return dispatch(command, w, req, true)
  131. }
  132. // Dispatch the command to leader
  133. func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error {
  134. if r.State() == raft.Leader {
  135. if body, err := r.Do(c); err != nil {
  136. return err
  137. } else {
  138. if body == nil {
  139. return etcdErr.NewError(300, "Empty result from raft")
  140. } else {
  141. body, _ := body.([]byte)
  142. w.WriteHeader(http.StatusOK)
  143. w.Write(body)
  144. return nil
  145. }
  146. }
  147. } else {
  148. leader := r.Leader()
  149. // current no leader
  150. if leader == "" {
  151. return etcdErr.NewError(300, "")
  152. }
  153. redirect(leader, etcd, w, req)
  154. return nil
  155. }
  156. return etcdErr.NewError(300, "")
  157. }
  158. //--------------------------------------
  159. // State non-sensitive handlers
  160. // will not dispatch to leader
  161. // TODO: add sensitive version for these
  162. // command?
  163. //--------------------------------------
  164. // Handler to return the current leader's raft address
  165. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
  166. leader := r.Leader()
  167. if leader != "" {
  168. w.WriteHeader(http.StatusOK)
  169. raftURL, _ := nameToRaftURL(leader)
  170. w.Write([]byte(raftURL))
  171. return nil
  172. } else {
  173. return etcdErr.NewError(301, "")
  174. }
  175. }
  176. // Handler to return all the known machines in the current cluster
  177. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
  178. machines := getMachines(nameToEtcdURL)
  179. w.WriteHeader(http.StatusOK)
  180. w.Write([]byte(strings.Join(machines, ", ")))
  181. return nil
  182. }
  183. // Handler to return the current version of etcd
  184. func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
  185. w.WriteHeader(http.StatusOK)
  186. fmt.Fprintf(w, "etcd %s", releaseVersion)
  187. return nil
  188. }
  189. // Handler to return the basic stats of etcd
  190. func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
  191. option := req.URL.Path[len("/v1/stats/"):]
  192. switch option {
  193. case "self":
  194. w.WriteHeader(http.StatusOK)
  195. w.Write(r.Stats())
  196. case "leader":
  197. if r.State() == raft.Leader {
  198. w.Write(r.PeerStats())
  199. } else {
  200. leader := r.Leader()
  201. // current no leader
  202. if leader == "" {
  203. return etcdErr.NewError(300, "")
  204. }
  205. redirect(leader, true, w, req)
  206. }
  207. case "store":
  208. w.WriteHeader(http.StatusOK)
  209. w.Write(etcdStore.Stats())
  210. }
  211. return nil
  212. }
  213. // Get Handler
  214. func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  215. key := req.URL.Path[len("/v1/keys/"):]
  216. debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  217. command := &GetCommand{
  218. Key: key,
  219. }
  220. if body, err := command.Apply(r.Server); err != nil {
  221. return err
  222. } else {
  223. body, _ := body.([]byte)
  224. w.WriteHeader(http.StatusOK)
  225. w.Write(body)
  226. return nil
  227. }
  228. }
  229. // Watch handler
  230. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
  231. key := req.URL.Path[len("/v1/watch/"):]
  232. command := &WatchCommand{
  233. Key: key,
  234. }
  235. if req.Method == "GET" {
  236. debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  237. command.SinceIndex = 0
  238. } else if req.Method == "POST" {
  239. // watch from a specific index
  240. debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  241. content := req.FormValue("index")
  242. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  243. if err != nil {
  244. return etcdErr.NewError(203, "Watch From Index")
  245. }
  246. command.SinceIndex = sinceIndex
  247. } else {
  248. w.WriteHeader(http.StatusMethodNotAllowed)
  249. return nil
  250. }
  251. if body, err := command.Apply(r.Server); err != nil {
  252. return etcdErr.NewError(500, key)
  253. } else {
  254. w.WriteHeader(http.StatusOK)
  255. body, _ := body.([]byte)
  256. w.Write(body)
  257. return nil
  258. }
  259. }
  260. // TestHandler
  261. func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
  262. testType := req.URL.Path[len("/test/"):]
  263. if testType == "speed" {
  264. directSet()
  265. w.WriteHeader(http.StatusOK)
  266. w.Write([]byte("speed test success"))
  267. return
  268. }
  269. w.WriteHeader(http.StatusBadRequest)
  270. }