client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  28. etcdErr "github.com/coreos/etcd/error"
  29. "github.com/coreos/etcd/etcdserver"
  30. "github.com/coreos/etcd/etcdserver/etcdhttp/httptypes"
  31. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  32. "github.com/coreos/etcd/pkg/types"
  33. "github.com/coreos/etcd/store"
  34. "github.com/coreos/etcd/version"
  35. )
  // URL path prefixes under which NewClientHandler mounts the client-facing
  // handlers.
  const (
  	// keysPrefix is the root of the key-space API served by keysHandler.
  	keysPrefix = "/v2/keys"
  	// deprecatedMachinesPrefix is the legacy endpoint served by
  	// deprecatedMachinesHandler.
  	deprecatedMachinesPrefix = "/v2/machines"
  	// membersPrefix is the root of the membership API served by
  	// membersHandler.
  	membersPrefix = "/v2/members"
  	// statsPrefix is the root of the statistics endpoints ("/store",
  	// "/self" and "/leader" are registered beneath it).
  	statsPrefix = "/v2/stats"
  	// versionPrefix is the endpoint served by serveVersion.
  	versionPrefix = "/version"
  )
  43. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  44. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  45. kh := &keysHandler{
  46. server: server,
  47. clusterInfo: server.Cluster,
  48. timer: server,
  49. timeout: defaultServerTimeout,
  50. }
  51. sh := &statsHandler{
  52. stats: server,
  53. }
  54. mh := &membersHandler{
  55. server: server,
  56. clusterInfo: server.Cluster,
  57. clock: clockwork.NewRealClock(),
  58. }
  59. dmh := &deprecatedMachinesHandler{
  60. clusterInfo: server.Cluster,
  61. }
  62. mux := http.NewServeMux()
  63. mux.HandleFunc("/", http.NotFound)
  64. mux.HandleFunc(versionPrefix, serveVersion)
  65. mux.Handle(keysPrefix, kh)
  66. mux.Handle(keysPrefix+"/", kh)
  67. mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
  68. mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
  69. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
  70. mux.Handle(membersPrefix, mh)
  71. mux.Handle(membersPrefix+"/", mh)
  72. mux.Handle(deprecatedMachinesPrefix, dmh)
  73. return mux
  74. }
  75. type keysHandler struct {
  76. server etcdserver.Server
  77. clusterInfo etcdserver.ClusterInfo
  78. timer etcdserver.RaftTimer
  79. timeout time.Duration
  80. }
  81. func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  82. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  83. return
  84. }
  85. w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
  86. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  87. defer cancel()
  88. rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
  89. if err != nil {
  90. writeError(w, err)
  91. return
  92. }
  93. resp, err := h.server.Do(ctx, rr)
  94. if err != nil {
  95. err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
  96. writeError(w, err)
  97. return
  98. }
  99. switch {
  100. case resp.Event != nil:
  101. if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
  102. // Should never be reached
  103. log.Printf("error writing event: %v", err)
  104. }
  105. case resp.Watcher != nil:
  106. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  107. defer cancel()
  108. handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  109. default:
  110. writeError(w, errors.New("received response with no Event/Watcher!"))
  111. }
  112. }
  113. type deprecatedMachinesHandler struct {
  114. clusterInfo etcdserver.ClusterInfo
  115. }
  116. func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  117. if !allowMethod(w, r.Method, "GET", "HEAD") {
  118. return
  119. }
  120. endpoints := h.clusterInfo.ClientURLs()
  121. w.Write([]byte(strings.Join(endpoints, ", ")))
  122. }
  123. type membersHandler struct {
  124. server etcdserver.Server
  125. clusterInfo etcdserver.ClusterInfo
  126. clock clockwork.Clock
  127. }
  128. func (h *membersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  129. if !allowMethod(w, r.Method, "GET", "POST", "DELETE") {
  130. return
  131. }
  132. w.Header().Set("X-Etcd-Cluster-ID", h.clusterInfo.ID().String())
  133. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  134. defer cancel()
  135. switch r.Method {
  136. case "GET":
  137. if trimPrefix(r.URL.Path, membersPrefix) != "" {
  138. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, "Not found"))
  139. return
  140. }
  141. mc := newMemberCollection(h.clusterInfo.Members())
  142. w.Header().Set("Content-Type", "application/json")
  143. if err := json.NewEncoder(w).Encode(mc); err != nil {
  144. log.Printf("etcdhttp: %v", err)
  145. }
  146. case "POST":
  147. ctype := r.Header.Get("Content-Type")
  148. if ctype != "application/json" {
  149. writeError(w, httptypes.NewHTTPError(http.StatusUnsupportedMediaType, fmt.Sprintf("Bad Content-Type %s, accept application/json", ctype)))
  150. return
  151. }
  152. b, err := ioutil.ReadAll(r.Body)
  153. if err != nil {
  154. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  155. return
  156. }
  157. req := httptypes.MemberCreateRequest{}
  158. if err := json.Unmarshal(b, &req); err != nil {
  159. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  160. return
  161. }
  162. now := h.clock.Now()
  163. m := etcdserver.NewMember("", req.PeerURLs, "", &now)
  164. if err := h.server.AddMember(ctx, *m); err != nil {
  165. log.Printf("etcdhttp: error adding node %s: %v", m.ID, err)
  166. writeError(w, err)
  167. return
  168. }
  169. res := newMember(m)
  170. w.Header().Set("Content-Type", "application/json")
  171. w.WriteHeader(http.StatusCreated)
  172. if err := json.NewEncoder(w).Encode(res); err != nil {
  173. log.Printf("etcdhttp: %v", err)
  174. }
  175. case "DELETE":
  176. idStr := trimPrefix(r.URL.Path, membersPrefix)
  177. if idStr == "" {
  178. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  179. return
  180. }
  181. id, err := types.IDFromString(idStr)
  182. if err != nil {
  183. writeError(w, httptypes.NewHTTPError(http.StatusBadRequest, err.Error()))
  184. return
  185. }
  186. err = h.server.RemoveMember(ctx, uint64(id))
  187. switch {
  188. case err == etcdserver.ErrIDNotFound:
  189. writeError(w, httptypes.NewHTTPError(http.StatusNotFound, fmt.Sprintf("No such member: %s", idStr)))
  190. case err != nil:
  191. log.Printf("etcdhttp: error removing node %s: %v", id, err)
  192. writeError(w, err)
  193. default:
  194. w.WriteHeader(http.StatusNoContent)
  195. }
  196. }
  197. }
  198. type statsHandler struct {
  199. stats etcdserver.Stats
  200. }
  201. func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) {
  202. if !allowMethod(w, r.Method, "GET") {
  203. return
  204. }
  205. w.Header().Set("Content-Type", "application/json")
  206. w.Write(h.stats.StoreStats())
  207. }
  208. func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) {
  209. if !allowMethod(w, r.Method, "GET") {
  210. return
  211. }
  212. w.Header().Set("Content-Type", "application/json")
  213. w.Write(h.stats.SelfStats())
  214. }
  215. func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
  216. if !allowMethod(w, r.Method, "GET") {
  217. return
  218. }
  219. w.Header().Set("Content-Type", "application/json")
  220. w.Write(h.stats.LeaderStats())
  221. }
  222. func serveVersion(w http.ResponseWriter, r *http.Request) {
  223. if !allowMethod(w, r.Method, "GET") {
  224. return
  225. }
  226. w.Write([]byte("etcd " + version.Version))
  227. }
  228. // parseKeyRequest converts a received http.Request on keysPrefix to
  229. // a server Request, performing validation of supplied fields as appropriate.
  230. // If any validation fails, an empty Request and non-nil error is returned.
  231. func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
  232. emptyReq := etcdserverpb.Request{}
  233. err := r.ParseForm()
  234. if err != nil {
  235. return emptyReq, etcdErr.NewRequestError(
  236. etcdErr.EcodeInvalidForm,
  237. err.Error(),
  238. )
  239. }
  240. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  241. return emptyReq, etcdErr.NewRequestError(
  242. etcdErr.EcodeInvalidForm,
  243. "incorrect key prefix",
  244. )
  245. }
  246. p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
  247. var pIdx, wIdx uint64
  248. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  249. return emptyReq, etcdErr.NewRequestError(
  250. etcdErr.EcodeIndexNaN,
  251. `invalid value for "prevIndex"`,
  252. )
  253. }
  254. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  255. return emptyReq, etcdErr.NewRequestError(
  256. etcdErr.EcodeIndexNaN,
  257. `invalid value for "waitIndex"`,
  258. )
  259. }
  260. var rec, sort, wait, dir, quorum, stream bool
  261. if rec, err = getBool(r.Form, "recursive"); err != nil {
  262. return emptyReq, etcdErr.NewRequestError(
  263. etcdErr.EcodeInvalidField,
  264. `invalid value for "recursive"`,
  265. )
  266. }
  267. if sort, err = getBool(r.Form, "sorted"); err != nil {
  268. return emptyReq, etcdErr.NewRequestError(
  269. etcdErr.EcodeInvalidField,
  270. `invalid value for "sorted"`,
  271. )
  272. }
  273. if wait, err = getBool(r.Form, "wait"); err != nil {
  274. return emptyReq, etcdErr.NewRequestError(
  275. etcdErr.EcodeInvalidField,
  276. `invalid value for "wait"`,
  277. )
  278. }
  279. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  280. if dir, err = getBool(r.Form, "dir"); err != nil {
  281. return emptyReq, etcdErr.NewRequestError(
  282. etcdErr.EcodeInvalidField,
  283. `invalid value for "dir"`,
  284. )
  285. }
  286. if quorum, err = getBool(r.Form, "quorum"); err != nil {
  287. return emptyReq, etcdErr.NewRequestError(
  288. etcdErr.EcodeInvalidField,
  289. `invalid value for "quorum"`,
  290. )
  291. }
  292. if stream, err = getBool(r.Form, "stream"); err != nil {
  293. return emptyReq, etcdErr.NewRequestError(
  294. etcdErr.EcodeInvalidField,
  295. `invalid value for "stream"`,
  296. )
  297. }
  298. if wait && r.Method != "GET" {
  299. return emptyReq, etcdErr.NewRequestError(
  300. etcdErr.EcodeInvalidField,
  301. `"wait" can only be used with GET requests`,
  302. )
  303. }
  304. pV := r.FormValue("prevValue")
  305. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  306. return emptyReq, etcdErr.NewRequestError(
  307. etcdErr.EcodeInvalidField,
  308. `"prevValue" cannot be empty`,
  309. )
  310. }
  311. // TTL is nullable, so leave it null if not specified
  312. // or an empty string
  313. var ttl *uint64
  314. if len(r.FormValue("ttl")) > 0 {
  315. i, err := getUint64(r.Form, "ttl")
  316. if err != nil {
  317. return emptyReq, etcdErr.NewRequestError(
  318. etcdErr.EcodeTTLNaN,
  319. `invalid value for "ttl"`,
  320. )
  321. }
  322. ttl = &i
  323. }
  324. // prevExist is nullable, so leave it null if not specified
  325. var pe *bool
  326. if _, ok := r.Form["prevExist"]; ok {
  327. bv, err := getBool(r.Form, "prevExist")
  328. if err != nil {
  329. return emptyReq, etcdErr.NewRequestError(
  330. etcdErr.EcodeInvalidField,
  331. "invalid value for prevExist",
  332. )
  333. }
  334. pe = &bv
  335. }
  336. rr := etcdserverpb.Request{
  337. ID: id,
  338. Method: r.Method,
  339. Path: p,
  340. Val: r.FormValue("value"),
  341. Dir: dir,
  342. PrevValue: pV,
  343. PrevIndex: pIdx,
  344. PrevExist: pe,
  345. Wait: wait,
  346. Since: wIdx,
  347. Recursive: rec,
  348. Sorted: sort,
  349. Quorum: quorum,
  350. Stream: stream,
  351. }
  352. if pe != nil {
  353. rr.PrevExist = pe
  354. }
  355. // Null TTL is equivalent to unset Expiration
  356. if ttl != nil {
  357. expr := time.Duration(*ttl) * time.Second
  358. rr.Expiration = clock.Now().Add(expr).UnixNano()
  359. }
  360. return rr, nil
  361. }
  362. // writeKeyEvent trims the prefix of key path in a single Event under
  363. // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
  364. // ResponseWriter, along with the appropriate headers.
  365. func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  366. if ev == nil {
  367. return errors.New("cannot write empty Event!")
  368. }
  369. w.Header().Set("Content-Type", "application/json")
  370. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  371. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  372. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  373. if ev.IsCreated() {
  374. w.WriteHeader(http.StatusCreated)
  375. }
  376. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  377. return json.NewEncoder(w).Encode(ev)
  378. }
  379. func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  380. defer wa.Remove()
  381. ech := wa.EventChan()
  382. var nch <-chan bool
  383. if x, ok := w.(http.CloseNotifier); ok {
  384. nch = x.CloseNotify()
  385. }
  386. w.Header().Set("Content-Type", "application/json")
  387. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  388. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  389. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  390. w.WriteHeader(http.StatusOK)
  391. // Ensure headers are flushed early, in case of long polling
  392. w.(http.Flusher).Flush()
  393. for {
  394. select {
  395. case <-nch:
  396. // Client closed connection. Nothing to do.
  397. return
  398. case <-ctx.Done():
  399. // Timed out. net/http will close the connection for us, so nothing to do.
  400. return
  401. case ev, ok := <-ech:
  402. if !ok {
  403. // If the channel is closed this may be an indication of
  404. // that notifications are much more than we are able to
  405. // send to the client in time. Then we simply end streaming.
  406. return
  407. }
  408. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  409. if err := json.NewEncoder(w).Encode(ev); err != nil {
  410. // Should never be reached
  411. log.Printf("error writing event: %v\n", err)
  412. return
  413. }
  414. if !stream {
  415. return
  416. }
  417. w.(http.Flusher).Flush()
  418. }
  419. }
  420. }
  421. func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
  422. if ev == nil {
  423. return nil
  424. }
  425. // Since the *Event may reference one in the store history
  426. // history, we must copy it before modifying
  427. e := ev.Clone()
  428. e.Node = trimNodeExternPrefix(e.Node, prefix)
  429. e.PrevNode = trimNodeExternPrefix(e.PrevNode, prefix)
  430. return e
  431. }
  432. func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
  433. if n == nil {
  434. return nil
  435. }
  436. n.Key = strings.TrimPrefix(n.Key, prefix)
  437. for _, nn := range n.Nodes {
  438. nn = trimNodeExternPrefix(nn, prefix)
  439. }
  440. return n
  441. }
  442. func trimErrorPrefix(err error, prefix string) error {
  443. if e, ok := err.(*etcdErr.Error); ok {
  444. e.Cause = strings.TrimPrefix(e.Cause, prefix)
  445. }
  446. return err
  447. }
  // getUint64 extracts a uint64 by the given key from a Form. If the key does
  // not exist in the form, or exists with no values at all, 0 and a nil error
  // are returned. If the key exists but its value is not a valid base-10
  // unsigned 64-bit integer, the strconv.ParseUint error is returned. If
  // multiple values are present only the first is considered.
  func getUint64(form url.Values, key string) (i uint64, err error) {
  	vals := form[key]
  	// A key mapped to an empty or nil slice carries no value to parse;
  	// treat it like a missing key instead of indexing out of range.
  	if len(vals) == 0 {
  		return 0, nil
  	}
  	return strconv.ParseUint(vals[0], 10, 64)
  }
  // getBool extracts a bool by the given key from a Form. If the key does not
  // exist in the form, or exists with no values at all, false and a nil error
  // are returned. If the key exists but its value is not accepted by
  // strconv.ParseBool, that error is returned. If multiple values are present
  // only the first is considered.
  func getBool(form url.Values, key string) (b bool, err error) {
  	vals := form[key]
  	// A key mapped to an empty or nil slice carries no value to parse;
  	// treat it like a missing key instead of indexing out of range.
  	if len(vals) == 0 {
  		return false, nil
  	}
  	return strconv.ParseBool(vals[0])
  }
  // trimPrefix strips prefix from the front of p and then, if what remains
  // starts with a slash, strips that single slash as well,
  // e.g.: trimPrefix("foo", "foo") == trimPrefix("foo/", "foo") == ""
  func trimPrefix(p, prefix string) (s string) {
  	return strings.TrimPrefix(strings.TrimPrefix(p, prefix), "/")
  }
  475. func newMemberCollection(ms []*etcdserver.Member) *httptypes.MemberCollection {
  476. c := httptypes.MemberCollection(make([]httptypes.Member, len(ms)))
  477. for i, m := range ms {
  478. c[i] = newMember(m)
  479. }
  480. return &c
  481. }
  482. func newMember(m *etcdserver.Member) httptypes.Member {
  483. tm := httptypes.Member{
  484. ID: m.ID.String(),
  485. Name: m.Name,
  486. PeerURLs: make([]string, len(m.PeerURLs)),
  487. ClientURLs: make([]string, len(m.ClientURLs)),
  488. }
  489. copy(tm.PeerURLs, m.PeerURLs)
  490. copy(tm.ClientURLs, m.ClientURLs)
  491. return tm
  492. }