http.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  28. etcdErr "github.com/coreos/etcd/error"
  29. "github.com/coreos/etcd/etcdserver"
  30. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  31. "github.com/coreos/etcd/pkg/types"
  32. "github.com/coreos/etcd/raft/raftpb"
  33. "github.com/coreos/etcd/store"
  34. )
  35. const (
  36. // prefixes of client endpoint
  37. keysPrefix = "/v2/keys"
  38. deprecatedMachinesPrefix = "/v2/machines"
  39. adminMembersPrefix = "/v2/admin/members/"
  40. statsPrefix = "/v2/stats"
  41. // prefixes of peer endpoint
  42. raftPrefix = "/raft"
  43. membersPrefix = "/members"
  44. // time to wait for response from EtcdServer requests
  45. defaultServerTimeout = 500 * time.Millisecond
  46. // time to wait for a Watch request
  47. defaultWatchTimeout = 5 * time.Minute
  48. )
  49. var errClosed = errors.New("etcdhttp: client closed connection")
  50. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  51. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  52. sh := &serverHandler{
  53. server: server,
  54. clusterInfo: server.Cluster,
  55. stats: server,
  56. timer: server,
  57. timeout: defaultServerTimeout,
  58. clock: clockwork.NewRealClock(),
  59. }
  60. mux := http.NewServeMux()
  61. mux.HandleFunc(keysPrefix, sh.serveKeys)
  62. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  63. mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
  64. mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
  65. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
  66. // TODO: dynamic configuration may make this outdated. take care of it.
  67. // TODO: dynamic configuration may introduce race also.
  68. // TODO: add serveMembers
  69. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  70. mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
  71. mux.HandleFunc("/", http.NotFound)
  72. return mux
  73. }
  74. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  75. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
  76. sh := &serverHandler{
  77. server: server,
  78. stats: server,
  79. clusterInfo: server.Cluster,
  80. clock: clockwork.NewRealClock(),
  81. }
  82. mux := http.NewServeMux()
  83. mux.HandleFunc(raftPrefix, sh.serveRaft)
  84. mux.HandleFunc(membersPrefix, sh.serveMembers)
  85. mux.HandleFunc("/", http.NotFound)
  86. return mux
  87. }
  88. // serverHandler provides http.Handlers for etcd client and raft communication.
  89. type serverHandler struct {
  90. timeout time.Duration
  91. server etcdserver.Server
  92. stats etcdserver.Stats
  93. timer etcdserver.RaftTimer
  94. clusterInfo etcdserver.ClusterInfo
  95. clock clockwork.Clock
  96. }
  97. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  98. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  99. return
  100. }
  101. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  102. defer cancel()
  103. rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
  104. if err != nil {
  105. writeError(w, err)
  106. return
  107. }
  108. resp, err := h.server.Do(ctx, rr)
  109. if err != nil {
  110. writeError(w, err)
  111. return
  112. }
  113. switch {
  114. case resp.Event != nil:
  115. if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
  116. // Should never be reached
  117. log.Printf("error writing event: %v", err)
  118. }
  119. case resp.Watcher != nil:
  120. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  121. defer cancel()
  122. handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  123. default:
  124. writeError(w, errors.New("received response with no Event/Watcher!"))
  125. }
  126. }
  127. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  128. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  129. if !allowMethod(w, r.Method, "GET", "HEAD") {
  130. return
  131. }
  132. endpoints := h.clusterInfo.ClientURLs()
  133. w.Write([]byte(strings.Join(endpoints, ", ")))
  134. }
  135. func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
  136. if !allowMethod(w, r.Method, "GET", "POST", "DELETE") {
  137. return
  138. }
  139. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  140. defer cancel()
  141. switch r.Method {
  142. case "GET":
  143. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  144. if idStr == "" {
  145. ms := h.clusterInfo.Members()
  146. w.Header().Set("Content-Type", "application/json")
  147. if err := json.NewEncoder(w).Encode(ms); err != nil {
  148. log.Printf("etcdhttp: %v", err)
  149. }
  150. return
  151. }
  152. id, err := strconv.ParseUint(idStr, 16, 64)
  153. if err != nil {
  154. http.Error(w, err.Error(), http.StatusBadRequest)
  155. return
  156. }
  157. m := h.clusterInfo.Member(id)
  158. if m == nil {
  159. http.Error(w, "member not found", http.StatusNotFound)
  160. return
  161. }
  162. w.Header().Set("Content-Type", "application/json")
  163. if err := json.NewEncoder(w).Encode(m); err != nil {
  164. log.Printf("etcdhttp: %v", err)
  165. }
  166. return
  167. case "POST":
  168. ctype := r.Header.Get("Content-Type")
  169. if ctype != "application/json" {
  170. http.Error(w, fmt.Sprintf("bad Content-Type %s, accept application/json", ctype), http.StatusBadRequest)
  171. return
  172. }
  173. b, err := ioutil.ReadAll(r.Body)
  174. if err != nil {
  175. http.Error(w, err.Error(), http.StatusBadRequest)
  176. return
  177. }
  178. raftAttr := etcdserver.RaftAttributes{}
  179. if err := json.Unmarshal(b, &raftAttr); err != nil {
  180. http.Error(w, err.Error(), http.StatusBadRequest)
  181. return
  182. }
  183. validURLs, err := types.NewURLs(raftAttr.PeerURLs)
  184. if err != nil {
  185. http.Error(w, "bad peer urls", http.StatusBadRequest)
  186. return
  187. }
  188. now := h.clock.Now()
  189. m := etcdserver.NewMember("", validURLs, "", &now)
  190. if err := h.server.AddMember(ctx, *m); err != nil {
  191. log.Printf("etcdhttp: error adding node %x: %v", m.ID, err)
  192. writeError(w, err)
  193. return
  194. }
  195. log.Printf("etcdhttp: added node %x with peer urls %v", m.ID, raftAttr.PeerURLs)
  196. w.Header().Set("Content-Type", "application/json")
  197. w.WriteHeader(http.StatusCreated)
  198. if err := json.NewEncoder(w).Encode(m); err != nil {
  199. log.Printf("etcdhttp: %v", err)
  200. }
  201. case "DELETE":
  202. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  203. id, err := strconv.ParseUint(idStr, 16, 64)
  204. if err != nil {
  205. http.Error(w, err.Error(), http.StatusBadRequest)
  206. return
  207. }
  208. log.Printf("etcdhttp: remove node %x", id)
  209. if err := h.server.RemoveMember(ctx, id); err != nil {
  210. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  211. writeError(w, err)
  212. return
  213. }
  214. w.WriteHeader(http.StatusNoContent)
  215. }
  216. }
  217. func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
  218. if !allowMethod(w, r.Method, "GET") {
  219. return
  220. }
  221. w.Header().Set("Content-Type", "application/json")
  222. w.Write(h.stats.StoreStats())
  223. }
  224. func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
  225. if !allowMethod(w, r.Method, "GET") {
  226. return
  227. }
  228. w.Header().Set("Content-Type", "application/json")
  229. w.Write(h.stats.SelfStats())
  230. }
  231. func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
  232. if !allowMethod(w, r.Method, "GET") {
  233. return
  234. }
  235. w.Header().Set("Content-Type", "application/json")
  236. w.Write(h.stats.LeaderStats())
  237. }
  238. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  239. if !allowMethod(w, r.Method, "POST") {
  240. return
  241. }
  242. wcid := strconv.FormatUint(h.clusterInfo.ID(), 16)
  243. w.Header().Set("X-Etcd-Cluster-ID", wcid)
  244. gcid := r.Header.Get("X-Etcd-Cluster-ID")
  245. if gcid != wcid {
  246. log.Printf("etcdhttp: request ignored due to cluster ID mismatch got %s want %s", gcid, wcid)
  247. http.Error(w, "clusterID mismatch", http.StatusPreconditionFailed)
  248. return
  249. }
  250. b, err := ioutil.ReadAll(r.Body)
  251. if err != nil {
  252. log.Println("etcdhttp: error reading raft message:", err)
  253. http.Error(w, "error reading raft message", http.StatusBadRequest)
  254. return
  255. }
  256. var m raftpb.Message
  257. if err := m.Unmarshal(b); err != nil {
  258. log.Println("etcdhttp: error unmarshaling raft message:", err)
  259. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  260. return
  261. }
  262. log.Printf("etcdhttp: raft recv message from %x: %+v", m.From, m)
  263. if err := h.server.Process(context.TODO(), m); err != nil {
  264. log.Println("etcdhttp: error processing raft message:", err)
  265. switch err {
  266. case etcdserver.ErrRemoved:
  267. http.Error(w, "cannot process message from removed node", http.StatusForbidden)
  268. default:
  269. writeError(w, err)
  270. }
  271. return
  272. }
  273. if m.Type == raftpb.MsgApp {
  274. h.stats.UpdateRecvApp(m.From, r.ContentLength)
  275. }
  276. w.WriteHeader(http.StatusNoContent)
  277. }
  278. func (h serverHandler) serveMembers(w http.ResponseWriter, r *http.Request) {
  279. if !allowMethod(w, r.Method, "GET") {
  280. return
  281. }
  282. cid := strconv.FormatUint(h.clusterInfo.ID(), 16)
  283. w.Header().Set("X-Etcd-Cluster-ID", cid)
  284. if r.URL.Path != membersPrefix {
  285. http.Error(w, "bad path", http.StatusBadRequest)
  286. return
  287. }
  288. ms := h.clusterInfo.Members()
  289. w.Header().Set("Content-Type", "application/json")
  290. if err := json.NewEncoder(w).Encode(ms); err != nil {
  291. log.Printf("etcdhttp: %v", err)
  292. }
  293. }
  294. // parseKeyRequest converts a received http.Request on keysPrefix to
  295. // a server Request, performing validation of supplied fields as appropriate.
  296. // If any validation fails, an empty Request and non-nil error is returned.
  297. func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
  298. emptyReq := etcdserverpb.Request{}
  299. err := r.ParseForm()
  300. if err != nil {
  301. return emptyReq, etcdErr.NewRequestError(
  302. etcdErr.EcodeInvalidForm,
  303. err.Error(),
  304. )
  305. }
  306. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  307. return emptyReq, etcdErr.NewRequestError(
  308. etcdErr.EcodeInvalidForm,
  309. "incorrect key prefix",
  310. )
  311. }
  312. p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
  313. var pIdx, wIdx uint64
  314. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  315. return emptyReq, etcdErr.NewRequestError(
  316. etcdErr.EcodeIndexNaN,
  317. `invalid value for "prevIndex"`,
  318. )
  319. }
  320. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  321. return emptyReq, etcdErr.NewRequestError(
  322. etcdErr.EcodeIndexNaN,
  323. `invalid value for "waitIndex"`,
  324. )
  325. }
  326. var rec, sort, wait, dir, stream bool
  327. if rec, err = getBool(r.Form, "recursive"); err != nil {
  328. return emptyReq, etcdErr.NewRequestError(
  329. etcdErr.EcodeInvalidField,
  330. `invalid value for "recursive"`,
  331. )
  332. }
  333. if sort, err = getBool(r.Form, "sorted"); err != nil {
  334. return emptyReq, etcdErr.NewRequestError(
  335. etcdErr.EcodeInvalidField,
  336. `invalid value for "sorted"`,
  337. )
  338. }
  339. if wait, err = getBool(r.Form, "wait"); err != nil {
  340. return emptyReq, etcdErr.NewRequestError(
  341. etcdErr.EcodeInvalidField,
  342. `invalid value for "wait"`,
  343. )
  344. }
  345. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  346. if dir, err = getBool(r.Form, "dir"); err != nil {
  347. return emptyReq, etcdErr.NewRequestError(
  348. etcdErr.EcodeInvalidField,
  349. `invalid value for "dir"`,
  350. )
  351. }
  352. if stream, err = getBool(r.Form, "stream"); err != nil {
  353. return emptyReq, etcdErr.NewRequestError(
  354. etcdErr.EcodeInvalidField,
  355. `invalid value for "stream"`,
  356. )
  357. }
  358. if wait && r.Method != "GET" {
  359. return emptyReq, etcdErr.NewRequestError(
  360. etcdErr.EcodeInvalidField,
  361. `"wait" can only be used with GET requests`,
  362. )
  363. }
  364. pV := r.FormValue("prevValue")
  365. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  366. return emptyReq, etcdErr.NewRequestError(
  367. etcdErr.EcodeInvalidField,
  368. `"prevValue" cannot be empty`,
  369. )
  370. }
  371. // TTL is nullable, so leave it null if not specified
  372. // or an empty string
  373. var ttl *uint64
  374. if len(r.FormValue("ttl")) > 0 {
  375. i, err := getUint64(r.Form, "ttl")
  376. if err != nil {
  377. return emptyReq, etcdErr.NewRequestError(
  378. etcdErr.EcodeTTLNaN,
  379. `invalid value for "ttl"`,
  380. )
  381. }
  382. ttl = &i
  383. }
  384. // prevExist is nullable, so leave it null if not specified
  385. var pe *bool
  386. if _, ok := r.Form["prevExist"]; ok {
  387. bv, err := getBool(r.Form, "prevExist")
  388. if err != nil {
  389. return emptyReq, etcdErr.NewRequestError(
  390. etcdErr.EcodeInvalidField,
  391. "invalid value for prevExist",
  392. )
  393. }
  394. pe = &bv
  395. }
  396. rr := etcdserverpb.Request{
  397. ID: id,
  398. Method: r.Method,
  399. Path: p,
  400. Val: r.FormValue("value"),
  401. Dir: dir,
  402. PrevValue: pV,
  403. PrevIndex: pIdx,
  404. PrevExist: pe,
  405. Recursive: rec,
  406. Since: wIdx,
  407. Sorted: sort,
  408. Stream: stream,
  409. Wait: wait,
  410. }
  411. if pe != nil {
  412. rr.PrevExist = pe
  413. }
  414. // Null TTL is equivalent to unset Expiration
  415. if ttl != nil {
  416. expr := time.Duration(*ttl) * time.Second
  417. rr.Expiration = clock.Now().Add(expr).UnixNano()
  418. }
  419. return rr, nil
  420. }
  421. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  422. // not exist in the form, 0 is returned. If the key exists but the value is
  423. // badly formed, an error is returned. If multiple values are present only the
  424. // first is considered.
  425. func getUint64(form url.Values, key string) (i uint64, err error) {
  426. if vals, ok := form[key]; ok {
  427. i, err = strconv.ParseUint(vals[0], 10, 64)
  428. }
  429. return
  430. }
  431. // getBool extracts a bool by the given key from a Form. If the key does not
  432. // exist in the form, false is returned. If the key exists but the value is
  433. // badly formed, an error is returned. If multiple values are present only the
  434. // first is considered.
  435. func getBool(form url.Values, key string) (b bool, err error) {
  436. if vals, ok := form[key]; ok {
  437. b, err = strconv.ParseBool(vals[0])
  438. }
  439. return
  440. }
  441. // writeError logs and writes the given Error to the ResponseWriter
  442. // If Error is an etcdErr, it is rendered to the ResponseWriter
  443. // Otherwise, it is assumed to be an InternalServerError
  444. func writeError(w http.ResponseWriter, err error) {
  445. if err == nil {
  446. return
  447. }
  448. log.Println(err)
  449. if e, ok := err.(*etcdErr.Error); ok {
  450. e.Write(w)
  451. } else {
  452. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  453. }
  454. }
  455. // writeKeyEvent trims the prefix of key path in a single Event under
  456. // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
  457. // ResponseWriter, along with the appropriate headers.
  458. func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  459. if ev == nil {
  460. return errors.New("cannot write empty Event!")
  461. }
  462. w.Header().Set("Content-Type", "application/json")
  463. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  464. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  465. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  466. if ev.IsCreated() {
  467. w.WriteHeader(http.StatusCreated)
  468. }
  469. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  470. return json.NewEncoder(w).Encode(ev)
  471. }
  472. func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  473. defer wa.Remove()
  474. ech := wa.EventChan()
  475. var nch <-chan bool
  476. if x, ok := w.(http.CloseNotifier); ok {
  477. nch = x.CloseNotify()
  478. }
  479. w.Header().Set("Content-Type", "application/json")
  480. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  481. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  482. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  483. w.WriteHeader(http.StatusOK)
  484. // Ensure headers are flushed early, in case of long polling
  485. w.(http.Flusher).Flush()
  486. for {
  487. select {
  488. case <-nch:
  489. // Client closed connection. Nothing to do.
  490. return
  491. case <-ctx.Done():
  492. // Timed out. net/http will close the connection for us, so nothing to do.
  493. return
  494. case ev, ok := <-ech:
  495. if !ok {
  496. // If the channel is closed this may be an indication of
  497. // that notifications are much more than we are able to
  498. // send to the client in time. Then we simply end streaming.
  499. return
  500. }
  501. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  502. if err := json.NewEncoder(w).Encode(ev); err != nil {
  503. // Should never be reached
  504. log.Printf("error writing event: %v\n", err)
  505. return
  506. }
  507. if !stream {
  508. return
  509. }
  510. w.(http.Flusher).Flush()
  511. }
  512. }
  513. }
  514. // allowMethod verifies that the given method is one of the allowed methods,
  515. // and if not, it writes an error to w. A boolean is returned indicating
  516. // whether or not the method is allowed.
  517. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  518. for _, meth := range ms {
  519. if m == meth {
  520. return true
  521. }
  522. }
  523. w.Header().Set("Allow", strings.Join(ms, ","))
  524. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  525. return false
  526. }
  527. func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
  528. if ev == nil {
  529. return nil
  530. }
  531. ev.Node = trimNodeExternPrefix(ev.Node, prefix)
  532. ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, prefix)
  533. return ev
  534. }
  535. func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
  536. if n == nil {
  537. return nil
  538. }
  539. n.Key = strings.TrimPrefix(n.Key, prefix)
  540. for _, nn := range n.Nodes {
  541. nn = trimNodeExternPrefix(nn, prefix)
  542. }
  543. return n
  544. }