client.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "path"
  23. "strconv"
  24. "strings"
  25. "time"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  27. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  28. etcdErr "github.com/coreos/etcd/error"
  29. "github.com/coreos/etcd/etcdserver"
  30. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  31. "github.com/coreos/etcd/pkg/types"
  32. "github.com/coreos/etcd/store"
  33. "github.com/coreos/etcd/version"
  34. )
  // URL path prefixes under which NewClientHandler mounts its handlers.
  const (
  	// keysPrefix is where the keysHandler is registered (with and without
  	// a trailing slash).
  	keysPrefix = "/v2/keys"

  	// deprecatedMachinesPrefix is where the deprecatedMachinesHandler is
  	// registered.
  	deprecatedMachinesPrefix = "/v2/machines"

  	// adminMembersPrefix is where the adminMembersHandler is registered;
  	// the trailing slash makes it a subtree pattern.
  	adminMembersPrefix = "/v2/admin/members/"

  	// statsPrefix is the parent of the /store, /self and /leader stats
  	// endpoints.
  	statsPrefix = "/v2/stats"

  	// versionPrefix is where serveVersion is registered.
  	versionPrefix = "/version"
  )
  42. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  43. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  44. kh := &keysHandler{
  45. server: server,
  46. clusterInfo: server.Cluster,
  47. timer: server,
  48. timeout: defaultServerTimeout,
  49. }
  50. sh := &statsHandler{
  51. stats: server,
  52. }
  53. amh := &adminMembersHandler{
  54. server: server,
  55. clusterInfo: server.Cluster,
  56. clock: clockwork.NewRealClock(),
  57. }
  58. dmh := &deprecatedMachinesHandler{
  59. clusterInfo: server.Cluster,
  60. }
  61. mux := http.NewServeMux()
  62. mux.HandleFunc("/", http.NotFound)
  63. mux.HandleFunc(versionPrefix, serveVersion)
  64. mux.Handle(keysPrefix, kh)
  65. mux.Handle(keysPrefix+"/", kh)
  66. mux.HandleFunc(statsPrefix+"/store", sh.serveStore)
  67. mux.HandleFunc(statsPrefix+"/self", sh.serveSelf)
  68. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeader)
  69. mux.Handle(adminMembersPrefix, amh)
  70. mux.Handle(deprecatedMachinesPrefix, dmh)
  71. return mux
  72. }
  73. type keysHandler struct {
  74. server etcdserver.Server
  75. clusterInfo etcdserver.ClusterInfo
  76. timer etcdserver.RaftTimer
  77. timeout time.Duration
  78. }
  79. func (h *keysHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  80. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  81. return
  82. }
  83. cid := strconv.FormatUint(h.clusterInfo.ID(), 16)
  84. w.Header().Set("X-Etcd-Cluster-ID", cid)
  85. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  86. defer cancel()
  87. rr, err := parseKeyRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
  88. if err != nil {
  89. writeError(w, err)
  90. return
  91. }
  92. resp, err := h.server.Do(ctx, rr)
  93. if err != nil {
  94. err = trimErrorPrefix(err, etcdserver.StoreKeysPrefix)
  95. writeError(w, err)
  96. return
  97. }
  98. switch {
  99. case resp.Event != nil:
  100. if err := writeKeyEvent(w, resp.Event, h.timer); err != nil {
  101. // Should never be reached
  102. log.Printf("error writing event: %v", err)
  103. }
  104. case resp.Watcher != nil:
  105. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  106. defer cancel()
  107. handleKeyWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  108. default:
  109. writeError(w, errors.New("received response with no Event/Watcher!"))
  110. }
  111. }
  112. type deprecatedMachinesHandler struct {
  113. clusterInfo etcdserver.ClusterInfo
  114. }
  115. func (h *deprecatedMachinesHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  116. if !allowMethod(w, r.Method, "GET", "HEAD") {
  117. return
  118. }
  119. endpoints := h.clusterInfo.ClientURLs()
  120. w.Write([]byte(strings.Join(endpoints, ", ")))
  121. }
  122. type adminMembersHandler struct {
  123. server etcdserver.Server
  124. clusterInfo etcdserver.ClusterInfo
  125. clock clockwork.Clock
  126. }
  127. func (h *adminMembersHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  128. if !allowMethod(w, r.Method, "GET", "POST", "DELETE") {
  129. return
  130. }
  131. cid := strconv.FormatUint(h.clusterInfo.ID(), 16)
  132. w.Header().Set("X-Etcd-Cluster-ID", cid)
  133. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  134. defer cancel()
  135. switch r.Method {
  136. case "GET":
  137. if s := strings.TrimPrefix(r.URL.Path, adminMembersPrefix); s != "" {
  138. http.NotFound(w, r)
  139. return
  140. }
  141. ms := struct {
  142. Members []*etcdserver.Member `json:"members"`
  143. }{
  144. Members: h.clusterInfo.Members(),
  145. }
  146. w.Header().Set("Content-Type", "application/json")
  147. if err := json.NewEncoder(w).Encode(ms); err != nil {
  148. log.Printf("etcdhttp: %v", err)
  149. }
  150. case "POST":
  151. ctype := r.Header.Get("Content-Type")
  152. if ctype != "application/json" {
  153. http.Error(w, fmt.Sprintf("bad Content-Type %s, accept application/json", ctype), http.StatusBadRequest)
  154. return
  155. }
  156. b, err := ioutil.ReadAll(r.Body)
  157. if err != nil {
  158. http.Error(w, err.Error(), http.StatusBadRequest)
  159. return
  160. }
  161. raftAttr := etcdserver.RaftAttributes{}
  162. if err := json.Unmarshal(b, &raftAttr); err != nil {
  163. http.Error(w, err.Error(), http.StatusBadRequest)
  164. return
  165. }
  166. validURLs, err := types.NewURLs(raftAttr.PeerURLs)
  167. if err != nil {
  168. http.Error(w, "bad peer urls", http.StatusBadRequest)
  169. return
  170. }
  171. now := h.clock.Now()
  172. m := etcdserver.NewMember("", validURLs, "", &now)
  173. if err := h.server.AddMember(ctx, *m); err != nil {
  174. log.Printf("etcdhttp: error adding node %x: %v", m.ID, err)
  175. writeError(w, err)
  176. return
  177. }
  178. log.Printf("etcdhttp: added node %x with peer urls %v", m.ID, raftAttr.PeerURLs)
  179. w.Header().Set("Content-Type", "application/json")
  180. w.WriteHeader(http.StatusCreated)
  181. if err := json.NewEncoder(w).Encode(m); err != nil {
  182. log.Printf("etcdhttp: %v", err)
  183. }
  184. case "DELETE":
  185. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  186. id, err := strconv.ParseUint(idStr, 16, 64)
  187. if err != nil {
  188. http.Error(w, err.Error(), http.StatusBadRequest)
  189. return
  190. }
  191. log.Printf("etcdhttp: remove node %x", id)
  192. if err := h.server.RemoveMember(ctx, id); err != nil {
  193. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  194. writeError(w, err)
  195. return
  196. }
  197. w.WriteHeader(http.StatusNoContent)
  198. }
  199. }
  200. type statsHandler struct {
  201. stats etcdserver.Stats
  202. }
  203. func (h *statsHandler) serveStore(w http.ResponseWriter, r *http.Request) {
  204. if !allowMethod(w, r.Method, "GET") {
  205. return
  206. }
  207. w.Header().Set("Content-Type", "application/json")
  208. w.Write(h.stats.StoreStats())
  209. }
  210. func (h *statsHandler) serveSelf(w http.ResponseWriter, r *http.Request) {
  211. if !allowMethod(w, r.Method, "GET") {
  212. return
  213. }
  214. w.Header().Set("Content-Type", "application/json")
  215. w.Write(h.stats.SelfStats())
  216. }
  217. func (h *statsHandler) serveLeader(w http.ResponseWriter, r *http.Request) {
  218. if !allowMethod(w, r.Method, "GET") {
  219. return
  220. }
  221. w.Header().Set("Content-Type", "application/json")
  222. w.Write(h.stats.LeaderStats())
  223. }
  224. func serveVersion(w http.ResponseWriter, r *http.Request) {
  225. if !allowMethod(w, r.Method, "GET") {
  226. return
  227. }
  228. w.Write([]byte("etcd " + version.Version))
  229. }
  230. // parseKeyRequest converts a received http.Request on keysPrefix to
  231. // a server Request, performing validation of supplied fields as appropriate.
  232. // If any validation fails, an empty Request and non-nil error is returned.
  233. func parseKeyRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
  234. emptyReq := etcdserverpb.Request{}
  235. err := r.ParseForm()
  236. if err != nil {
  237. return emptyReq, etcdErr.NewRequestError(
  238. etcdErr.EcodeInvalidForm,
  239. err.Error(),
  240. )
  241. }
  242. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  243. return emptyReq, etcdErr.NewRequestError(
  244. etcdErr.EcodeInvalidForm,
  245. "incorrect key prefix",
  246. )
  247. }
  248. p := path.Join(etcdserver.StoreKeysPrefix, r.URL.Path[len(keysPrefix):])
  249. var pIdx, wIdx uint64
  250. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  251. return emptyReq, etcdErr.NewRequestError(
  252. etcdErr.EcodeIndexNaN,
  253. `invalid value for "prevIndex"`,
  254. )
  255. }
  256. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  257. return emptyReq, etcdErr.NewRequestError(
  258. etcdErr.EcodeIndexNaN,
  259. `invalid value for "waitIndex"`,
  260. )
  261. }
  262. var rec, sort, wait, dir, quorum, stream bool
  263. if rec, err = getBool(r.Form, "recursive"); err != nil {
  264. return emptyReq, etcdErr.NewRequestError(
  265. etcdErr.EcodeInvalidField,
  266. `invalid value for "recursive"`,
  267. )
  268. }
  269. if sort, err = getBool(r.Form, "sorted"); err != nil {
  270. return emptyReq, etcdErr.NewRequestError(
  271. etcdErr.EcodeInvalidField,
  272. `invalid value for "sorted"`,
  273. )
  274. }
  275. if wait, err = getBool(r.Form, "wait"); err != nil {
  276. return emptyReq, etcdErr.NewRequestError(
  277. etcdErr.EcodeInvalidField,
  278. `invalid value for "wait"`,
  279. )
  280. }
  281. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  282. if dir, err = getBool(r.Form, "dir"); err != nil {
  283. return emptyReq, etcdErr.NewRequestError(
  284. etcdErr.EcodeInvalidField,
  285. `invalid value for "dir"`,
  286. )
  287. }
  288. if quorum, err = getBool(r.Form, "quorum"); err != nil {
  289. return emptyReq, etcdErr.NewRequestError(
  290. etcdErr.EcodeInvalidField,
  291. `invalid value for "quorum"`,
  292. )
  293. }
  294. if stream, err = getBool(r.Form, "stream"); err != nil {
  295. return emptyReq, etcdErr.NewRequestError(
  296. etcdErr.EcodeInvalidField,
  297. `invalid value for "stream"`,
  298. )
  299. }
  300. if wait && r.Method != "GET" {
  301. return emptyReq, etcdErr.NewRequestError(
  302. etcdErr.EcodeInvalidField,
  303. `"wait" can only be used with GET requests`,
  304. )
  305. }
  306. pV := r.FormValue("prevValue")
  307. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  308. return emptyReq, etcdErr.NewRequestError(
  309. etcdErr.EcodeInvalidField,
  310. `"prevValue" cannot be empty`,
  311. )
  312. }
  313. // TTL is nullable, so leave it null if not specified
  314. // or an empty string
  315. var ttl *uint64
  316. if len(r.FormValue("ttl")) > 0 {
  317. i, err := getUint64(r.Form, "ttl")
  318. if err != nil {
  319. return emptyReq, etcdErr.NewRequestError(
  320. etcdErr.EcodeTTLNaN,
  321. `invalid value for "ttl"`,
  322. )
  323. }
  324. ttl = &i
  325. }
  326. // prevExist is nullable, so leave it null if not specified
  327. var pe *bool
  328. if _, ok := r.Form["prevExist"]; ok {
  329. bv, err := getBool(r.Form, "prevExist")
  330. if err != nil {
  331. return emptyReq, etcdErr.NewRequestError(
  332. etcdErr.EcodeInvalidField,
  333. "invalid value for prevExist",
  334. )
  335. }
  336. pe = &bv
  337. }
  338. rr := etcdserverpb.Request{
  339. ID: id,
  340. Method: r.Method,
  341. Path: p,
  342. Val: r.FormValue("value"),
  343. Dir: dir,
  344. PrevValue: pV,
  345. PrevIndex: pIdx,
  346. PrevExist: pe,
  347. Wait: wait,
  348. Since: wIdx,
  349. Recursive: rec,
  350. Sorted: sort,
  351. Quorum: quorum,
  352. Stream: stream,
  353. }
  354. if pe != nil {
  355. rr.PrevExist = pe
  356. }
  357. // Null TTL is equivalent to unset Expiration
  358. if ttl != nil {
  359. expr := time.Duration(*ttl) * time.Second
  360. rr.Expiration = clock.Now().Add(expr).UnixNano()
  361. }
  362. return rr, nil
  363. }
  364. // writeKeyEvent trims the prefix of key path in a single Event under
  365. // StoreKeysPrefix, serializes it and writes the resulting JSON to the given
  366. // ResponseWriter, along with the appropriate headers.
  367. func writeKeyEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  368. if ev == nil {
  369. return errors.New("cannot write empty Event!")
  370. }
  371. w.Header().Set("Content-Type", "application/json")
  372. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  373. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  374. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  375. if ev.IsCreated() {
  376. w.WriteHeader(http.StatusCreated)
  377. }
  378. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  379. return json.NewEncoder(w).Encode(ev)
  380. }
  381. func handleKeyWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  382. defer wa.Remove()
  383. ech := wa.EventChan()
  384. var nch <-chan bool
  385. if x, ok := w.(http.CloseNotifier); ok {
  386. nch = x.CloseNotify()
  387. }
  388. w.Header().Set("Content-Type", "application/json")
  389. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  390. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  391. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  392. w.WriteHeader(http.StatusOK)
  393. // Ensure headers are flushed early, in case of long polling
  394. w.(http.Flusher).Flush()
  395. for {
  396. select {
  397. case <-nch:
  398. // Client closed connection. Nothing to do.
  399. return
  400. case <-ctx.Done():
  401. // Timed out. net/http will close the connection for us, so nothing to do.
  402. return
  403. case ev, ok := <-ech:
  404. if !ok {
  405. // If the channel is closed this may be an indication of
  406. // that notifications are much more than we are able to
  407. // send to the client in time. Then we simply end streaming.
  408. return
  409. }
  410. ev = trimEventPrefix(ev, etcdserver.StoreKeysPrefix)
  411. if err := json.NewEncoder(w).Encode(ev); err != nil {
  412. // Should never be reached
  413. log.Printf("error writing event: %v\n", err)
  414. return
  415. }
  416. if !stream {
  417. return
  418. }
  419. w.(http.Flusher).Flush()
  420. }
  421. }
  422. }
  423. func trimEventPrefix(ev *store.Event, prefix string) *store.Event {
  424. if ev == nil {
  425. return nil
  426. }
  427. ev.Node = trimNodeExternPrefix(ev.Node, prefix)
  428. ev.PrevNode = trimNodeExternPrefix(ev.PrevNode, prefix)
  429. return ev
  430. }
  431. func trimNodeExternPrefix(n *store.NodeExtern, prefix string) *store.NodeExtern {
  432. if n == nil {
  433. return nil
  434. }
  435. n.Key = strings.TrimPrefix(n.Key, prefix)
  436. for _, nn := range n.Nodes {
  437. nn = trimNodeExternPrefix(nn, prefix)
  438. }
  439. return n
  440. }
  441. func trimErrorPrefix(err error, prefix string) error {
  442. if e, ok := err.(*etcdErr.Error); ok {
  443. e.Cause = strings.TrimPrefix(e.Cause, prefix)
  444. }
  445. return err
  446. }
  // getUint64 extracts a uint64 by the given key from a Form. If the key does
  // not exist in the form, 0 is returned. If the key exists but the value is
  // badly formed, an error is returned. If multiple values are present only the
  // first is considered.
  //
  // A key that is present with an empty value list is treated like a missing
  // key (0, nil) rather than indexing into the empty slice.
  func getUint64(form url.Values, key string) (i uint64, err error) {
  	vals, ok := form[key]
  	if !ok || len(vals) == 0 {
  		return 0, nil
  	}
  	return strconv.ParseUint(vals[0], 10, 64)
  }
  // getBool extracts a bool by the given key from a Form. If the key does not
  // exist in the form, false is returned. If the key exists but the value is
  // badly formed, an error is returned. If multiple values are present only the
  // first is considered.
  //
  // A key that is present with an empty value list is treated like a missing
  // key (false, nil) rather than indexing into the empty slice.
  func getBool(form url.Values, key string) (b bool, err error) {
  	vals, ok := form[key]
  	if !ok || len(vals) == 0 {
  		return false, nil
  	}
  	return strconv.ParseBool(vals[0])
  }
  466. }