etcd_handlers.go 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. package main
  2. import (
  3. "fmt"
  4. etcdErr "github.com/coreos/etcd/error"
  5. "github.com/coreos/etcd/store"
  6. "github.com/coreos/go-raft"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. )
  11. //-------------------------------------------------------------------
  12. // Handlers to handle etcd-store related request via etcd url
  13. //-------------------------------------------------------------------
  14. func NewEtcdMuxer() *http.ServeMux {
  15. // external commands
  16. etcdMux := http.NewServeMux()
  17. etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
  18. etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
  19. etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
  20. etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
  21. etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
  22. etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
  23. etcdMux.HandleFunc("/test/", TestHttpHandler)
  24. return etcdMux
  25. }
  26. type errorHandler func(http.ResponseWriter, *http.Request) error
  27. // addCorsHeader parses the request Origin header and loops through the user
  28. // provided allowed origins and sets the Access-Control-Allow-Origin header if
  29. // there is a match.
  30. func addCorsHeader(w http.ResponseWriter, r *http.Request) {
  31. val, ok := corsList["*"]
  32. if val && ok {
  33. w.Header().Add("Access-Control-Allow-Origin", "*")
  34. return
  35. }
  36. requestOrigin := r.Header.Get("Origin")
  37. val, ok = corsList[requestOrigin]
  38. if val && ok {
  39. w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
  40. return
  41. }
  42. }
  43. func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  44. addCorsHeader(w, r)
  45. if e := fn(w, r); e != nil {
  46. if etcdErr, ok := e.(etcdErr.Error); ok {
  47. debug("Return error: ", etcdErr.Error())
  48. etcdErr.Write(w)
  49. } else {
  50. http.Error(w, e.Error(), http.StatusInternalServerError)
  51. }
  52. }
  53. }
  54. // Multiplex GET/POST/DELETE request to corresponding handlers
  55. func Multiplexer(w http.ResponseWriter, req *http.Request) error {
  56. switch req.Method {
  57. case "GET":
  58. return GetHttpHandler(w, req)
  59. case "POST":
  60. return SetHttpHandler(w, req)
  61. case "PUT":
  62. return SetHttpHandler(w, req)
  63. case "DELETE":
  64. return DeleteHttpHandler(w, req)
  65. default:
  66. w.WriteHeader(http.StatusMethodNotAllowed)
  67. return nil
  68. }
  69. }
  70. //--------------------------------------
  71. // State sensitive handlers
  72. // Set/Delete will dispatch to leader
  73. //--------------------------------------
  74. // Set Command Handler
  75. func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  76. key := req.URL.Path[len("/v1/keys/"):]
  77. if store.CheckKeyword(key) {
  78. return etcdErr.NewError(400, "Set")
  79. }
  80. debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  81. req.ParseForm()
  82. value := req.Form.Get("value")
  83. if len(value) == 0 {
  84. return etcdErr.NewError(200, "Set")
  85. }
  86. strDuration := req.Form.Get("ttl")
  87. expireTime, err := durationToExpireTime(strDuration)
  88. if err != nil {
  89. return etcdErr.NewError(202, "Set")
  90. }
  91. if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
  92. command := &TestAndSetCommand{
  93. Key: key,
  94. Value: value,
  95. PrevValue: prevValueArr[0],
  96. ExpireTime: expireTime,
  97. }
  98. return dispatch(command, w, req, true)
  99. } else {
  100. command := &SetCommand{
  101. Key: key,
  102. Value: value,
  103. ExpireTime: expireTime,
  104. }
  105. return dispatch(command, w, req, true)
  106. }
  107. }
  108. // Delete Handler
  109. func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
  110. key := req.URL.Path[len("/v1/keys/"):]
  111. debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  112. command := &DeleteCommand{
  113. Key: key,
  114. }
  115. return dispatch(command, w, req, true)
  116. }
  117. // Dispatch the command to leader
  118. func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error {
  119. if r.State() == raft.Leader {
  120. if body, err := r.Do(c); err != nil {
  121. return err
  122. } else {
  123. if body == nil {
  124. return etcdErr.NewError(300, "Empty result from raft")
  125. } else {
  126. body, _ := body.([]byte)
  127. w.WriteHeader(http.StatusOK)
  128. w.Write(body)
  129. return nil
  130. }
  131. }
  132. } else {
  133. leader := r.Leader()
  134. // current no leader
  135. if leader == "" {
  136. return etcdErr.NewError(300, "")
  137. }
  138. // tell the client where is the leader
  139. path := req.URL.Path
  140. var url string
  141. if etcd {
  142. etcdAddr, _ := nameToEtcdURL(leader)
  143. url = etcdAddr + path
  144. } else {
  145. raftAddr, _ := nameToRaftURL(leader)
  146. url = raftAddr + path
  147. }
  148. debugf("Redirect to %s", url)
  149. http.Redirect(w, req, url, http.StatusTemporaryRedirect)
  150. return nil
  151. }
  152. return etcdErr.NewError(300, "")
  153. }
  154. //--------------------------------------
  155. // State non-sensitive handlers
  156. // will not dispatch to leader
  157. // TODO: add sensitive version for these
  158. // command?
  159. //--------------------------------------
  160. // Handler to return the current leader's raft address
  161. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
  162. leader := r.Leader()
  163. if leader != "" {
  164. w.WriteHeader(http.StatusOK)
  165. raftURL, _ := nameToRaftURL(leader)
  166. w.Write([]byte(raftURL))
  167. return nil
  168. } else {
  169. return etcdErr.NewError(301, "")
  170. }
  171. }
  172. // Handler to return all the known machines in the current cluster
  173. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
  174. machines := getMachines(nameToEtcdURL)
  175. w.WriteHeader(http.StatusOK)
  176. w.Write([]byte(strings.Join(machines, ", ")))
  177. return nil
  178. }
  179. // Handler to return the current version of etcd
  180. func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
  181. w.WriteHeader(http.StatusOK)
  182. fmt.Fprintf(w, "etcd %s", releaseVersion)
  183. return nil
  184. }
  185. // Handler to return the basic stats of etcd
  186. func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
  187. w.WriteHeader(http.StatusOK)
  188. w.Write(etcdStore.Stats())
  189. return nil
  190. }
  191. // Get Handler
  192. func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  193. key := req.URL.Path[len("/v1/keys/"):]
  194. debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  195. command := &GetCommand{
  196. Key: key,
  197. }
  198. if body, err := command.Apply(r.Server); err != nil {
  199. return err
  200. } else {
  201. body, _ := body.([]byte)
  202. w.WriteHeader(http.StatusOK)
  203. w.Write(body)
  204. return nil
  205. }
  206. }
  207. // Watch handler
  208. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
  209. key := req.URL.Path[len("/v1/watch/"):]
  210. command := &WatchCommand{
  211. Key: key,
  212. }
  213. if req.Method == "GET" {
  214. debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  215. command.SinceIndex = 0
  216. } else if req.Method == "POST" {
  217. // watch from a specific index
  218. debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  219. content := req.FormValue("index")
  220. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  221. if err != nil {
  222. return etcdErr.NewError(203, "Watch From Index")
  223. }
  224. command.SinceIndex = sinceIndex
  225. } else {
  226. w.WriteHeader(http.StatusMethodNotAllowed)
  227. return nil
  228. }
  229. if body, err := command.Apply(r.Server); err != nil {
  230. return etcdErr.NewError(500, key)
  231. } else {
  232. w.WriteHeader(http.StatusOK)
  233. body, _ := body.([]byte)
  234. w.Write(body)
  235. return nil
  236. }
  237. }
  238. // TestHandler
  239. func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
  240. testType := req.URL.Path[len("/test/"):]
  241. if testType == "speed" {
  242. directSet()
  243. w.WriteHeader(http.StatusOK)
  244. w.Write([]byte("speed test success"))
  245. return
  246. }
  247. w.WriteHeader(http.StatusBadRequest)
  248. }