http.go 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495
  1. /*
  2. Copyright 2014 CoreOS, Inc.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package etcdhttp
  14. import (
  15. "encoding/json"
  16. "errors"
  17. "fmt"
  18. "io/ioutil"
  19. "log"
  20. "net/http"
  21. "net/url"
  22. "strconv"
  23. "strings"
  24. "time"
  25. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  26. "github.com/coreos/etcd/Godeps/_workspace/src/github.com/jonboulle/clockwork"
  27. etcdErr "github.com/coreos/etcd/error"
  28. "github.com/coreos/etcd/etcdserver"
  29. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  30. "github.com/coreos/etcd/raft/raftpb"
  31. "github.com/coreos/etcd/store"
  32. )
  // URL path prefixes under which the handlers in this file are mounted.
  const (
  	keysPrefix               = "/v2/keys"
  	deprecatedMachinesPrefix = "/v2/machines"
  	adminMembersPrefix       = "/v2/admin/members/"
  	raftPrefix               = "/raft"
  	statsPrefix              = "/v2/stats"
  )

  // Default deadlines applied to requests.
  const (
  	// defaultServerTimeout is the time to wait for a response from
  	// EtcdServer requests.
  	defaultServerTimeout = 500 * time.Millisecond
  	// defaultWatchTimeout is the time to wait for a Watch request.
  	defaultWatchTimeout = 5 * time.Minute
  )

  // errClosed reports that the client closed the connection.
  var errClosed = errors.New("etcdhttp: client closed connection")
  45. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  46. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  47. sh := &serverHandler{
  48. server: server,
  49. clusterStore: server.ClusterStore,
  50. stats: server,
  51. timer: server,
  52. timeout: defaultServerTimeout,
  53. }
  54. mux := http.NewServeMux()
  55. mux.HandleFunc(keysPrefix, sh.serveKeys)
  56. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  57. mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
  58. mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
  59. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
  60. // TODO: dynamic configuration may make this outdated. take care of it.
  61. // TODO: dynamic configuration may introduce race also.
  62. // TODO: add serveMembers
  63. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  64. mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
  65. mux.HandleFunc("/", http.NotFound)
  66. return mux
  67. }
  68. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  69. func NewPeerHandler(server *etcdserver.EtcdServer) http.Handler {
  70. sh := &serverHandler{
  71. server: server,
  72. stats: server,
  73. }
  74. mux := http.NewServeMux()
  75. mux.HandleFunc(raftPrefix, sh.serveRaft)
  76. mux.HandleFunc("/", http.NotFound)
  77. return mux
  78. }
  79. // serverHandler provides http.Handlers for etcd client and raft communication.
  80. type serverHandler struct {
  81. timeout time.Duration
  82. server etcdserver.Server
  83. stats etcdserver.Stats
  84. timer etcdserver.RaftTimer
  85. clusterStore etcdserver.ClusterStore
  86. }
  87. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  88. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  89. return
  90. }
  91. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  92. defer cancel()
  93. rr, err := parseRequest(r, etcdserver.GenID(), clockwork.NewRealClock())
  94. if err != nil {
  95. writeError(w, err)
  96. return
  97. }
  98. resp, err := h.server.Do(ctx, rr)
  99. if err != nil {
  100. writeError(w, err)
  101. return
  102. }
  103. switch {
  104. case resp.Event != nil:
  105. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  106. // Should never be reached
  107. log.Printf("error writing event: %v", err)
  108. }
  109. case resp.Watcher != nil:
  110. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  111. defer cancel()
  112. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  113. default:
  114. writeError(w, errors.New("received response with no Event/Watcher!"))
  115. }
  116. }
  117. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  118. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  119. if !allowMethod(w, r.Method, "GET", "HEAD") {
  120. return
  121. }
  122. endpoints := h.clusterStore.Get().ClientURLs()
  123. w.Write([]byte(strings.Join(endpoints, ", ")))
  124. }
  125. func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
  126. if !allowMethod(w, r.Method, "PUT", "DELETE") {
  127. return
  128. }
  129. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  130. defer cancel()
  131. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  132. id, err := strconv.ParseUint(idStr, 16, 64)
  133. if err != nil {
  134. http.Error(w, err.Error(), http.StatusBadRequest)
  135. return
  136. }
  137. switch r.Method {
  138. case "PUT":
  139. if err := r.ParseForm(); err != nil {
  140. http.Error(w, err.Error(), http.StatusBadRequest)
  141. return
  142. }
  143. peerURLs := r.PostForm["PeerURLs"]
  144. log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs)
  145. m := etcdserver.Member{
  146. ID: id,
  147. RaftAttributes: etcdserver.RaftAttributes{
  148. PeerURLs: peerURLs,
  149. },
  150. }
  151. if err := h.server.AddMember(ctx, m); err != nil {
  152. log.Printf("etcdhttp: error adding node %x: %v", id, err)
  153. writeError(w, err)
  154. return
  155. }
  156. w.WriteHeader(http.StatusCreated)
  157. case "DELETE":
  158. log.Printf("etcdhttp: remove node %x", id)
  159. if err := h.server.RemoveMember(ctx, id); err != nil {
  160. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  161. writeError(w, err)
  162. return
  163. }
  164. w.WriteHeader(http.StatusNoContent)
  165. }
  166. }
  167. func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
  168. if !allowMethod(w, r.Method, "GET") {
  169. return
  170. }
  171. w.Header().Set("Content-Type", "application/json")
  172. w.Write(h.stats.StoreStats())
  173. }
  174. func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
  175. if !allowMethod(w, r.Method, "GET") {
  176. return
  177. }
  178. w.Header().Set("Content-Type", "application/json")
  179. w.Write(h.stats.SelfStats())
  180. }
  181. func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
  182. if !allowMethod(w, r.Method, "GET") {
  183. return
  184. }
  185. w.Header().Set("Content-Type", "application/json")
  186. w.Write(h.stats.LeaderStats())
  187. }
  188. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  189. if !allowMethod(w, r.Method, "POST") {
  190. return
  191. }
  192. b, err := ioutil.ReadAll(r.Body)
  193. if err != nil {
  194. log.Println("etcdhttp: error reading raft message:", err)
  195. http.Error(w, "error reading raft message", http.StatusBadRequest)
  196. return
  197. }
  198. var m raftpb.Message
  199. if err := m.Unmarshal(b); err != nil {
  200. log.Println("etcdhttp: error unmarshaling raft message:", err)
  201. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  202. return
  203. }
  204. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  205. if m.Type == raftpb.MsgApp {
  206. h.stats.UpdateRecvApp(m.From, r.ContentLength)
  207. }
  208. if err := h.server.Process(context.TODO(), m); err != nil {
  209. log.Println("etcdhttp: error processing raft message:", err)
  210. writeError(w, err)
  211. return
  212. }
  213. w.WriteHeader(http.StatusNoContent)
  214. }
  215. // parseRequest converts a received http.Request to a server Request,
  216. // performing validation of supplied fields as appropriate.
  217. // If any validation fails, an empty Request and non-nil error is returned.
  218. func parseRequest(r *http.Request, id uint64, clock clockwork.Clock) (etcdserverpb.Request, error) {
  219. emptyReq := etcdserverpb.Request{}
  220. err := r.ParseForm()
  221. if err != nil {
  222. return emptyReq, etcdErr.NewRequestError(
  223. etcdErr.EcodeInvalidForm,
  224. err.Error(),
  225. )
  226. }
  227. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  228. return emptyReq, etcdErr.NewRequestError(
  229. etcdErr.EcodeInvalidForm,
  230. "incorrect key prefix",
  231. )
  232. }
  233. p := r.URL.Path[len(keysPrefix):]
  234. var pIdx, wIdx uint64
  235. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  236. return emptyReq, etcdErr.NewRequestError(
  237. etcdErr.EcodeIndexNaN,
  238. `invalid value for "prevIndex"`,
  239. )
  240. }
  241. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  242. return emptyReq, etcdErr.NewRequestError(
  243. etcdErr.EcodeIndexNaN,
  244. `invalid value for "waitIndex"`,
  245. )
  246. }
  247. var rec, sort, wait, dir, stream bool
  248. if rec, err = getBool(r.Form, "recursive"); err != nil {
  249. return emptyReq, etcdErr.NewRequestError(
  250. etcdErr.EcodeInvalidField,
  251. `invalid value for "recursive"`,
  252. )
  253. }
  254. if sort, err = getBool(r.Form, "sorted"); err != nil {
  255. return emptyReq, etcdErr.NewRequestError(
  256. etcdErr.EcodeInvalidField,
  257. `invalid value for "sorted"`,
  258. )
  259. }
  260. if wait, err = getBool(r.Form, "wait"); err != nil {
  261. return emptyReq, etcdErr.NewRequestError(
  262. etcdErr.EcodeInvalidField,
  263. `invalid value for "wait"`,
  264. )
  265. }
  266. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  267. if dir, err = getBool(r.Form, "dir"); err != nil {
  268. return emptyReq, etcdErr.NewRequestError(
  269. etcdErr.EcodeInvalidField,
  270. `invalid value for "dir"`,
  271. )
  272. }
  273. if stream, err = getBool(r.Form, "stream"); err != nil {
  274. return emptyReq, etcdErr.NewRequestError(
  275. etcdErr.EcodeInvalidField,
  276. `invalid value for "stream"`,
  277. )
  278. }
  279. if wait && r.Method != "GET" {
  280. return emptyReq, etcdErr.NewRequestError(
  281. etcdErr.EcodeInvalidField,
  282. `"wait" can only be used with GET requests`,
  283. )
  284. }
  285. pV := r.FormValue("prevValue")
  286. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  287. return emptyReq, etcdErr.NewRequestError(
  288. etcdErr.EcodeInvalidField,
  289. `"prevValue" cannot be empty`,
  290. )
  291. }
  292. // TTL is nullable, so leave it null if not specified
  293. // or an empty string
  294. var ttl *uint64
  295. if len(r.FormValue("ttl")) > 0 {
  296. i, err := getUint64(r.Form, "ttl")
  297. if err != nil {
  298. return emptyReq, etcdErr.NewRequestError(
  299. etcdErr.EcodeTTLNaN,
  300. `invalid value for "ttl"`,
  301. )
  302. }
  303. ttl = &i
  304. }
  305. // prevExist is nullable, so leave it null if not specified
  306. var pe *bool
  307. if _, ok := r.Form["prevExist"]; ok {
  308. bv, err := getBool(r.Form, "prevExist")
  309. if err != nil {
  310. return emptyReq, etcdErr.NewRequestError(
  311. etcdErr.EcodeInvalidField,
  312. "invalid value for prevExist",
  313. )
  314. }
  315. pe = &bv
  316. }
  317. rr := etcdserverpb.Request{
  318. ID: id,
  319. Method: r.Method,
  320. Path: p,
  321. Val: r.FormValue("value"),
  322. Dir: dir,
  323. PrevValue: pV,
  324. PrevIndex: pIdx,
  325. PrevExist: pe,
  326. Recursive: rec,
  327. Since: wIdx,
  328. Sorted: sort,
  329. Stream: stream,
  330. Wait: wait,
  331. }
  332. if pe != nil {
  333. rr.PrevExist = pe
  334. }
  335. // Null TTL is equivalent to unset Expiration
  336. if ttl != nil {
  337. expr := time.Duration(*ttl) * time.Second
  338. rr.Expiration = clock.Now().Add(expr).UnixNano()
  339. }
  340. return rr, nil
  341. }
  // getUint64 extracts a uint64 by the given key from a Form. If the key does
  // not exist in the form, or exists with no values at all, 0 and a nil error
  // are returned. If the key exists but the value is badly formed, an error
  // is returned. If multiple values are present only the first is considered.
  func getUint64(form url.Values, key string) (i uint64, err error) {
  	vals, ok := form[key]
  	// Guard the length as well: a url.Values built by hand may map a key
  	// to an empty slice, and indexing it would panic.
  	if !ok || len(vals) == 0 {
  		return 0, nil
  	}
  	return strconv.ParseUint(vals[0], 10, 64)
  }
  // getBool extracts a bool by the given key from a Form. If the key does not
  // exist in the form, or exists with no values at all, false and a nil error
  // are returned. If the key exists but the value is badly formed, an error
  // is returned. If multiple values are present only the first is considered.
  func getBool(form url.Values, key string) (b bool, err error) {
  	vals, ok := form[key]
  	// Guard the length as well: a url.Values built by hand may map a key
  	// to an empty slice, and indexing it would panic.
  	if !ok || len(vals) == 0 {
  		return false, nil
  	}
  	return strconv.ParseBool(vals[0])
  }
  362. // writeError logs and writes the given Error to the ResponseWriter
  363. // If Error is an etcdErr, it is rendered to the ResponseWriter
  364. // Otherwise, it is assumed to be an InternalServerError
  365. func writeError(w http.ResponseWriter, err error) {
  366. if err == nil {
  367. return
  368. }
  369. log.Println(err)
  370. if e, ok := err.(*etcdErr.Error); ok {
  371. e.Write(w)
  372. } else {
  373. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  374. }
  375. }
  376. // writeEvent serializes a single Event and writes the resulting
  377. // JSON to the given ResponseWriter, along with the appropriate
  378. // headers
  379. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  380. if ev == nil {
  381. return errors.New("cannot write empty Event!")
  382. }
  383. w.Header().Set("Content-Type", "application/json")
  384. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  385. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  386. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  387. if ev.IsCreated() {
  388. w.WriteHeader(http.StatusCreated)
  389. }
  390. return json.NewEncoder(w).Encode(ev)
  391. }
  392. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  393. defer wa.Remove()
  394. ech := wa.EventChan()
  395. var nch <-chan bool
  396. if x, ok := w.(http.CloseNotifier); ok {
  397. nch = x.CloseNotify()
  398. }
  399. w.Header().Set("Content-Type", "application/json")
  400. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  401. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  402. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  403. w.WriteHeader(http.StatusOK)
  404. // Ensure headers are flushed early, in case of long polling
  405. w.(http.Flusher).Flush()
  406. for {
  407. select {
  408. case <-nch:
  409. // Client closed connection. Nothing to do.
  410. return
  411. case <-ctx.Done():
  412. // Timed out. net/http will close the connection for us, so nothing to do.
  413. return
  414. case ev, ok := <-ech:
  415. if !ok {
  416. // If the channel is closed this may be an indication of
  417. // that notifications are much more than we are able to
  418. // send to the client in time. Then we simply end streaming.
  419. return
  420. }
  421. if err := json.NewEncoder(w).Encode(ev); err != nil {
  422. // Should never be reached
  423. log.Println("error writing event: %v", err)
  424. return
  425. }
  426. if !stream {
  427. return
  428. }
  429. w.(http.Flusher).Flush()
  430. }
  431. }
  432. }
  // allowMethod reports whether the method m is one of the allowed methods
  // ms. When it is not, the allowed methods are advertised in the Allow
  // header (joined by ",") and a 405 Method Not Allowed error is written
  // to w.
  func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  	permitted := false
  	for i := 0; i < len(ms) && !permitted; i++ {
  		permitted = ms[i] == m
  	}
  	if permitted {
  		return true
  	}

  	allowHeader := strings.Join(ms, ",")
  	w.Header().Set("Allow", allowHeader)
  	http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  	return false
  }