client.go 18 KB


  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/golang.org/x/net/context"
  28. etcdErr "github.com/coreos/etcd/error"
  29. "github.com/coreos/etcd/etcdserver"
  30. "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
  31. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  32. "github.com/coreos/etcd/etcdserver/stats"
  33. "github.com/coreos/etcd/pkg/types"
  34. "github.com/coreos/etcd/raft"
  35. "github.com/coreos/etcd/store"
  36. "github.com/coreos/etcd/version"
  37. )
  38. const (
  39. keysPrefix = "/v2/keys"
  40. deprecatedMachinesPrefix = "/v2/machines"
  41. membersPrefix = "/v2/members"
  42. statsPrefix = "/v2/stats"
  43. healthPath = "/health"
  44. versionPath = "/version"
  45. )
  46. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  47. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  48. kh := &keysHandler{
  49. server: server,
  50. clusterInfo: server.Cluster,
  51. timer: server,
  52. timeout: defaultServerTimeout,
  53. }
  54. sh := &statsHandler{
  55. stats: server,
  56. }
  57. mh := &membersHandler{
  58. server: server,
  59. clusterInfo: server.Cluster,
  60. clock: clockwork.NewRealClock(),
  61. }
  62. dmh := &deprecatedMachinesHandler{
  63. clusterInfo: server.Cluster,
  64. }
  65. mux := http.NewServeMux()
  66. mux.HandleFunc("/", http.NotFound)
  67. mux.Handle(healthPath, healthHandler(server))
  68. mux.HandleFunc(versionPath, serveVersion)
  69. mux.Handle(keysPrefix, kh)
  70. mux.Handle(keysPrefix+"/", kh)
  71. mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
  72. mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
  73. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
  74. mux.Handle(membersPrefix, mh)
  75. mux.Handle(membersPrefix+"/", mh)
  76. mux.Handle(deprecatedMachinesPrefix, dmh)
  77. return mux
  78. }
  79. type keysHandler struct {
  80. server etcdserver.Server
  81. clusterInfo etcdserver.ClusterInfo
  82. timer etcdserver.RaftTimer
  83. timeout time.Duration
  84. }
  85. func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  86. if !allowMethod(w, r.Method, "HEAD", "GET", "PUT", "POST", "DELETE") {
  87. return
  88. }
  89. w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
  90. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  91. defer cancel()
  92. rr, err := parseKeyRequest(r, clockwork.NewRealClock())
  93. if err != nil {
  94. writeError(w, err)
  95. return
  96. }
  97. resp, err := h.server.Do(ctx, rr)
  98. if err != nil {
  99. err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
  100. writeError(w, err)
  101. return
  102. }
  103. switch {
  104. case resp.Event != nil:
  105. if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
  106. // Should never be reached
  107. log.Printf("error writing event: %v", err)
  108. }
  109. case resp.Watcher != nil:
  110. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  111. defer cancel()
  112. handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  113. default:
  114. writeError(w, errors.New("received response with no Event/Watcher!"))
  115. }
  116. }
  117. type deprecatedMachinesHandler struct {
  118. clusterInfo etcdserver.ClusterInfo
  119. }
  120. func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  121. if !allowMethod(w, r.Method, "GET", "HEAD") {
  122. return
  123. }
  124. endpoints := h.clusterInfo.ClientURLs()
  125. w.Write([]byte(strings.Join(endpoints, ", ")))
  126. }
  127. type membersHandler struct {
  128. server etcdserver.Server
  129. clusterInfo etcdserver.ClusterInfo
  130. clock clockwork.Clock
  131. }
  132. func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  133. if !allowMethod(w, r.Method, "GET", "POST", "DELETE", "PUT") {
  134. return
  135. }
  136. w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
  137. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  138. defer cancel()
  139. switch r.Method {
  140. case "GET":
  141. switch trimPrefix(r.URL.Path, membersPrefix) {
  142. case "":
  143. mc := newMemberCollection(h.clusterInfo.Members())
  144. w.Header().Set("Content-Type", "application/json")
  145. if err := json.NewEncoder(w).Encode(mc); err != nil {
  146. log.Printf("etcdhttp: %v", err)
  147. }
  148. case "leader":
  149. id := h.server.Leader()
  150. if id == 0 {
  151. writeError(w, httptypes.NewHTTPError(http.StatusServiceUnavailable, "During election"))
  152. return
  153. }
  154. m := newMember(h.clusterInfo.Member(id))
  155. w.Header().Set("Content-Type", "application/json")
  156. if err := json.NewEncoder(w).Encode(m); err != nil {
  157. log.Printf("etcdhttp: %v", err)
  158. }
  159. default:
  160. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, "Not found"))
  161. }
  162. case "POST":
  163. req := httptypes.MemberCreateRequest{}
  164. if ok := unmarshalRequest(r, &req, w); !ok {
  165. return
  166. }
  167. now := h.clock.Now()
  168. m := etcdserver.NewMember("", req.PeerURLs, "", &now)
  169. err := h.server.AddMember(ctx, *m)
  170. switch {
  171. case err == etcdserver.ErrIDExists || err == etcdserver.ErrPeerURLexists:
  172. writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
  173. return
  174. case err != nil:
  175. log.Printf("etcdhttp: error adding node %s: %v", m.ID, err)
  176. writeError(w, err)
  177. return
  178. }
  179. res := newMember(m)
  180. w.Header().Set("Content-Type", "application/json")
  181. w.WriteHeader(http.StatusCreated)
  182. if err := json.NewEncoder(w).Encode(res); err != nil {
  183. log.Printf("etcdhttp: %v", err)
  184. }
  185. case "DELETE":
  186. id, ok := getID(r.URL.Path, w)
  187. if !ok {
  188. return
  189. }
  190. err := h.server.RemoveMember(ctx, uint64(id))
  191. switch {
  192. case err == etcdserver.ErrIDRemoved:
  193. writeError(w, httptypes.NewHTTPError(http.StatusGone, fmt.Sprintf("Member permanently removed: %s", id)))
  194. case err == etcdserver.ErrIDNotFound:
  195. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
  196. case err != nil:
  197. log.Printf("etcdhttp: error removing node %s: %v", id, err)
  198. writeError(w, err)
  199. default:
  200. w.WriteHeader(http.StatusNoContent)
  201. }
  202. case "PUT":
  203. id, ok := getID(r.URL.Path, w)
  204. if !ok {
  205. return
  206. }
  207. req := httptypes.MemberUpdateRequest{}
  208. if ok := unmarshalRequest(r, &req, w); !ok {
  209. return
  210. }
  211. m := etcdserver.Member{
  212. ID: id,
  213. RaftAttributes: etcdserver.RaftAttributes{PeerURLs: req.PeerURLs.StringSlice()},
  214. }
  215. err := h.server.UpdateMember(ctx, m)
  216. switch {
  217. case err == etcdserver.ErrPeerURLexists:
  218. writeError(w, httptypes.NewHTTPError(http.StatusConflict, err.Error()))
  219. case err == etcdserver.ErrIDNotFound:
  220. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", id)))
  221. case err != nil:
  222. log.Printf("etcdhttp: error updating node %s: %v", m.ID, err)
  223. writeError(w, err)
  224. default:
  225. w.WriteHeader(http.StatusNoContent)
  226. }
  227. }
  228. }
  229. type statsHandler struct {
  230. stats stats.Stats
  231. }
  232. func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) {
  233. if !allowMethod(w, r.Method, "GET") {
  234. return
  235. }
  236. w.Header().Set("Content-Type", "application/json")
  237. w.Write(h.stats.StoreStats())
  238. }
  239. func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) {
  240. if !allowMethod(w, r.Method, "GET") {
  241. return
  242. }
  243. w.Header().Set("Content-Type", "application/json")
  244. w.Write(h.stats.SelfStats())
  245. }
  246. func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
  247. if !allowMethod(w, r.Method, "GET") {
  248. return
  249. }
  250. stats := h.stats.LeaderStats()
  251. if stats == nil {
  252. writeError(w, httptypes.NewHTTPError(http.StatusForbidden, "not current leader"))
  253. return
  254. }
  255. w.Header().Set("Content-Type", "application/json")
  256. w.Write(stats)
  257. }
  258. // TODO: change etcdserver to raft interface when we have it.
  259. // add test for healthHeadler when we have the interface ready.
  260. func healthHandler(server *etcdserver.EtcdServer) http.HandlerFunc {
  261. return func(w http.ResponseWriter, r *http.Request) {
  262. if !allowMethod(w, r.Method, "GET") {
  263. return
  264. }
  265. if uint64(server.Leader()) == raft.None {
  266. http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
  267. return
  268. }
  269. // wait for raft's progress
  270. index := server.Index()
  271. for i := 0; i < 3; i++ {
  272. time.Sleep(250 * time.Millisecond)
  273. if server.Index() > index {
  274. w.WriteHeader(http.StatusOK)
  275. w.Write([]byte(`{"health": "true"}`))
  276. return
  277. }
  278. }
  279. http.Error(w, `{"health": "false"}`, http.StatusServiceUnavailable)
  280. return
  281. }
  282. }
  283. func serveVersion(w http.ResponseWriter, r *http.Request) {
  284. if !allowMethod(w, r.Method, "GET") {
  285. return
  286. }
  287. w.Write([]byte("etcd " + version.Version))
  288. }
  289. // parseKeyRequest converts a received http.Request on keysPrefix to
  290. // a server Request, performing validation of supplied fields as appropriate.
  291. // If any validation fails, an empty Request and non-nil error is returned.
  292. func parseKeyRequest(r *http.Request, clock clockwork.Clock) (etcdserverpb.Request, error) {
  293. emptyReq := etcdserverpb.Request{}
  294. err := r.ParseForm()
  295. if err != nil {
  296. return emptyReq, etcdErr.NewRequestError(
  297. etcdErr.EcodeInvalidForm,
  298. err.Error(),
  299. )
  300. }
  301. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  302. return emptyReq, etcdErr.NewRequestError(
  303. etcdErr.EcodeInvalidForm,
  304. "incorrect key prefix",
  305. )
  306. }
  307. p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
  308. var pIdx, wIdx uint64
  309. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  310. return emptyReq, etcdErr.NewRequestError(
  311. etcdErr.EcodeIndexNaN,
  312. `invalid value for "prevIndex"`,
  313. )
  314. }
  315. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  316. return emptyReq, etcdErr.NewRequestError(
  317. etcdErr.EcodeIndexNaN,
  318. `invalid value for "waitIndex"`,
  319. )
  320. }
  321. var rec, sort, wait, dir, quorum, stream bool
  322. if rec, err = getBool(r.Form, "recursive"); err != nil {
  323. return emptyReq, etcdErr.NewRequestError(
  324. etcdErr.EcodeInvalidField,
  325. `invalid value for "recursive"`,
  326. )
  327. }
  328. if sort, err = getBool(r.Form, "sorted"); err != nil {
  329. return emptyReq, etcdErr.NewRequestError(
  330. etcdErr.EcodeInvalidField,
  331. `invalid value for "sorted"`,
  332. )
  333. }
  334. if wait, err = getBool(r.Form, "wait"); err != nil {
  335. return emptyReq, etcdErr.NewRequestError(
  336. etcdErr.EcodeInvalidField,
  337. `invalid value for "wait"`,
  338. )
  339. }
  340. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  341. if dir, err = getBool(r.Form, "dir"); err != nil {
  342. return emptyReq, etcdErr.NewRequestError(
  343. etcdErr.EcodeInvalidField,
  344. `invalid value for "dir"`,
  345. )
  346. }
  347. if quorum, err = getBool(r.Form, "quorum"); err != nil {
  348. return emptyReq, etcdErr.NewRequestError(
  349. etcdErr.EcodeInvalidField,
  350. `invalid value for "quorum"`,
  351. )
  352. }
  353. if stream, err = getBool(r.Form, "stream"); err != nil {
  354. return emptyReq, etcdErr.NewRequestError(
  355. etcdErr.EcodeInvalidField,
  356. `invalid value for "stream"`,
  357. )
  358. }
  359. if wait && r.Method != "GET" {
  360. return emptyReq, etcdErr.NewRequestError(
  361. etcdErr.EcodeInvalidField,
  362. `"wait" can only be used with GET requests`,
  363. )
  364. }
  365. pV := r.FormValue("prevValue")
  366. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  367. return emptyReq, etcdErr.NewRequestError(
  368. etcdErr.EcodePrevValueRequired,
  369. `"prevValue" cannot be empty`,
  370. )
  371. }
  372. // TTL is nullable, so leave it null if not specified
  373. // or an empty string
  374. var ttl *uint64
  375. if len(r.FormValue("ttl")) > 0 {
  376. i, err := getUint64(r.Form, "ttl")
  377. if err != nil {
  378. return emptyReq, etcdErr.NewRequestError(
  379. etcdErr.EcodeTTLNaN,
  380. `invalid value for "ttl"`,
  381. )
  382. }
  383. ttl = &i
  384. }
  385. // prevExist is nullable, so leave it null if not specified
  386. var pe *bool
  387. if _, ok := r.Form["prevExist"]; ok {
  388. bv, err := getBool(r.Form, "prevExist")
  389. if err != nil {
  390. return emptyReq, etcdErr.NewRequestError(
  391. etcdErr.EcodeInvalidField,
  392. "invalid value for prevExist",
  393. )
  394. }
  395. pe = &bv
  396. }
  397. rr := etcdserverpb.Request{
  398. Method: r.Method,
  399. Path: p,
  400. Val: r.FormValue("value"),
  401. Dir: dir,
  402. PrevValue: pV,
  403. PrevIndex: pIdx,
  404. PrevExist: pe,
  405. Wait: wait,
  406. Since: wIdx,
  407. Recursive: rec,
  408. Sorted: sort,
  409. Quorum: quorum,
  410. Stream: stream,
  411. }
  412. if pe != nil {
  413. rr.PrevExist = pe
  414. }
  415. // Null TTL is equivalent to unset Expiration
  416. if ttl != nil {
  417. expr := time.Duration(*ttl) * time.Second
  418. rr.Expiration = clock.Now().Add(expr).UnixNano()
  419. }
  420. return rr, nil
  421. }
  422. // writeKeyEvent trims the prefix of key path in a single Event under
  423. // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
  424. // ResponseWriter, along with the appropriate headers.
  425. func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  426. if ev == nil {
  427. return errors.New("cannot write empty Event!")
  428. }
  429. w.Header().Set("Content-Type", "application/json")
  430. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  431. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  432. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  433. if ev.IsCreated() {
  434. w.WriteHeader(http.StatusCreated)
  435. }
  436. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  437. return json.NewEncoder(w).Encode(ev)
  438. }
  439. func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  440. defer wa.Remove()
  441. ech := wa.EventChan()
  442. var nch <-chan bool
  443. if x, ok := w.(http.CloseNotifier); ok {
  444. nch = x.CloseNotify()
  445. }
  446. w.Header().Set("Content-Type", "application/json")
  447. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  448. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  449. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  450. w.WriteHeader(http.StatusOK)
  451. // Ensure headers are flushed early, in case of long polling
  452. w.(http.Flusher).Flush()
  453. for {
  454. select {
  455. case <-nch:
  456. // Client closed connection. Nothing to do.
  457. return
  458. case <-ctx.Done():
  459. // Timed out. net/http will close the connection for us, so nothing to do.
  460. return
  461. case ev, ok := <-ech:
  462. if !ok {
  463. // If the channel is closed this may be an indication of
  464. // that notifications are much more than we are able to
  465. // send to the client in time. Then we simply end streaming.
  466. return
  467. }
  468. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  469. if err := json.NewEncoder(w).Encode(ev); err != nil {
  470. // Should never be reached
  471. log.Printf("error writing event: %v\n", err)
  472. return
  473. }
  474. if !stream {
  475. return
  476. }
  477. w.(http.Flusher).Flush()
  478. }
  479. }
  480. }
  481. func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
  482. if ev == nil {
  483. return nil
  484. }
  485. // Since the *Event may reference one in the store history
  486. // history, we must copy it before modifying
  487. e := ev.Clone()
  488. e.Node = trimNodeExternPrefix(e.Node, prefix)
  489. e.PrevNode = trimNodeExternPrefix(e.PrevNode, prefix)
  490. return e
  491. }
  492. func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
  493. if n == nil {
  494. return nil
  495. }
  496. n.Key = strings.TrimPrefix(n.Key, prefix)
  497. for _, nn := range n.Nodes {
  498. nn = trimNodeExternPrefix(nn, prefix)
  499. }
  500. return n
  501. }
  502. func trimErrorPrefix(err error, prefix string) error {
  503. if e, ok := err.(*etcdErr.Error); ok {
  504. e.Cause = strings.TrimPrefix(e.Cause, prefix)
  505. }
  506. return err
  507. }
  508. func unmarshalRequest(r *http.Request, req json.Unmarshaler, w http.ResponseWriter) bool {
  509. ctype := r.Header.Get("Content-Type")
  510. if ctype != "application/json" {
  511. writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
  512. return false
  513. }
  514. b, err := ioutil.ReadAll(r.Body)
  515. if err != nil {
  516. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  517. return false
  518. }
  519. if err := req.UnmarshalJSON(b); err != nil {
  520. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  521. return false
  522. }
  523. return true
  524. }
  525. func getID(p string, w http.ResponseWriter) (types.ID, bool) {
  526. idStr := trimPrefix(p, membersPrefix)
  527. if idStr == "" {
  528. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  529. return 0, false
  530. }
  531. id, err := types.IDFromString(idStr)
  532. if err != nil {
  533. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
  534. return 0, false
  535. }
  536. return id, true
  537. }
  538. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  539. // not exist in the form, 0 is returned. If the key exists but the value is
  540. // badly formed, an error is returned. If multiple values are present only the
  541. // first is considered.
  542. func getUint64(form url.Values, key string) (i uint64, err error) {
  543. if vals, ok := form[key]; ok {
  544. i, err = strconv.ParseUint(vals[0], 10, 64)
  545. }
  546. return
  547. }
  548. // getBool extracts a bool by the given key from a Form. If the key does not
  549. // exist in the form, false is returned. If the key exists but the value is
  550. // badly formed, an error is returned. If multiple values are present only the
  551. // first is considered.
  552. func getBool(form url.Values, key string) (b bool, err error) {
  553. if vals, ok := form[key]; ok {
  554. b, err = strconv.ParseBool(vals[0])
  555. }
  556. return
  557. }
  558. // trimPrefix removes a given prefix and any slash following the prefix
  559. // e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == ""
  560. func trimPrefix(p, prefix string) (s string) {
  561. s = strings.TrimPrefix(p, prefix)
  562. s = strings.TrimPrefix(s, "/")
  563. return
  564. }
  565. func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection {
  566. c := httptypes.MemberCollection(make([]httptypes.Member, len(ms)))
  567. for i, m := range ms {
  568. c[i] = newMember(m)
  569. }
  570. return &c
  571. }
  572. func newMember(m *etcdserver.Member) httptypes.Member {
  573. tm := httptypes.Member{
  574. ID: m.ID.String(),
  575. Name: m.Name,
  576. PeerURLs: make([]string, len(m.PeerURLs)),
  577. ClientURLs: make([]string, len(m.ClientURLs)),
  578. }
  579. copy(tm.PeerURLs, m.PeerURLs)
  580. copy(tm.ClientURLs, m.ClientURLs)
  581. return tm
  582. }