etcd_handlers.go 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package main
  2. import (
  3. "fmt"
  4. etcdErr "github.com/coreos/etcd/error"
  5. "github.com/coreos/etcd/store"
  6. "github.com/coreos/go-raft"
  7. "net/http"
  8. "strconv"
  9. "strings"
  10. )
  11. //-------------------------------------------------------------------
  12. // Handlers to handle etcd-store related request via etcd url
  13. //-------------------------------------------------------------------
  14. func NewEtcdMuxer() *http.ServeMux {
  15. // external commands
  16. etcdMux := http.NewServeMux()
  17. etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
  18. etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
  19. etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
  20. etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
  21. etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
  22. etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
  23. etcdMux.HandleFunc("/test/", TestHttpHandler)
  24. return etcdMux
  25. }
  26. type errorHandler func(http.ResponseWriter, *http.Request) error
  27. func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  28. if e := fn(w, r); e != nil {
  29. if etcdErr, ok := e.(etcdErr.Error); ok {
  30. debug("Return error: ", etcdErr.Error())
  31. etcdErr.Write(w)
  32. } else {
  33. http.Error(w, e.Error(), http.StatusInternalServerError)
  34. }
  35. }
  36. }
  37. // Multiplex GET/POST/DELETE request to corresponding handlers
  38. func Multiplexer(w http.ResponseWriter, req *http.Request) error {
  39. switch req.Method {
  40. case "GET":
  41. return GetHttpHandler(w, req)
  42. case "POST":
  43. return SetHttpHandler(w, req)
  44. case "PUT":
  45. return SetHttpHandler(w, req)
  46. case "DELETE":
  47. return DeleteHttpHandler(w, req)
  48. default:
  49. w.WriteHeader(http.StatusMethodNotAllowed)
  50. return nil
  51. }
  52. }
  53. //--------------------------------------
  54. // State sensitive handlers
  55. // Set/Delete will dispatch to leader
  56. //--------------------------------------
  57. // Set Command Handler
  58. func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  59. key := req.URL.Path[len("/v1/keys/"):]
  60. if store.CheckKeyword(key) {
  61. return etcdErr.NewError(400, "Set")
  62. }
  63. debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  64. value := req.FormValue("value")
  65. if len(value) == 0 {
  66. return etcdErr.NewError(200, "Set")
  67. }
  68. prevValue := req.FormValue("prevValue")
  69. strDuration := req.FormValue("ttl")
  70. expireTime, err := durationToExpireTime(strDuration)
  71. if err != nil {
  72. return etcdErr.NewError(202, "Set")
  73. }
  74. if len(prevValue) != 0 {
  75. command := &TestAndSetCommand{
  76. Key: key,
  77. Value: value,
  78. PrevValue: prevValue,
  79. ExpireTime: expireTime,
  80. }
  81. return dispatch(command, w, req, true)
  82. } else {
  83. command := &SetCommand{
  84. Key: key,
  85. Value: value,
  86. ExpireTime: expireTime,
  87. }
  88. return dispatch(command, w, req, true)
  89. }
  90. }
  91. // Delete Handler
  92. func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
  93. key := req.URL.Path[len("/v1/keys/"):]
  94. debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  95. command := &DeleteCommand{
  96. Key: key,
  97. }
  98. return dispatch(command, w, req, true)
  99. }
  100. // Dispatch the command to leader
  101. func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error {
  102. if r.State() == raft.Leader {
  103. if body, err := r.Do(c); err != nil {
  104. return err
  105. } else {
  106. if body == nil {
  107. return etcdErr.NewError(300, "Empty result from raft")
  108. } else {
  109. body, _ := body.([]byte)
  110. w.WriteHeader(http.StatusOK)
  111. w.Write(body)
  112. return nil
  113. }
  114. }
  115. } else {
  116. leader := r.Leader()
  117. // current no leader
  118. if leader == "" {
  119. return etcdErr.NewError(300, "")
  120. }
  121. // tell the client where is the leader
  122. path := req.URL.Path
  123. var url string
  124. if etcd {
  125. etcdAddr, _ := nameToEtcdURL(leader)
  126. url = etcdAddr + path
  127. } else {
  128. raftAddr, _ := nameToRaftURL(leader)
  129. url = raftAddr + path
  130. }
  131. debugf("Redirect to %s", url)
  132. http.Redirect(w, req, url, http.StatusTemporaryRedirect)
  133. return nil
  134. }
  135. return etcdErr.NewError(300, "")
  136. }
  137. //--------------------------------------
  138. // State non-sensitive handlers
  139. // will not dispatch to leader
  140. // TODO: add sensitive version for these
  141. // command?
  142. //--------------------------------------
  143. // Handler to return the current leader's raft address
  144. // func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
  145. // leader := r.Leader()
  146. // if leader != "" {
  147. // w.WriteHeader(http.StatusOK)
  148. // raftURL, _ := nameToRaftURL(leader)
  149. // w.Write([]byte(raftURL))
  150. // return nil
  151. // } else {
  152. // return etcdErr.NewError(301, "")
  153. // }
  154. // }
  155. // Handler to return all the known machines in the current cluster
  156. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
  157. machines := getMachines(nameToEtcdURL)
  158. w.WriteHeader(http.StatusOK)
  159. w.Write([]byte(strings.Join(machines, ", ")))
  160. return nil
  161. }
  162. // Handler to return the current version of etcd
  163. func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
  164. w.WriteHeader(http.StatusOK)
  165. fmt.Fprintf(w, "etcd %s", releaseVersion)
  166. return nil
  167. }
  168. // Handler to return the basic stats of etcd
  169. func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
  170. w.WriteHeader(http.StatusOK)
  171. w.Write(etcdStore.Stats())
  172. return nil
  173. }
  174. // Get Handler
  175. func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  176. key := req.URL.Path[len("/v1/keys/"):]
  177. debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  178. command := &GetCommand{
  179. Key: key,
  180. }
  181. if body, err := command.Apply(r.Server); err != nil {
  182. return err
  183. } else {
  184. body, _ := body.([]byte)
  185. w.WriteHeader(http.StatusOK)
  186. w.Write(body)
  187. return nil
  188. }
  189. }
  190. // Watch handler
  191. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
  192. key := req.URL.Path[len("/v1/watch/"):]
  193. command := &WatchCommand{
  194. Key: key,
  195. }
  196. if req.Method == "GET" {
  197. debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  198. command.SinceIndex = 0
  199. } else if req.Method == "POST" {
  200. // watch from a specific index
  201. debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  202. content := req.FormValue("index")
  203. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  204. if err != nil {
  205. return etcdErr.NewError(203, "Watch From Index")
  206. }
  207. command.SinceIndex = sinceIndex
  208. } else {
  209. w.WriteHeader(http.StatusMethodNotAllowed)
  210. return nil
  211. }
  212. if body, err := command.Apply(r.Server); err != nil {
  213. return etcdErr.NewError(500, key)
  214. } else {
  215. w.WriteHeader(http.StatusOK)
  216. body, _ := body.([]byte)
  217. w.Write(body)
  218. return nil
  219. }
  220. }
  221. // TestHandler
  222. func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
  223. testType := req.URL.Path[len("/test/"):]
  224. if testType == "speed" {
  225. directSet()
  226. w.WriteHeader(http.StatusOK)
  227. w.Write([]byte("speed test success"))
  228. return
  229. }
  230. w.WriteHeader(http.StatusBadRequest)
  231. }