handlers.go 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "net/http"
  6. "strconv"
  7. "strings"
  8. etcdErr "github.com/coreos/etcd/error"
  9. "github.com/coreos/etcd/store"
  10. "github.com/coreos/go-raft"
  11. )
  12. //-------------------------------------------------------------------
// Handlers for etcd-store related requests served via the etcd URL
  14. //-------------------------------------------------------------------
  15. func NewEtcdMuxer() *http.ServeMux {
  16. // external commands
  17. router := mux.NewRouter()
  18. etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
  19. etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
  20. etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
  21. etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
  22. etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
  23. etcdMux.HandleFunc("/test/", TestHttpHandler)
  24. // backward support
  25. etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
  26. etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
  27. etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
  28. etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
  29. return etcdMux
  30. }
// errorHandler is a handler function that reports failure by returning an
// error; its ServeHTTP method turns a returned error into the HTTP response.
type errorHandler func(http.ResponseWriter, *http.Request) error
  32. // addCorsHeader parses the request Origin header and loops through the user
  33. // provided allowed origins and sets the Access-Control-Allow-Origin header if
  34. // there is a match.
  35. func addCorsHeader(w http.ResponseWriter, r *http.Request) {
  36. val, ok := corsList["*"]
  37. if val && ok {
  38. w.Header().Add("Access-Control-Allow-Origin", "*")
  39. return
  40. }
  41. requestOrigin := r.Header.Get("Origin")
  42. val, ok = corsList[requestOrigin]
  43. if val && ok {
  44. w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
  45. return
  46. }
  47. }
  48. func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  49. addCorsHeader(w, r)
  50. if e := fn(w, r); e != nil {
  51. if etcdErr, ok := e.(*etcdErr.Error); ok {
  52. debug("Return error: ", (*etcdErr).Error())
  53. etcdErr.Write(w)
  54. } else {
  55. http.Error(w, e.Error(), http.StatusInternalServerError)
  56. }
  57. }
  58. }
  59. // Multiplex GET/POST/DELETE request to corresponding handlers
  60. func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
  61. switch req.Method {
  62. case "GET":
  63. return e.GetHttpHandler(w, req)
  64. case "POST":
  65. return e.CreateHttpHandler(w, req)
  66. case "PUT":
  67. return e.UpdateHttpHandler(w, req)
  68. case "DELETE":
  69. return e.DeleteHttpHandler(w, req)
  70. default:
  71. w.WriteHeader(http.StatusMethodNotAllowed)
  72. return nil
  73. }
  74. return nil
  75. }
  76. //--------------------------------------
  77. // State sensitive handlers
  78. // Set/Delete will dispatch to leader
  79. //--------------------------------------
  80. func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
  81. key := getNodePath(req.URL.Path)
  82. debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
  83. value := req.FormValue("value")
  84. expireTime, err := durationToExpireTime(req.FormValue("ttl"))
  85. if err != nil {
  86. return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
  87. }
  88. command := &CreateCommand{
  89. Key: key,
  90. Value: value,
  91. ExpireTime: expireTime,
  92. }
  93. if req.FormValue("incremental") == "true" {
  94. command.IncrementalSuffix = true
  95. }
  96. return e.dispatchEtcdCommand(command, w, req)
  97. }
  98. func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
  99. key := getNodePath(req.URL.Path)
  100. debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
  101. req.ParseForm()
  102. value := req.Form.Get("value")
  103. expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
  104. if err != nil {
  105. return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
  106. }
  107. // update should give at least one option
  108. if value == "" && expireTime.Sub(store.Permanent) == 0 {
  109. return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
  110. }
  111. prevValue, valueOk := req.Form["prevValue"]
  112. prevIndexStr, indexOk := req.Form["prevIndex"]
  113. if !valueOk && !indexOk { // update without test
  114. command := &UpdateCommand{
  115. Key: key,
  116. Value: value,
  117. ExpireTime: expireTime,
  118. }
  119. return e.dispatchEtcdCommand(command, w, req)
  120. } else { // update with test
  121. var prevIndex uint64
  122. if indexOk {
  123. prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
  124. // bad previous index
  125. if err != nil {
  126. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
  127. }
  128. } else {
  129. prevIndex = 0
  130. }
  131. command := &TestAndSetCommand{
  132. Key: key,
  133. Value: value,
  134. PrevValue: prevValue[0],
  135. PrevIndex: prevIndex,
  136. }
  137. return e.dispatchEtcdCommand(command, w, req)
  138. }
  139. }
  140. // Delete Handler
  141. func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
  142. key := getNodePath(req.URL.Path)
  143. debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
  144. command := &DeleteCommand{
  145. Key: key,
  146. }
  147. if req.FormValue("recursive") == "true" {
  148. command.Recursive = true
  149. }
  150. return e.dispatchEtcdCommand(command, w, req)
  151. }
  152. // Dispatch the command to leader
  153. func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
  154. return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
  155. }
  156. //--------------------------------------
  157. // State non-sensitive handlers
  158. // command with consistent option will
  159. // still dispatch to the leader
  160. //--------------------------------------
  161. // Handler to return the current leader's raft address
  162. func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
  163. r := e.raftServer
  164. leader := r.Leader()
  165. if leader != "" {
  166. w.WriteHeader(http.StatusOK)
  167. raftURL, _ := nameToRaftURL(leader)
  168. w.Write([]byte(raftURL))
  169. return nil
  170. } else {
  171. return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
  172. }
  173. }
  174. // Handler to return all the known machines in the current cluster
  175. func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
  176. machines := e.raftServer.getMachines(nameToEtcdURL)
  177. w.WriteHeader(http.StatusOK)
  178. w.Write([]byte(strings.Join(machines, ", ")))
  179. return nil
  180. }
  181. // Handler to return the current version of etcd
  182. func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
  183. w.WriteHeader(http.StatusOK)
  184. fmt.Fprintf(w, "etcd %s", releaseVersion)
  185. return nil
  186. }
  187. // Handler to return the basic stats of etcd
  188. func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
  189. option := req.URL.Path[len("/v1/stats/"):]
  190. w.WriteHeader(http.StatusOK)
  191. r := e.raftServer
  192. switch option {
  193. case "self":
  194. w.Write(r.Stats())
  195. case "leader":
  196. if r.State() == raft.Leader {
  197. w.Write(r.PeerStats())
  198. } else {
  199. leader := r.Leader()
  200. // current no leader
  201. if leader == "" {
  202. return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
  203. }
  204. hostname, _ := nameToEtcdURL(leader)
  205. redirect(hostname, w, req)
  206. }
  207. case "store":
  208. w.Write(etcdStore.JsonStats())
  209. }
  210. return nil
  211. }
  212. func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  213. var err error
  214. var event interface{}
  215. r := e.raftServer
  216. debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
  217. if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
  218. // help client to redirect the request to the current leader
  219. leader := r.Leader()
  220. hostname, _ := nameToEtcdURL(leader)
  221. redirect(hostname, w, req)
  222. return nil
  223. }
  224. key := getNodePath(req.URL.Path)
  225. recursive := req.FormValue("recursive")
  226. if req.FormValue("wait") == "true" { // watch
  227. command := &WatchCommand{
  228. Key: key,
  229. }
  230. if recursive == "true" {
  231. command.Recursive = true
  232. }
  233. indexStr := req.FormValue("wait_index")
  234. if indexStr != "" {
  235. sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
  236. if err != nil {
  237. return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
  238. }
  239. command.SinceIndex = sinceIndex
  240. }
  241. event, err = command.Apply(r.Server)
  242. } else { //get
  243. command := &GetCommand{
  244. Key: key,
  245. }
  246. sorted := req.FormValue("sorted")
  247. if sorted == "true" {
  248. command.Sorted = true
  249. }
  250. if recursive == "true" {
  251. command.Recursive = true
  252. }
  253. event, err = command.Apply(r.Server)
  254. }
  255. if err != nil {
  256. return err
  257. } else {
  258. event, _ := event.(*store.Event)
  259. bytes, _ := json.Marshal(event)
  260. w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
  261. w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
  262. w.WriteHeader(http.StatusOK)
  263. w.Write(bytes)
  264. return nil
  265. }
  266. }
  267. // TestHandler
  268. func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
  269. testType := req.URL.Path[len("/test/"):]
  270. if testType == "speed" {
  271. directSet()
  272. w.WriteHeader(http.StatusOK)
  273. w.Write([]byte("speed test success"))
  274. return
  275. }
  276. w.WriteHeader(http.StatusBadRequest)
  277. }