http.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/etcdserver"
  16. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  17. "github.com/coreos/etcd/raft/raftpb"
  18. "github.com/coreos/etcd/store"
  19. )
  20. const (
  21. keysPrefix = "/v2/keys"
  22. deprecatedMachinesPrefix = "/v2/machines"
  23. adminMembersPrefix = "/v2/admin/members/"
  24. raftPrefix = "/raft"
  25. statsPrefix = "/v2/stats"
  26. // time to wait for response from EtcdServer requests
  27. defaultServerTimeout = 500 * time.Millisecond
  28. // time to wait for a Watch request
  29. defaultWatchTimeout = 5 * time.Minute
  30. )
  31. var errClosed = errors.New("etcdhttp: client closed connection")
  32. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  33. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  34. sh := &serverHandler{
  35. server: server,
  36. clusterStore: server.ClusterStore,
  37. stats: server,
  38. timer: server,
  39. timeout: defaultServerTimeout,
  40. }
  41. mux := http.NewServeMux()
  42. mux.HandleFunc(keysPrefix, sh.serveKeys)
  43. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  44. mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
  45. mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
  46. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
  47. // TODO: dynamic configuration may make this outdated. take care of it.
  48. // TODO: dynamic configuration may introduce race also.
  49. // TODO: add serveMembers
  50. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  51. mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
  52. mux.HandleFunc("/", http.NotFound)
  53. return mux
  54. }
  55. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  56. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
  57. sh := &serverHandler{
  58. server: server,
  59. stats: server,
  60. }
  61. mux := http.NewServeMux()
  62. mux.HandleFunc(raftPrefix, sh.serveRaft)
  63. mux.HandleFunc("/", http.NotFound)
  64. return mux
  65. }
  66. // serverHandler provides http.Handlers for etcd client and raft communication.
  67. type serverHandler struct {
  68. timeout time.Duration
  69. server etcdserver.Server
  70. stats etcdserver.ServerStats
  71. storestats etcdserver.StoreStats
  72. timer etcdserver.RaftTimer
  73. clusterStore etcdserver.ClusterStore
  74. }
  75. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  76. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  77. return
  78. }
  79. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  80. defer cancel()
  81. rr, err := parseRequest(r, etcdserver.GenID())
  82. if err != nil {
  83. writeError(w, err)
  84. return
  85. }
  86. resp, err := h.server.Do(ctx, rr)
  87. if err != nil {
  88. writeError(w, err)
  89. return
  90. }
  91. switch {
  92. case resp.Event != nil:
  93. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  94. // Should never be reached
  95. log.Printf("error writing event: %v", err)
  96. }
  97. case resp.Watcher != nil:
  98. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  99. defer cancel()
  100. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  101. default:
  102. writeError(w, errors.New("received response with no Event/Watcher!"))
  103. }
  104. }
  105. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  106. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  107. if !allowMethod(w, r.Method, "GET", "HEAD") {
  108. return
  109. }
  110. endpoints := h.clusterStore.Get().ClientURLs()
  111. w.Write([]byte(strings.Join(endpoints, ", ")))
  112. }
  113. func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
  114. if !allowMethod(w, r.Method, "PUT", "DELETE") {
  115. return
  116. }
  117. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  118. defer cancel()
  119. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  120. id, err := strconv.ParseUint(idStr, 16, 64)
  121. if err != nil {
  122. http.Error(w, err.Error(), http.StatusBadRequest)
  123. return
  124. }
  125. switch r.Method {
  126. case "PUT":
  127. if err := r.ParseForm(); err != nil {
  128. http.Error(w, err.Error(), http.StatusBadRequest)
  129. return
  130. }
  131. peerURLs := r.PostForm["PeerURLs"]
  132. log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs)
  133. m := etcdserver.Member{
  134. ID: id,
  135. RaftAttributes: etcdserver.RaftAttributes{
  136. PeerURLs: peerURLs,
  137. },
  138. }
  139. if err := h.server.AddMember(ctx, m); err != nil {
  140. log.Printf("etcdhttp: error adding node %x: %v", id, err)
  141. writeError(w, err)
  142. return
  143. }
  144. w.WriteHeader(http.StatusCreated)
  145. case "DELETE":
  146. log.Printf("etcdhttp: remove node %x", id)
  147. if err := h.server.RemoveMember(ctx, id); err != nil {
  148. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  149. writeError(w, err)
  150. return
  151. }
  152. w.WriteHeader(http.StatusNoContent)
  153. }
  154. }
  155. func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
  156. if !allowMethod(w, r.Method, "GET") {
  157. return
  158. }
  159. w.Header().Set("Content-Type", "application/json")
  160. w.Write(h.storestats.JSON())
  161. }
  162. func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
  163. if !allowMethod(w, r.Method, "GET") {
  164. return
  165. }
  166. s := h.stats.SelfStats()
  167. b, err := json.Marshal(s)
  168. if err != nil {
  169. log.Printf("error marshalling stats: %v\n", err)
  170. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  171. return
  172. }
  173. w.Header().Set("Content-Type", "application/json")
  174. w.Write(b)
  175. }
  176. func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
  177. if !allowMethod(w, r.Method, "GET") {
  178. return
  179. }
  180. s := h.stats.LeaderStats()
  181. b, err := json.Marshal(s)
  182. if err != nil {
  183. log.Printf("error marshalling stats: %v\n", err)
  184. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  185. return
  186. }
  187. w.Header().Set("Content-Type", "application/json")
  188. w.Write(b)
  189. }
  190. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  191. if !allowMethod(w, r.Method, "POST") {
  192. return
  193. }
  194. b, err := ioutil.ReadAll(r.Body)
  195. if err != nil {
  196. log.Println("etcdhttp: error reading raft message:", err)
  197. http.Error(w, "error reading raft message", http.StatusBadRequest)
  198. return
  199. }
  200. var m raftpb.Message
  201. if err := m.Unmarshal(b); err != nil {
  202. log.Println("etcdhttp: error unmarshaling raft message:", err)
  203. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  204. return
  205. }
  206. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  207. if m.Type == raftpb.MsgApp {
  208. // TODO(jonboulle): standardize id uint-->string process: always base 16?
  209. h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength))
  210. }
  211. if err := h.server.Process(context.TODO(), m); err != nil {
  212. log.Println("etcdhttp: error processing raft message:", err)
  213. writeError(w, err)
  214. return
  215. }
  216. w.WriteHeader(http.StatusNoContent)
  217. }
  218. // parseRequest converts a received http.Request to a server Request,
  219. // performing validation of supplied fields as appropriate.
  220. // If any validation fails, an empty Request and non-nil error is returned.
  221. func parseRequest(r *http.Request, id uint64) (etcdserverpb.Request, error) {
  222. emptyReq := etcdserverpb.Request{}
  223. err := r.ParseForm()
  224. if err != nil {
  225. return emptyReq, etcdErr.NewRequestError(
  226. etcdErr.EcodeInvalidForm,
  227. err.Error(),
  228. )
  229. }
  230. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  231. return emptyReq, etcdErr.NewRequestError(
  232. etcdErr.EcodeInvalidForm,
  233. "incorrect key prefix",
  234. )
  235. }
  236. p := r.URL.Path[len(keysPrefix):]
  237. var pIdx, wIdx uint64
  238. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  239. return emptyReq, etcdErr.NewRequestError(
  240. etcdErr.EcodeIndexNaN,
  241. `invalid value for "prevIndex"`,
  242. )
  243. }
  244. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  245. return emptyReq, etcdErr.NewRequestError(
  246. etcdErr.EcodeIndexNaN,
  247. `invalid value for "waitIndex"`,
  248. )
  249. }
  250. var rec, sort, wait, dir, stream bool
  251. if rec, err = getBool(r.Form, "recursive"); err != nil {
  252. return emptyReq, etcdErr.NewRequestError(
  253. etcdErr.EcodeInvalidField,
  254. `invalid value for "recursive"`,
  255. )
  256. }
  257. if sort, err = getBool(r.Form, "sorted"); err != nil {
  258. return emptyReq, etcdErr.NewRequestError(
  259. etcdErr.EcodeInvalidField,
  260. `invalid value for "sorted"`,
  261. )
  262. }
  263. if wait, err = getBool(r.Form, "wait"); err != nil {
  264. return emptyReq, etcdErr.NewRequestError(
  265. etcdErr.EcodeInvalidField,
  266. `invalid value for "wait"`,
  267. )
  268. }
  269. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  270. if dir, err = getBool(r.Form, "dir"); err != nil {
  271. return emptyReq, etcdErr.NewRequestError(
  272. etcdErr.EcodeInvalidField,
  273. `invalid value for "dir"`,
  274. )
  275. }
  276. if stream, err = getBool(r.Form, "stream"); err != nil {
  277. return emptyReq, etcdErr.NewRequestError(
  278. etcdErr.EcodeInvalidField,
  279. `invalid value for "stream"`,
  280. )
  281. }
  282. if wait && r.Method != "GET" {
  283. return emptyReq, etcdErr.NewRequestError(
  284. etcdErr.EcodeInvalidField,
  285. `"wait" can only be used with GET requests`,
  286. )
  287. }
  288. pV := r.FormValue("prevValue")
  289. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  290. return emptyReq, etcdErr.NewRequestError(
  291. etcdErr.EcodeInvalidField,
  292. `"prevValue" cannot be empty`,
  293. )
  294. }
  295. // TTL is nullable, so leave it null if not specified
  296. // or an empty string
  297. var ttl *uint64
  298. if len(r.FormValue("ttl")) > 0 {
  299. i, err := getUint64(r.Form, "ttl")
  300. if err != nil {
  301. return emptyReq, etcdErr.NewRequestError(
  302. etcdErr.EcodeTTLNaN,
  303. `invalid value for "ttl"`,
  304. )
  305. }
  306. ttl = &i
  307. }
  308. // prevExist is nullable, so leave it null if not specified
  309. var pe *bool
  310. if _, ok := r.Form["prevExist"]; ok {
  311. bv, err := getBool(r.Form, "prevExist")
  312. if err != nil {
  313. return emptyReq, etcdErr.NewRequestError(
  314. etcdErr.EcodeInvalidField,
  315. "invalid value for prevExist",
  316. )
  317. }
  318. pe = &bv
  319. }
  320. rr := etcdserverpb.Request{
  321. ID: id,
  322. Method: r.Method,
  323. Path: p,
  324. Val: r.FormValue("value"),
  325. Dir: dir,
  326. PrevValue: pV,
  327. PrevIndex: pIdx,
  328. PrevExist: pe,
  329. Recursive: rec,
  330. Since: wIdx,
  331. Sorted: sort,
  332. Stream: stream,
  333. Wait: wait,
  334. }
  335. if pe != nil {
  336. rr.PrevExist = pe
  337. }
  338. // Null TTL is equivalent to unset Expiration
  339. // TODO(jonboulle): use fake clock instead of time module
  340. // https://github.com/coreos/etcd/issues/1021
  341. if ttl != nil {
  342. expr := time.Duration(*ttl) * time.Second
  343. rr.Expiration = time.Now().Add(expr).UnixNano()
  344. }
  345. return rr, nil
  346. }
  347. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  348. // not exist in the form, 0 is returned. If the key exists but the value is
  349. // badly formed, an error is returned. If multiple values are present only the
  350. // first is considered.
  351. func getUint64(form url.Values, key string) (i uint64, err error) {
  352. if vals, ok := form[key]; ok {
  353. i, err = strconv.ParseUint(vals[0], 10, 64)
  354. }
  355. return
  356. }
  357. // getBool extracts a bool by the given key from a Form. If the key does not
  358. // exist in the form, false is returned. If the key exists but the value is
  359. // badly formed, an error is returned. If multiple values are present only the
  360. // first is considered.
  361. func getBool(form url.Values, key string) (b bool, err error) {
  362. if vals, ok := form[key]; ok {
  363. b, err = strconv.ParseBool(vals[0])
  364. }
  365. return
  366. }
  367. // writeError logs and writes the given Error to the ResponseWriter
  368. // If Error is an etcdErr, it is rendered to the ResponseWriter
  369. // Otherwise, it is assumed to be an InternalServerError
  370. func writeError(w http.ResponseWriter, err error) {
  371. if err == nil {
  372. return
  373. }
  374. log.Println(err)
  375. if e, ok := err.(*etcdErr.Error); ok {
  376. e.Write(w)
  377. } else {
  378. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  379. }
  380. }
  381. // writeEvent serializes a single Event and writes the resulting
  382. // JSON to the given ResponseWriter, along with the appropriate
  383. // headers
  384. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  385. if ev == nil {
  386. return errors.New("cannot write empty Event!")
  387. }
  388. w.Header().Set("Content-Type", "application/json")
  389. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  390. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  391. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  392. if ev.IsCreated() {
  393. w.WriteHeader(http.StatusCreated)
  394. }
  395. return json.NewEncoder(w).Encode(ev)
  396. }
  397. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  398. defer wa.Remove()
  399. ech := wa.EventChan()
  400. var nch <-chan bool
  401. if x, ok := w.(http.CloseNotifier); ok {
  402. nch = x.CloseNotify()
  403. }
  404. w.Header().Set("Content-Type", "application/json")
  405. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  406. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  407. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  408. w.WriteHeader(http.StatusOK)
  409. // Ensure headers are flushed early, in case of long polling
  410. w.(http.Flusher).Flush()
  411. for {
  412. select {
  413. case <-nch:
  414. // Client closed connection. Nothing to do.
  415. return
  416. case <-ctx.Done():
  417. // Timed out. net/http will close the connection for us, so nothing to do.
  418. return
  419. case ev, ok := <-ech:
  420. if !ok {
  421. // If the channel is closed this may be an indication of
  422. // that notifications are much more than we are able to
  423. // send to the client in time. Then we simply end streaming.
  424. return
  425. }
  426. if err := json.NewEncoder(w).Encode(ev); err != nil {
  427. // Should never be reached
  428. log.Println("error writing event: %v", err)
  429. return
  430. }
  431. if !stream {
  432. return
  433. }
  434. w.(http.Flusher).Flush()
  435. }
  436. }
  437. }
  438. // allowMethod verifies that the given method is one of the allowed methods,
  439. // and if not, it writes an error to w. A boolean is returned indicating
  440. // whether or not the method is allowed.
  441. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  442. for _, meth := range ms {
  443. if m == meth {
  444. return true
  445. }
  446. }
  447. w.Header().Set("Allow", strings.Join(ms, ","))
  448. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  449. return false
  450. }