http.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/etcdserver"
  16. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  17. "github.com/coreos/etcd/raft/raftpb"
  18. "github.com/coreos/etcd/store"
  19. )
  // URL path prefixes routed by the handlers in this package.
  const (
  	keysPrefix               = "/v2/keys"
  	deprecatedMachinesPrefix = "/v2/machines"
  	adminMembersPrefix       = "/v2/admin/members/"
  	raftPrefix               = "/raft"
  )

  // Deadlines applied to requests served by this package.
  const (
  	// defaultServerTimeout is the time to wait for a response from
  	// EtcdServer requests.
  	defaultServerTimeout = 500 * time.Millisecond

  	// defaultWatchTimeout is the time to wait for a Watch request.
  	defaultWatchTimeout = 5 * time.Minute
  )

  // errClosed is the error value describing a connection closed by the client.
  var errClosed = errors.New("etcdhttp: client closed connection")
  31. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  32. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  33. sh := &serverHandler{
  34. server: server,
  35. clusterStore: server.ClusterStore,
  36. timer: server,
  37. timeout: defaultServerTimeout,
  38. }
  39. mux := http.NewServeMux()
  40. mux.HandleFunc(keysPrefix, sh.serveKeys)
  41. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  42. // TODO: dynamic configuration may make this outdated. take care of it.
  43. // TODO: dynamic configuration may introduce race also.
  44. // TODO: add serveMembers
  45. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  46. mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
  47. mux.HandleFunc("/", http.NotFound)
  48. return mux
  49. }
  50. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  51. func NewPeerHandler(server etcdserver.Server) http.Handler {
  52. sh := &serverHandler{
  53. server: server,
  54. }
  55. mux := http.NewServeMux()
  56. mux.HandleFunc(raftPrefix, sh.serveRaft)
  57. mux.HandleFunc("/", http.NotFound)
  58. return mux
  59. }
  // serverHandler provides http.Handlers for etcd client and raft communication.
  type serverHandler struct {
  	// timeout is the deadline applied to the server request issued by serveKeys.
  	timeout time.Duration
  	// server carries out key requests, member additions/removals and raft
  	// message processing on behalf of the handlers.
  	server etcdserver.Server
  	// timer supplies the raft index and term written into response headers.
  	timer etcdserver.RaftTimer
  	// clusterStore supplies the client URLs listed by serveMachines.
  	clusterStore etcdserver.ClusterStore
  }
  67. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  68. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  69. return
  70. }
  71. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  72. defer cancel()
  73. rr, err := parseRequest(r, etcdserver.GenID())
  74. if err != nil {
  75. writeError(w, err)
  76. return
  77. }
  78. resp, err := h.server.Do(ctx, rr)
  79. if err != nil {
  80. writeError(w, err)
  81. return
  82. }
  83. switch {
  84. case resp.Event != nil:
  85. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  86. // Should never be reached
  87. log.Printf("error writing event: %v", err)
  88. }
  89. case resp.Watcher != nil:
  90. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  91. defer cancel()
  92. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  93. default:
  94. writeError(w, errors.New("received response with no Event/Watcher!"))
  95. }
  96. }
  97. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  98. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  99. if !allowMethod(w, r.Method, "GET", "HEAD") {
  100. return
  101. }
  102. endpoints := h.clusterStore.Get().ClientURLs()
  103. w.Write([]byte(strings.Join(endpoints, ", ")))
  104. }
  105. func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
  106. if !allowMethod(w, r.Method, "PUT", "DELETE") {
  107. return
  108. }
  109. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  110. defer cancel()
  111. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  112. id, err := strconv.ParseUint(idStr, 16, 64)
  113. if err != nil {
  114. http.Error(w, err.Error(), http.StatusBadRequest)
  115. return
  116. }
  117. switch r.Method {
  118. case "PUT":
  119. if err := r.ParseForm(); err != nil {
  120. http.Error(w, err.Error(), http.StatusBadRequest)
  121. return
  122. }
  123. peerURLs := r.PostForm["PeerURLs"]
  124. log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs)
  125. m := etcdserver.Member{
  126. ID: id,
  127. RaftAttributes: etcdserver.RaftAttributes{
  128. PeerURLs: peerURLs,
  129. },
  130. }
  131. if err := h.server.AddMember(ctx, m); err != nil {
  132. log.Printf("etcdhttp: error adding node %x: %v", id, err)
  133. writeError(w, err)
  134. return
  135. }
  136. w.WriteHeader(http.StatusCreated)
  137. case "DELETE":
  138. log.Printf("etcdhttp: remove node %x", id)
  139. if err := h.server.RemoveMember(ctx, id); err != nil {
  140. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  141. writeError(w, err)
  142. return
  143. }
  144. w.WriteHeader(http.StatusNoContent)
  145. }
  146. }
  147. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  148. if !allowMethod(w, r.Method, "POST") {
  149. return
  150. }
  151. b, err := ioutil.ReadAll(r.Body)
  152. if err != nil {
  153. log.Println("etcdhttp: error reading raft message:", err)
  154. http.Error(w, "error reading raft message", http.StatusBadRequest)
  155. return
  156. }
  157. var m raftpb.Message
  158. if err := m.Unmarshal(b); err != nil {
  159. log.Println("etcdhttp: error unmarshaling raft message:", err)
  160. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  161. return
  162. }
  163. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  164. if err := h.server.Process(context.TODO(), m); err != nil {
  165. log.Println("etcdhttp: error processing raft message:", err)
  166. writeError(w, err)
  167. return
  168. }
  169. w.WriteHeader(http.StatusNoContent)
  170. }
  171. // parseRequest converts a received http.Request to a server Request,
  172. // performing validation of supplied fields as appropriate.
  173. // If any validation fails, an empty Request and non-nil error is returned.
  174. func parseRequest(r *http.Request, id uint64) (etcdserverpb.Request, error) {
  175. emptyReq := etcdserverpb.Request{}
  176. err := r.ParseForm()
  177. if err != nil {
  178. return emptyReq, etcdErr.NewRequestError(
  179. etcdErr.EcodeInvalidForm,
  180. err.Error(),
  181. )
  182. }
  183. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  184. return emptyReq, etcdErr.NewRequestError(
  185. etcdErr.EcodeInvalidForm,
  186. "incorrect key prefix",
  187. )
  188. }
  189. p := r.URL.Path[len(keysPrefix):]
  190. var pIdx, wIdx uint64
  191. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  192. return emptyReq, etcdErr.NewRequestError(
  193. etcdErr.EcodeIndexNaN,
  194. `invalid value for "prevIndex"`,
  195. )
  196. }
  197. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  198. return emptyReq, etcdErr.NewRequestError(
  199. etcdErr.EcodeIndexNaN,
  200. `invalid value for "waitIndex"`,
  201. )
  202. }
  203. var rec, sort, wait, dir, stream bool
  204. if rec, err = getBool(r.Form, "recursive"); err != nil {
  205. return emptyReq, etcdErr.NewRequestError(
  206. etcdErr.EcodeInvalidField,
  207. `invalid value for "recursive"`,
  208. )
  209. }
  210. if sort, err = getBool(r.Form, "sorted"); err != nil {
  211. return emptyReq, etcdErr.NewRequestError(
  212. etcdErr.EcodeInvalidField,
  213. `invalid value for "sorted"`,
  214. )
  215. }
  216. if wait, err = getBool(r.Form, "wait"); err != nil {
  217. return emptyReq, etcdErr.NewRequestError(
  218. etcdErr.EcodeInvalidField,
  219. `invalid value for "wait"`,
  220. )
  221. }
  222. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  223. if dir, err = getBool(r.Form, "dir"); err != nil {
  224. return emptyReq, etcdErr.NewRequestError(
  225. etcdErr.EcodeInvalidField,
  226. `invalid value for "dir"`,
  227. )
  228. }
  229. if stream, err = getBool(r.Form, "stream"); err != nil {
  230. return emptyReq, etcdErr.NewRequestError(
  231. etcdErr.EcodeInvalidField,
  232. `invalid value for "stream"`,
  233. )
  234. }
  235. if wait && r.Method != "GET" {
  236. return emptyReq, etcdErr.NewRequestError(
  237. etcdErr.EcodeInvalidField,
  238. `"wait" can only be used with GET requests`,
  239. )
  240. }
  241. pV := r.FormValue("prevValue")
  242. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  243. return emptyReq, etcdErr.NewRequestError(
  244. etcdErr.EcodeInvalidField,
  245. `"prevValue" cannot be empty`,
  246. )
  247. }
  248. // TTL is nullable, so leave it null if not specified
  249. // or an empty string
  250. var ttl *uint64
  251. if len(r.FormValue("ttl")) > 0 {
  252. i, err := getUint64(r.Form, "ttl")
  253. if err != nil {
  254. return emptyReq, etcdErr.NewRequestError(
  255. etcdErr.EcodeTTLNaN,
  256. `invalid value for "ttl"`,
  257. )
  258. }
  259. ttl = &i
  260. }
  261. // prevExist is nullable, so leave it null if not specified
  262. var pe *bool
  263. if _, ok := r.Form["prevExist"]; ok {
  264. bv, err := getBool(r.Form, "prevExist")
  265. if err != nil {
  266. return emptyReq, etcdErr.NewRequestError(
  267. etcdErr.EcodeInvalidField,
  268. "invalid value for prevExist",
  269. )
  270. }
  271. pe = &bv
  272. }
  273. rr := etcdserverpb.Request{
  274. ID: id,
  275. Method: r.Method,
  276. Path: p,
  277. Val: r.FormValue("value"),
  278. Dir: dir,
  279. PrevValue: pV,
  280. PrevIndex: pIdx,
  281. PrevExist: pe,
  282. Recursive: rec,
  283. Since: wIdx,
  284. Sorted: sort,
  285. Stream: stream,
  286. Wait: wait,
  287. }
  288. if pe != nil {
  289. rr.PrevExist = pe
  290. }
  291. // Null TTL is equivalent to unset Expiration
  292. // TODO(jonboulle): use fake clock instead of time module
  293. // https://github.com/coreos/etcd/issues/1021
  294. if ttl != nil {
  295. expr := time.Duration(*ttl) * time.Second
  296. rr.Expiration = time.Now().Add(expr).UnixNano()
  297. }
  298. return rr, nil
  299. }
  // getUint64 extracts a uint64 by the given key from a Form. If the key does
  // not exist in the form, or exists with no values at all, 0 is returned. If
  // the key exists but the value is badly formed, an error is returned. If
  // multiple values are present only the first is considered.
  func getUint64(form url.Values, key string) (i uint64, err error) {
  	vals, ok := form[key]
  	// Guard the index below: a key mapped to an empty slice has no first value.
  	if !ok || len(vals) == 0 {
  		return 0, nil
  	}
  	return strconv.ParseUint(vals[0], 10, 64)
  }
  // getBool extracts a bool by the given key from a Form. If the key does not
  // exist in the form, or exists with no values at all, false is returned. If
  // the key exists but the value is badly formed, an error is returned. If
  // multiple values are present only the first is considered.
  func getBool(form url.Values, key string) (b bool, err error) {
  	vals, ok := form[key]
  	// Guard the index below: a key mapped to an empty slice has no first value.
  	if !ok || len(vals) == 0 {
  		return false, nil
  	}
  	return strconv.ParseBool(vals[0])
  }
  320. // writeError logs and writes the given Error to the ResponseWriter
  321. // If Error is an etcdErr, it is rendered to the ResponseWriter
  322. // Otherwise, it is assumed to be an InternalServerError
  323. func writeError(w http.ResponseWriter, err error) {
  324. if err == nil {
  325. return
  326. }
  327. log.Println(err)
  328. if e, ok := err.(*etcdErr.Error); ok {
  329. e.Write(w)
  330. } else {
  331. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  332. }
  333. }
  334. // writeEvent serializes a single Event and writes the resulting
  335. // JSON to the given ResponseWriter, along with the appropriate
  336. // headers
  337. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  338. if ev == nil {
  339. return errors.New("cannot write empty Event!")
  340. }
  341. w.Header().Set("Content-Type", "application/json")
  342. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  343. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  344. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  345. if ev.IsCreated() {
  346. w.WriteHeader(http.StatusCreated)
  347. }
  348. return json.NewEncoder(w).Encode(ev)
  349. }
  350. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  351. defer wa.Remove()
  352. ech := wa.EventChan()
  353. var nch <-chan bool
  354. if x, ok := w.(http.CloseNotifier); ok {
  355. nch = x.CloseNotify()
  356. }
  357. w.Header().Set("Content-Type", "application/json")
  358. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  359. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  360. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  361. w.WriteHeader(http.StatusOK)
  362. // Ensure headers are flushed early, in case of long polling
  363. w.(http.Flusher).Flush()
  364. for {
  365. select {
  366. case <-nch:
  367. // Client closed connection. Nothing to do.
  368. return
  369. case <-ctx.Done():
  370. // Timed out. net/http will close the connection for us, so nothing to do.
  371. return
  372. case ev, ok := <-ech:
  373. if !ok {
  374. // If the channel is closed this may be an indication of
  375. // that notifications are much more than we are able to
  376. // send to the client in time. Then we simply end streaming.
  377. return
  378. }
  379. if err := json.NewEncoder(w).Encode(ev); err != nil {
  380. // Should never be reached
  381. log.Println("error writing event: %v", err)
  382. return
  383. }
  384. if !stream {
  385. return
  386. }
  387. w.(http.Flusher).Flush()
  388. }
  389. }
  390. }
  // allowMethod reports whether m is one of the permitted methods ms.
  // When it is not, an Allow header listing ms (comma-separated) is set on w
  // and a 405 Method Not Allowed response is written before false is returned.
  func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  	for i := 0; i < len(ms); i++ {
  		if ms[i] == m {
  			return true
  		}
  	}

  	allowed := strings.Join(ms, ",")
  	w.Header().Set("Allow", allowed)
  	http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  	return false
  }