http.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coreos/etcd/elog"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/etcdserver"
  16. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  17. "github.com/coreos/etcd/raft/raftpb"
  18. "github.com/coreos/etcd/store"
  19. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  20. )
  21. const (
  22. keysPrefix = "/v2/keys"
  23. machinesPrefix = "/v2/machines"
  24. raftPrefix = "/raft"
  25. DefaultTimeout = 500 * time.Millisecond
  26. )
  27. var errClosed = errors.New("etcdhttp: client closed connection")
  28. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  29. func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
  30. sh := &serverHandler{
  31. server: server,
  32. peers: peers,
  33. timeout: timeout,
  34. }
  35. if sh.timeout == 0 {
  36. sh.timeout = DefaultTimeout
  37. }
  38. mux := http.NewServeMux()
  39. mux.HandleFunc(keysPrefix, sh.serveKeys)
  40. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  41. // TODO: dynamic configuration may make this outdated. take care of it.
  42. // TODO: dynamic configuration may introduce race also.
  43. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  44. mux.HandleFunc("/", http.NotFound)
  45. return mux
  46. }
  47. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  48. func NewPeerHandler(server etcdserver.Server) http.Handler {
  49. sh := &serverHandler{
  50. server: server,
  51. }
  52. mux := http.NewServeMux()
  53. mux.HandleFunc(raftPrefix, sh.serveRaft)
  54. mux.HandleFunc("/", http.NotFound)
  55. return mux
  56. }
  57. // serverHandler provides http.Handlers for etcd client and raft communication.
  58. type serverHandler struct {
  59. timeout time.Duration
  60. server etcdserver.Server
  61. peers Peers
  62. }
  63. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  64. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  65. return
  66. }
  67. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  68. defer cancel()
  69. rr, err := parseRequest(r, etcdserver.GenID())
  70. if err != nil {
  71. writeError(w, err)
  72. return
  73. }
  74. resp, err := h.server.Do(ctx, rr)
  75. if err != nil {
  76. writeError(w, err)
  77. return
  78. }
  79. var ev *store.Event
  80. switch {
  81. case resp.Event != nil:
  82. ev = resp.Event
  83. case resp.Watcher != nil:
  84. if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
  85. http.Error(w, err.Error(), http.StatusGatewayTimeout)
  86. return
  87. }
  88. default:
  89. writeError(w, errors.New("received response with no Event/Watcher!"))
  90. return
  91. }
  92. if err = writeEvent(w, ev); err != nil {
  93. // Should never be reached
  94. log.Println("error writing event: %v", err)
  95. }
  96. }
  97. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  98. // TODO: rethink the format of machine list because it is not json format.
  99. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  100. if !allowMethod(w, r.Method, "GET", "HEAD") {
  101. return
  102. }
  103. endpoints := h.peers.Endpoints()
  104. w.Write([]byte(strings.Join(endpoints, ", ")))
  105. }
  106. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  107. if !allowMethod(w, r.Method, "POST") {
  108. return
  109. }
  110. b, err := ioutil.ReadAll(r.Body)
  111. if err != nil {
  112. log.Println("etcdhttp: error reading raft message:", err)
  113. http.Error(w, "error reading raft message", http.StatusBadRequest)
  114. return
  115. }
  116. var m raftpb.Message
  117. if err := m.Unmarshal(b); err != nil {
  118. log.Println("etcdhttp: error unmarshaling raft message:", err)
  119. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  120. return
  121. }
  122. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  123. if err := h.server.Process(context.TODO(), m); err != nil {
  124. log.Println("etcdhttp: error processing raft message:", err)
  125. writeError(w, err)
  126. return
  127. }
  128. w.WriteHeader(http.StatusNoContent)
  129. }
  130. // parseRequest converts a received http.Request to a server Request,
  131. // performing validation of supplied fields as appropriate.
  132. // If any validation fails, an empty Request and non-nil error is returned.
  133. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  134. emptyReq := etcdserverpb.Request{}
  135. err := r.ParseForm()
  136. if err != nil {
  137. return emptyReq, etcdErr.NewRequestError(
  138. etcdErr.EcodeInvalidForm,
  139. err.Error(),
  140. )
  141. }
  142. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  143. return emptyReq, etcdErr.NewRequestError(
  144. etcdErr.EcodeInvalidForm,
  145. "incorrect key prefix",
  146. )
  147. }
  148. p := r.URL.Path[len(keysPrefix):]
  149. var pIdx, wIdx, ttl uint64
  150. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  151. return emptyReq, etcdErr.NewRequestError(
  152. etcdErr.EcodeIndexNaN,
  153. `invalid value for "prevIndex"`,
  154. )
  155. }
  156. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  157. return emptyReq, etcdErr.NewRequestError(
  158. etcdErr.EcodeIndexNaN,
  159. `invalid value for "waitIndex"`,
  160. )
  161. }
  162. if ttl, err = getUint64(r.Form, "ttl"); err != nil {
  163. return emptyReq, etcdErr.NewRequestError(
  164. etcdErr.EcodeTTLNaN,
  165. `invalid value for "ttl"`,
  166. )
  167. }
  168. var rec, sort, wait bool
  169. if rec, err = getBool(r.Form, "recursive"); err != nil {
  170. return emptyReq, etcdErr.NewRequestError(
  171. etcdErr.EcodeInvalidField,
  172. `invalid value for "recursive"`,
  173. )
  174. }
  175. if sort, err = getBool(r.Form, "sorted"); err != nil {
  176. return emptyReq, etcdErr.NewRequestError(
  177. etcdErr.EcodeInvalidField,
  178. `invalid value for "sorted"`,
  179. )
  180. }
  181. if wait, err = getBool(r.Form, "wait"); err != nil {
  182. return emptyReq, etcdErr.NewRequestError(
  183. etcdErr.EcodeInvalidField,
  184. `invalid value for "wait"`,
  185. )
  186. }
  187. // prevExist is nullable, so leave it null if not specified
  188. var pe *bool
  189. if _, ok := r.Form["prevExist"]; ok {
  190. bv, err := getBool(r.Form, "prevExist")
  191. if err != nil {
  192. return emptyReq, etcdErr.NewRequestError(
  193. etcdErr.EcodeInvalidField,
  194. "invalid value for prevExist",
  195. )
  196. }
  197. pe = &bv
  198. }
  199. rr := etcdserverpb.Request{
  200. Id: id,
  201. Method: r.Method,
  202. Path: p,
  203. Val: r.FormValue("value"),
  204. PrevValue: r.FormValue("prevValue"),
  205. PrevIndex: pIdx,
  206. PrevExist: pe,
  207. Recursive: rec,
  208. Since: wIdx,
  209. Sorted: sort,
  210. Wait: wait,
  211. }
  212. if pe != nil {
  213. rr.PrevExist = pe
  214. }
  215. // TODO(jonboulle): use fake clock instead of time module
  216. // https://github.com/coreos/etcd/issues/1021
  217. if ttl > 0 {
  218. expr := time.Duration(ttl) * time.Second
  219. rr.Expiration = time.Now().Add(expr).UnixNano()
  220. }
  221. return rr, nil
  222. }
  223. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  224. // not exist in the form, 0 is returned. If the key exists but the value is
  225. // badly formed, an error is returned. If multiple values are present only the
  226. // first is considered.
  227. func getUint64(form url.Values, key string) (i uint64, err error) {
  228. if vals, ok := form[key]; ok {
  229. i, err = strconv.ParseUint(vals[0], 10, 64)
  230. }
  231. return
  232. }
  233. // getBool extracts a bool by the given key from a Form. If the key does not
  234. // exist in the form, false is returned. If the key exists but the value is
  235. // badly formed, an error is returned. If multiple values are present only the
  236. // first is considered.
  237. func getBool(form url.Values, key string) (b bool, err error) {
  238. if vals, ok := form[key]; ok {
  239. b, err = strconv.ParseBool(vals[0])
  240. }
  241. return
  242. }
  243. // writeError logs and writes the given Error to the ResponseWriter
  244. // If Error is an etcdErr, it is rendered to the ResponseWriter
  245. // Otherwise, it is assumed to be an InternalServerError
  246. func writeError(w http.ResponseWriter, err error) {
  247. if err == nil {
  248. return
  249. }
  250. log.Println(err)
  251. if e, ok := err.(*etcdErr.Error); ok {
  252. e.Write(w)
  253. } else {
  254. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  255. }
  256. }
  257. // writeEvent serializes the given Event and writes the resulting JSON to the
  258. // given ResponseWriter
  259. func writeEvent(w http.ResponseWriter, ev *store.Event) error {
  260. if ev == nil {
  261. return errors.New("cannot write empty Event!")
  262. }
  263. w.Header().Set("Content-Type", "application/json")
  264. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
  265. if ev.IsCreated() {
  266. w.WriteHeader(http.StatusCreated)
  267. }
  268. return json.NewEncoder(w).Encode(ev)
  269. }
  270. // waitForEvent waits for a given Watcher to return its associated
  271. // event. It returns a non-nil error if the given Context times out
  272. // or the given ResponseWriter triggers a CloseNotify.
  273. func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
  274. // TODO(bmizerany): support streaming?
  275. defer wa.Remove()
  276. var nch <-chan bool
  277. if x, ok := w.(http.CloseNotifier); ok {
  278. nch = x.CloseNotify()
  279. }
  280. select {
  281. case ev := <-wa.EventChan():
  282. return ev, nil
  283. case <-nch:
  284. elog.TODO()
  285. return nil, errClosed
  286. case <-ctx.Done():
  287. return nil, ctx.Err()
  288. }
  289. }
  290. // allowMethod verifies that the given method is one of the allowed methods,
  291. // and if not, it writes an error to w. A boolean is returned indicating
  292. // whether or not the method is allowed.
  293. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  294. for _, meth := range ms {
  295. if m == meth {
  296. return true
  297. }
  298. }
  299. w.Header().Set("Allow", strings.Join(ms, ","))
  300. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  301. return false
  302. }