etcd_handlers.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. package main
  2. import (
  3. "fmt"
  4. "net/http"
  5. "strconv"
  6. "strings"
  7. etcdErr "github.com/coreos/etcd/error"
  8. "github.com/coreos/etcd/store"
  9. "github.com/coreos/go-raft"
  10. )
  11. //-------------------------------------------------------------------
  12. // Handlers to handle etcd-store related request via etcd url
  13. //-------------------------------------------------------------------
  14. func NewEtcdMuxer() *http.ServeMux {
  15. // external commands
  16. etcdMux := http.NewServeMux()
  17. etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
  18. etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
  19. etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
  20. etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
  21. etcdMux.Handle("/"+version+"/stats", errorHandler(StatsHttpHandler))
  22. etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
  23. etcdMux.HandleFunc("/test/", TestHttpHandler)
  24. return etcdMux
  25. }
  26. type errorHandler func(http.ResponseWriter, *http.Request) error
  27. func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  28. if e := fn(w, r); e != nil {
  29. if etcdErr, ok := e.(etcdErr.Error); ok {
  30. debug("Return error: ", etcdErr.Error())
  31. etcdErr.Write(w)
  32. } else {
  33. http.Error(w, e.Error(), http.StatusInternalServerError)
  34. }
  35. }
  36. }
  37. // Multiplex GET/POST/DELETE request to corresponding handlers
  38. func Multiplexer(w http.ResponseWriter, req *http.Request) error {
  39. switch req.Method {
  40. case "GET":
  41. return GetHttpHandler(w, req)
  42. case "POST":
  43. return SetHttpHandler(w, req)
  44. case "DELETE":
  45. return DeleteHttpHandler(w, req)
  46. default:
  47. w.WriteHeader(http.StatusMethodNotAllowed)
  48. return nil
  49. }
  50. }
  51. //--------------------------------------
  52. // State sensitive handlers
  53. // Set/Delete will dispatch to leader
  54. //--------------------------------------
  55. // Set Command Handler
  56. func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  57. key := req.URL.Path[len("/v1/keys/"):]
  58. if store.CheckKeyword(key) {
  59. return etcdErr.NewError(400, "Set")
  60. }
  61. debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  62. value := req.FormValue("value")
  63. if len(value) == 0 {
  64. return etcdErr.NewError(200, "Set")
  65. }
  66. prevValue := req.FormValue("prevValue")
  67. strDuration := req.FormValue("ttl")
  68. expireTime, err := durationToExpireTime(strDuration)
  69. if err != nil {
  70. return etcdErr.NewError(202, "Set")
  71. }
  72. if len(prevValue) != 0 {
  73. command := &TestAndSetCommand{
  74. Key: key,
  75. Value: value,
  76. PrevValue: prevValue,
  77. ExpireTime: expireTime,
  78. }
  79. return dispatch(command, w, req, true)
  80. } else {
  81. command := &SetCommand{
  82. Key: key,
  83. Value: value,
  84. ExpireTime: expireTime,
  85. }
  86. return dispatch(command, w, req, true)
  87. }
  88. }
  89. // Delete Handler
  90. func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
  91. key := req.URL.Path[len("/v1/keys/"):]
  92. debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  93. command := &DeleteCommand{
  94. Key: key,
  95. }
  96. return dispatch(command, w, req, true)
  97. }
  98. // Dispatch the command to leader
  99. func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error {
  100. if r.State() == raft.Leader {
  101. if body, err := r.Do(c); err != nil {
  102. return err
  103. } else {
  104. if body == nil {
  105. return etcdErr.NewError(300, "Empty result from raft")
  106. } else {
  107. body, _ := body.([]byte)
  108. w.WriteHeader(http.StatusOK)
  109. w.Write(body)
  110. return nil
  111. }
  112. }
  113. } else {
  114. leader := r.Leader()
  115. // current no leader
  116. if leader == "" {
  117. return etcdErr.NewError(300, "")
  118. }
  119. // tell the client where is the leader
  120. path := req.URL.Path
  121. var url string
  122. if etcd {
  123. etcdAddr, _ := nameToEtcdURL(leader)
  124. url = etcdAddr + path
  125. } else {
  126. raftAddr, _ := nameToRaftURL(leader)
  127. url = raftAddr + path
  128. }
  129. debugf("Redirect to %s", url)
  130. http.Redirect(w, req, url, http.StatusTemporaryRedirect)
  131. return nil
  132. }
  133. return etcdErr.NewError(300, "")
  134. }
  135. //--------------------------------------
  136. // State non-sensitive handlers
  137. // will not dispatch to leader
  138. // TODO: add sensitive version for these
  139. // command?
  140. //--------------------------------------
  141. // Handler to return the current leader's raft address
  142. func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
  143. leader := r.Leader()
  144. if leader != "" {
  145. w.WriteHeader(http.StatusOK)
  146. raftURL, _ := nameToRaftURL(leader)
  147. w.Write([]byte(raftURL))
  148. return nil
  149. } else {
  150. return etcdErr.NewError(301, "")
  151. }
  152. }
  153. // Handler to return all the known machines in the current cluster
  154. func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
  155. machines := getMachines(nameToEtcdURL)
  156. w.WriteHeader(http.StatusOK)
  157. w.Write([]byte(strings.Join(machines, ", ")))
  158. return nil
  159. }
  160. // Handler to return the current version of etcd
  161. func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
  162. w.WriteHeader(http.StatusOK)
  163. fmt.Fprintf(w, "etcd %s", releaseVersion)
  164. return nil
  165. }
  166. // Handler to return the basic stats of etcd
  167. func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
  168. w.WriteHeader(http.StatusOK)
  169. w.Write(etcdStore.Stats())
  170. w.Write(r.Stats())
  171. return nil
  172. }
  173. // Get Handler
  174. func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
  175. key := req.URL.Path[len("/v1/keys/"):]
  176. debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
  177. command := &GetCommand{
  178. Key: key,
  179. }
  180. if body, err := command.Apply(r.Server); err != nil {
  181. return err
  182. } else {
  183. body, _ := body.([]byte)
  184. w.WriteHeader(http.StatusOK)
  185. w.Write(body)
  186. return nil
  187. }
  188. }
  189. // Watch handler
  190. func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
  191. key := req.URL.Path[len("/v1/watch/"):]
  192. command := &WatchCommand{
  193. Key: key,
  194. }
  195. if req.Method == "GET" {
  196. debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  197. command.SinceIndex = 0
  198. } else if req.Method == "POST" {
  199. // watch from a specific index
  200. debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
  201. content := req.FormValue("index")
  202. sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
  203. if err != nil {
  204. return etcdErr.NewError(203, "Watch From Index")
  205. }
  206. command.SinceIndex = sinceIndex
  207. } else {
  208. w.WriteHeader(http.StatusMethodNotAllowed)
  209. return nil
  210. }
  211. if body, err := command.Apply(r.Server); err != nil {
  212. return etcdErr.NewError(500, key)
  213. } else {
  214. w.WriteHeader(http.StatusOK)
  215. body, _ := body.([]byte)
  216. w.Write(body)
  217. return nil
  218. }
  219. }
  220. // TestHandler
  221. func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
  222. testType := req.URL.Path[len("/test/"):]
  223. if testType == "speed" {
  224. directSet()
  225. w.WriteHeader(http.StatusOK)
  226. w.Write([]byte("speed test success"))
  227. return
  228. }
  229. w.WriteHeader(http.StatusBadRequest)
  230. }