client.go 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  28. etcdErr "github.com/coreos/etcd/error"
  29. "github.com/coreos/etcd/etcdserver"
  30. "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
  31. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  32. "github.com/coreos/etcd/etcdserver/stats"
  33. "github.com/coreos/etcd/pkg/types"
  34. "github.com/coreos/etcd/store"
  35. "github.com/coreos/etcd/version"
  36. )
  37. const (
  38. keysPrefix = "/v2/keys"
  39. deprecatedMachinesPrefix = "/v2/machines"
  40. membersPrefix = "/v2/members"
  41. statsPrefix = "/v2/stats"
  42. versionPrefix = "/version"
  43. )
  44. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  45. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  46. kh := &keysHandler{
  47. server: server,
  48. clusterInfo: server.Cluster,
  49. timer: server,
  50. timeout: defaultServerTimeout,
  51. }
  52. sh := &statsHandler{
  53. stats: server,
  54. }
  55. mh := &membersHandler{
  56. server: server,
  57. clusterInfo: server.Cluster,
  58. clock: clockwork.NewRealClock(),
  59. }
  60. dmh := &deprecatedMachinesHandler{
  61. clusterInfo: server.Cluster,
  62. }
  63. mux := http.NewServeMux()
  64. mux.HandleFunc("/", http.NotFound)
  65. mux.HandleFunc(versionPrefix, serveVersion)
  66. mux.Handle(keysPrefix, kh)
  67. mux.Handle(keysPrefix+"/", kh)
  68. mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
  69. mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
  70. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
  71. mux.Handle(membersPrefix, mh)
  72. mux.Handle(membersPrefix+"/", mh)
  73. mux.Handle(deprecatedMachinesPrefix, dmh)
  74. return mux
  75. }
  76. type keysHandler struct {
  77. server etcdserver.Server
  78. clusterInfo etcdserver.ClusterInfo
  79. timer etcdserver.RaftTimer
  80. timeout time.Duration
  81. }
  82. func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  83. if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") {
  84. return
  85. }
  86. w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
  87. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  88. defer cancel()
  89. rr, err := parseKeyRequest(r, clockwork.NewRealClock())
  90. if err != nil {
  91. writeError(w, err)
  92. return
  93. }
  94. resp, err := h.server.Do(ctx, rr)
  95. if err != nil {
  96. err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
  97. writeError(w, err)
  98. return
  99. }
  100. switch {
  101. case resp.Event != nil:
  102. if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
  103. // Should never be reached
  104. log.Printf("error writing event: %v", err)
  105. }
  106. case resp.Watcher != nil:
  107. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  108. defer cancel()
  109. handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  110. default:
  111. writeError(w, errors.New("received response with no Event/Watcher!"))
  112. }
  113. }
  114. type deprecatedMachinesHandler struct {
  115. clusterInfo etcdserver.ClusterInfo
  116. }
  117. func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  118. if !allowMethod(w, r.Method, "GET", "HEAD") {
  119. return
  120. }
  121. endpoints := h.clusterInfo.ClientURLs()
  122. w.Write([]byte(strings.Join(endpoints, ", ")))
  123. }
  124. type membersHandler struct {
  125. server etcdserver.Server
  126. clusterInfo etcdserver.ClusterInfo
  127. clock clockwork.Clock
  128. }
  129. func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  130. if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
  131. return
  132. }
  133. w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
  134. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  135. defer cancel()
  136. switch r.Method {
  137. case "GET":
  138. if trimPrefix(r.URL.Path, membersPrefix) != "" {
  139. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, "Not found"))
  140. return
  141. }
  142. mc := newMemberCollection(h.clusterInfo.Members())
  143. w.Header().Set("Content-Type", "application/json")
  144. if err := json.NewEncoder(w).Encode(mc); err != nil {
  145. log.Printf("etcdhttp: %v", err)
  146. }
  147. case "POST":
  148. req := httptypes.MemberCreateRequest{}
  149. if ok := unmarshalRequest(r, &req, w); !ok {
  150. return
  151. }
  152. now := h.clock.Now()
  153. m := etcdserver.NewMember("", req.PeerURLs, "", &now)
  154. err := h.server.AddMember(ctx, *m)
  155. switch {
  156. case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists:
  157. writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
  158. return
  159. case err != nil:
  160. log.Printf("etcdhttp: error adding node %s: %v", m.ID, err)
  161. writeError(w, err)
  162. return
  163. }
  164. res := newMember(m)
  165. w.Header().Set("Content-Type", "application/json")
  166. w.WriteHeader(http.StatusCreated)
  167. if err := json.NewEncoder(w).Encode(res); err != nil {
  168. log.Printf("etcdhttp: %v", err)
  169. }
  170. case "DELETE":
  171. id, ok := getID(r.URL.Path, w)
  172. if !ok {
  173. return
  174. }
  175. err := h.server.RemoveMember(ctx, uint64(id))
  176. switch {
  177. case err == etcdserver.ErrIDRemoved:
  178. writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
  179. case err == etcdserver.ErrIDNotFound:
  180. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
  181. case err != nil:
  182. log.Printf("etcdhttp: error removing node %s: %v", id, err)
  183. writeError(w, err)
  184. default:
  185. w.WriteHeader(http.StatusNoContent)
  186. }
  187. case "PUT":
  188. id, ok := getID(r.URL.Path, w)
  189. if !ok {
  190. return
  191. }
  192. req := httptypes.MemberUpdateRequest{}
  193. if ok := unmarshalRequest(r, &req, w); !ok {
  194. return
  195. }
  196. m := etcdserver.Member{
  197. ID: id,
  198. RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
  199. }
  200. err := h.server.UpdateMember(ctx, m)
  201. switch {
  202. case err == etcdserver.ErrPeerURLexists:
  203. writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
  204. case err == etcdserver.ErrIDNotFound:
  205. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
  206. case err != nil:
  207. log.Printf("etcdhttp: error updating node %s: %v", m.ID, err)
  208. writeError(w, err)
  209. default:
  210. w.WriteHeader(http.StatusNoContent)
  211. }
  212. }
  213. }
  214. type statsHandler struct {
  215. stats stats.Stats
  216. }
  217. func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) {
  218. if !allowMethod(w, r.Method, "GET") {
  219. return
  220. }
  221. w.Header().Set("Content-Type", "application/json")
  222. w.Write(h.stats.StoreStats())
  223. }
  224. func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) {
  225. if !allowMethod(w, r.Method, "GET") {
  226. return
  227. }
  228. w.Header().Set("Content-Type", "application/json")
  229. w.Write(h.stats.SelfStats())
  230. }
  231. func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
  232. if !allowMethod(w, r.Method, "GET") {
  233. return
  234. }
  235. stats := h.stats.LeaderStats()
  236. if stats == nil {
  237. writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
  238. return
  239. }
  240. w.Header().Set("Content-Type", "application/json")
  241. w.Write(stats)
  242. }
  243. func serveVersion(w http.ResponseWriter, r *http.Request) {
  244. if !allowMethod(w, r.Method, "GET") {
  245. return
  246. }
  247. w.Write([]byte("etcd " + version.Version))
  248. }
  249. // parseKeyRequest converts a received http.Request on keysPrefix to
  250. // a server Request, performing validation of supplied fields as appropriate.
  251. // If any validation fails, an empty Request and non-nil error is returned.
  252. func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, error) {
  253. emptyReq := etcdserverpb.Request{}
  254. err := r.ParseForm()
  255. if err != nil {
  256. return emptyReq, etcdErr.NewRequestError(
  257. etcdErr.EcodeInvalidForm,
  258. err.Error(),
  259. )
  260. }
  261. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  262. return emptyReq, etcdErr.NewRequestError(
  263. etcdErr.EcodeInvalidForm,
  264. "incorrect key prefix",
  265. )
  266. }
  267. p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
  268. var pIdx, wIdx uint64
  269. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  270. return emptyReq, etcdErr.NewRequestError(
  271. etcdErr.EcodeIndexNaN,
  272. `invalid value for "prevIndex"`,
  273. )
  274. }
  275. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  276. return emptyReq, etcdErr.NewRequestError(
  277. etcdErr.EcodeIndexNaN,
  278. `invalid value for "waitIndex"`,
  279. )
  280. }
  281. var rec, sort, wait, dir, quorum, stream bool
  282. if rec, err = getBool(r.Form, "recursive"); err != nil {
  283. return emptyReq, etcdErr.NewRequestError(
  284. etcdErr.EcodeInvalidField,
  285. `invalid value for "recursive"`,
  286. )
  287. }
  288. if sort, err = getBool(r.Form, "sorted"); err != nil {
  289. return emptyReq, etcdErr.NewRequestError(
  290. etcdErr.EcodeInvalidField,
  291. `invalid value for "sorted"`,
  292. )
  293. }
  294. if wait, err = getBool(r.Form, "wait"); err != nil {
  295. return emptyReq, etcdErr.NewRequestError(
  296. etcdErr.EcodeInvalidField,
  297. `invalid value for "wait"`,
  298. )
  299. }
  300. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  301. if dir, err = getBool(r.Form, "dir"); err != nil {
  302. return emptyReq, etcdErr.NewRequestError(
  303. etcdErr.EcodeInvalidField,
  304. `invalid value for "dir"`,
  305. )
  306. }
  307. if quorum, err = getBool(r.Form, "quorum"); err != nil {
  308. return emptyReq, etcdErr.NewRequestError(
  309. etcdErr.EcodeInvalidField,
  310. `invalid value for "quorum"`,
  311. )
  312. }
  313. if stream, err = getBool(r.Form, "stream"); err != nil {
  314. return emptyReq, etcdErr.NewRequestError(
  315. etcdErr.EcodeInvalidField,
  316. `invalid value for "stream"`,
  317. )
  318. }
  319. if wait && r.Method != "GET" {
  320. return emptyReq, etcdErr.NewRequestError(
  321. etcdErr.EcodeInvalidField,
  322. `"wait" can only be used with GET requests`,
  323. )
  324. }
  325. pV := r.FormValue("prevValue")
  326. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  327. return emptyReq, etcdErr.NewRequestError(
  328. etcdErr.EcodePrevValueRequired,
  329. `"prevValue" cannot be empty`,
  330. )
  331. }
  332. // TTL is nullable, so leave it null if not specified
  333. // or an empty string
  334. var ttl *uint64
  335. if len(r.FormValue("ttl")) > 0 {
  336. i, err := getUint64(r.Form, "ttl")
  337. if err != nil {
  338. return emptyReq, etcdErr.NewRequestError(
  339. etcdErr.EcodeTTLNaN,
  340. `invalid value for "ttl"`,
  341. )
  342. }
  343. ttl = &i
  344. }
  345. // prevExist is nullable, so leave it null if not specified
  346. var pe *bool
  347. if _, ok := r.Form["prevExist"]; ok {
  348. bv, err := getBool(r.Form, "prevExist")
  349. if err != nil {
  350. return emptyReq, etcdErr.NewRequestError(
  351. etcdErr.EcodeInvalidField,
  352. "invalid value for prevExist",
  353. )
  354. }
  355. pe = &bv
  356. }
  357. rr := etcdserverpb.Request{
  358. Method: r.Method,
  359. Path: p,
  360. Val: r.FormValue("value"),
  361. Dir: dir,
  362. PrevValue: pV,
  363. PrevIndex: pIdx,
  364. PrevExist: pe,
  365. Wait: wait,
  366. Since: wIdx,
  367. Recursive: rec,
  368. Sorted: sort,
  369. Quorum: quorum,
  370. Stream: stream,
  371. }
  372. if pe != nil {
  373. rr.PrevExist = pe
  374. }
  375. // Null TTL is equivalent to unset Expiration
  376. if ttl != nil {
  377. expr := time.Duration(*ttl) * time.Second
  378. rr.Expiration = clock.Now().Add(expr).UnixNano()
  379. }
  380. return rr, nil
  381. }
  382. // writeKeyEvent trims the prefix of key path in a single Event under
  383. // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
  384. // ResponseWriter, along with the appropriate headers.
  385. func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  386. if ev == nil {
  387. return errors.New("cannot write empty Event!")
  388. }
  389. w.Header().Set("Content-Type", "application/json")
  390. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  391. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  392. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  393. if ev.IsCreated() {
  394. w.WriteHeader(http.StatusCreated)
  395. }
  396. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  397. return json.NewEncoder(w).Encode(ev)
  398. }
  399. func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  400. defer wa.Remove()
  401. ech := wa.EventChan()
  402. var nch <-chan bool
  403. if x, ok := w.(http.CloseNotifier); ok {
  404. nch = x.CloseNotify()
  405. }
  406. w.Header().Set("Content-Type", "application/json")
  407. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  408. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  409. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  410. w.WriteHeader(http.StatusOK)
  411. // Ensure headers are flushed early, in case of long polling
  412. w.(http.Flusher).Flush()
  413. for {
  414. select {
  415. case <-nch:
  416. // Client closed connection. Nothing to do.
  417. return
  418. case <-ctx.Done():
  419. // Timed out. net/http will close the connection for us, so nothing to do.
  420. return
  421. case ev, ok := <-ech:
  422. if !ok {
  423. // If the channel is closed this may be an indication of
  424. // that notifications are much more than we are able to
  425. // send to the client in time. Then we simply end streaming.
  426. return
  427. }
  428. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  429. if err := json.NewEncoder(w).Encode(ev); err != nil {
  430. // Should never be reached
  431. log.Printf("error writing event: %v\n", err)
  432. return
  433. }
  434. if !stream {
  435. return
  436. }
  437. w.(http.Flusher).Flush()
  438. }
  439. }
  440. }
  441. func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
  442. if ev == nil {
  443. return nil
  444. }
  445. // Since the *Event may reference one in the store history
  446. // history, we must copy it before modifying
  447. e := ev.Clone()
  448. e.Node = trimNodeExternPrefix(e.Node, prefix)
  449. e.PrevNode = trimNodeExternPrefix(e.PrevNode, prefix)
  450. return e
  451. }
  452. func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
  453. if n == nil {
  454. return nil
  455. }
  456. n.Key = strings.TrimPrefix(n.Key, prefix)
  457. for _, nn := range n.Nodes {
  458. nn = trimNodeExternPrefix(nn, prefix)
  459. }
  460. return n
  461. }
  462. func trimErrorPrefix(err error, prefix string) error {
  463. if e, ok := err.(*etcdErr.Error); ok {
  464. e.Cause = strings.TrimPrefix(e.Cause, prefix)
  465. }
  466. return err
  467. }
  468. func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool {
  469. ctype := r.Header.Get("Content-Type")
  470. if ctype != "application/json" {
  471. writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
  472. return false
  473. }
  474. b, err := ioutil.ReadAll(r.Body)
  475. if err != nil {
  476. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  477. return false
  478. }
  479. if err := req.UnmarshalJSON(b); err != nil {
  480. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  481. return false
  482. }
  483. return true
  484. }
  485. func getID(p string, w http.ResponseWriter) (types.ID, bool) {
  486. idStr := trimPrefix(p, membersPrefix)
  487. if idStr == "" {
  488. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  489. return 0, false
  490. }
  491. id, err := types.IDFromString(idStr)
  492. if err != nil {
  493. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
  494. return 0, false
  495. }
  496. return id, true
  497. }
  498. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  499. // not exist in the form, 0 is returned. If the key exists but the value is
  500. // badly formed, an error is returned. If multiple values are present only the
  501. // first is considered.
  502. func getUint64(form url.Values, key string) (i uint64, err error) {
  503. if vals, ok := form[key]; ok {
  504. i, err = strconv.ParseUint(vals[0], 10, 64)
  505. }
  506. return
  507. }
  508. // getBool extracts a bool by the given key from a Form. If the key does not
  509. // exist in the form, false is returned. If the key exists but the value is
  510. // badly formed, an error is returned. If multiple values are present only the
  511. // first is considered.
  512. func getBool(form url.Values, key string) (b bool, err error) {
  513. if vals, ok := form[key]; ok {
  514. b, err = strconv.ParseBool(vals[0])
  515. }
  516. return
  517. }
  518. // trimPrefix removes a given prefix and any slash following the prefix
  519. // e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == ""
  520. func trimPrefix(p, prefix string) (s string) {
  521. s = strings.TrimPrefix(p, prefix)
  522. s = strings.TrimPrefix(s, "/")
  523. return
  524. }
  525. func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection {
  526. c := httptypes.MemberCollection(make([]httptypes.Member, len(ms)))
  527. for i, m := range ms {
  528. c[i] = newMember(m)
  529. }
  530. return &c
  531. }
  532. func newMember(m *etcdserver.Member) httptypes.Member {
  533. tm := httptypes.Member{
  534. ID: m.ID.String(),
  535. Name: m.Name,
  536. PeerURLs: make([]string, len(m.PeerURLs)),
  537. ClientURLs: make([]string, len(m.ClientURLs)),
  538. }
  539. copy(tm.PeerURLs, m.PeerURLs)
  540. copy(tm.ClientURLs, m.ClientURLs)
  541. return tm
  542. }