http.go 14 KB


  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. "github.com/coreos/etcd/Godeps/_workspace/src/code.google.com/p/go.net/context"
  14. etcdErr "github.com/coreos/etcd/error"
  15. "github.com/coreos/etcd/etcdserver"
  16. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  17. "github.com/coreos/etcd/raft/raftpb"
  18. "github.com/coreos/etcd/store"
  19. )
  // URL path prefixes under which the handlers in this file are mounted.
  const (
  	keysPrefix               = "/v2/keys"
  	deprecatedMachinesPrefix = "/v2/machines"
  	adminMembersPrefix       = "/v2/admin/members/"
  	raftPrefix               = "/raft"
  	statsPrefix              = "/v2/stats"
  )

  // Deadlines applied to contexts created by the handlers.
  const (
  	// defaultServerTimeout is the time to wait for a response from
  	// EtcdServer requests.
  	defaultServerTimeout = 500 * time.Millisecond
  	// defaultWatchTimeout is the time to wait for a Watch request.
  	defaultWatchTimeout = 5 * time.Minute
  )

  // errClosed reports, per its message, that the client closed the connection.
  var errClosed = errors.New("etcdhttp: client closed connection")
  32. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  33. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  34. sh := &serverHandler{
  35. server: server,
  36. clusterStore: server.ClusterStore,
  37. stats: server,
  38. timer: server,
  39. timeout: defaultServerTimeout,
  40. }
  41. mux := http.NewServeMux()
  42. mux.HandleFunc(keysPrefix, sh.serveKeys)
  43. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  44. mux.HandleFunc(statsPrefix+"/store", sh.serveStoreStats)
  45. mux.HandleFunc(statsPrefix+"/self", sh.serveSelfStats)
  46. mux.HandleFunc(statsPrefix+"/leader", sh.serveLeaderStats)
  47. // TODO: dynamic configuration may make this outdated. take care of it.
  48. // TODO: dynamic configuration may introduce race also.
  49. // TODO: add serveMembers
  50. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  51. mux.HandleFunc(adminMembersPrefix, sh.serveAdminMembers)
  52. mux.HandleFunc("/", http.NotFound)
  53. return mux
  54. }
  55. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  56. func NewPeerHandler(server etcdserver.Server) http.Handler {
  57. sh := &serverHandler{
  58. server: server,
  59. }
  60. mux := http.NewServeMux()
  61. mux.HandleFunc(raftPrefix, sh.serveRaft)
  62. mux.HandleFunc("/", http.NotFound)
  63. return mux
  64. }
  65. // serverHandler provides http.Handlers for etcd client and raft communication.
  66. type serverHandler struct {
  67. timeout time.Duration
  68. server etcdserver.Server
  69. stats etcdserver.ServerStats
  70. storestats etcdserver.StoreStats
  71. timer etcdserver.RaftTimer
  72. clusterStore etcdserver.ClusterStore
  73. }
  74. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  75. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  76. return
  77. }
  78. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  79. defer cancel()
  80. rr, err := parseRequest(r, etcdserver.GenID())
  81. if err != nil {
  82. writeError(w, err)
  83. return
  84. }
  85. resp, err := h.server.Do(ctx, rr)
  86. if err != nil {
  87. writeError(w, err)
  88. return
  89. }
  90. switch {
  91. case resp.Event != nil:
  92. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  93. // Should never be reached
  94. log.Printf("error writing event: %v", err)
  95. }
  96. case resp.Watcher != nil:
  97. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  98. defer cancel()
  99. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  100. default:
  101. writeError(w, errors.New("received response with no Event/Watcher!"))
  102. }
  103. }
  104. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  105. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  106. if !allowMethod(w, r.Method, "GET", "HEAD") {
  107. return
  108. }
  109. endpoints := h.clusterStore.Get().ClientURLs()
  110. w.Write([]byte(strings.Join(endpoints, ", ")))
  111. }
  112. func (h serverHandler) serveAdminMembers(w http.ResponseWriter, r *http.Request) {
  113. if !allowMethod(w, r.Method, "PUT", "DELETE") {
  114. return
  115. }
  116. ctx, cancel := context.WithTimeout(context.Background(), defaultServerTimeout)
  117. defer cancel()
  118. idStr := strings.TrimPrefix(r.URL.Path, adminMembersPrefix)
  119. id, err := strconv.ParseUint(idStr, 16, 64)
  120. if err != nil {
  121. http.Error(w, err.Error(), http.StatusBadRequest)
  122. return
  123. }
  124. switch r.Method {
  125. case "PUT":
  126. if err := r.ParseForm(); err != nil {
  127. http.Error(w, err.Error(), http.StatusBadRequest)
  128. return
  129. }
  130. peerURLs := r.PostForm["PeerURLs"]
  131. log.Printf("etcdhttp: add node %x with peer urls %v", id, peerURLs)
  132. m := etcdserver.Member{
  133. ID: id,
  134. RaftAttributes: etcdserver.RaftAttributes{
  135. PeerURLs: peerURLs,
  136. },
  137. }
  138. if err := h.server.AddMember(ctx, m); err != nil {
  139. log.Printf("etcdhttp: error adding node %x: %v", id, err)
  140. writeError(w, err)
  141. return
  142. }
  143. w.WriteHeader(http.StatusCreated)
  144. case "DELETE":
  145. log.Printf("etcdhttp: remove node %x", id)
  146. if err := h.server.RemoveMember(ctx, id); err != nil {
  147. log.Printf("etcdhttp: error removing node %x: %v", id, err)
  148. writeError(w, err)
  149. return
  150. }
  151. w.WriteHeader(http.StatusNoContent)
  152. }
  153. }
  154. func (h serverHandler) serveStoreStats(w http.ResponseWriter, r *http.Request) {
  155. if !allowMethod(w, r.Method, "GET") {
  156. return
  157. }
  158. w.Header().Set("Content-Type", "application/json")
  159. w.Write(h.storestats.JSON())
  160. }
  161. func (h serverHandler) serveSelfStats(w http.ResponseWriter, r *http.Request) {
  162. if !allowMethod(w, r.Method, "GET") {
  163. return
  164. }
  165. s := h.stats.SelfStats()
  166. b, err := json.Marshal(s)
  167. if err != nil {
  168. log.Printf("error marshalling stats: %v\n", err)
  169. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  170. return
  171. }
  172. w.Header().Set("Content-Type", "application/json")
  173. w.Write(b)
  174. }
  175. func (h serverHandler) serveLeaderStats(w http.ResponseWriter, r *http.Request) {
  176. if !allowMethod(w, r.Method, "GET") {
  177. return
  178. }
  179. s := h.stats.LeaderStats()
  180. b, err := json.Marshal(s)
  181. if err != nil {
  182. log.Printf("error marshalling stats: %v\n", err)
  183. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  184. return
  185. }
  186. w.Header().Set("Content-Type", "application/json")
  187. w.Write(b)
  188. }
  189. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  190. if !allowMethod(w, r.Method, "POST") {
  191. return
  192. }
  193. b, err := ioutil.ReadAll(r.Body)
  194. if err != nil {
  195. log.Println("etcdhttp: error reading raft message:", err)
  196. http.Error(w, "error reading raft message", http.StatusBadRequest)
  197. return
  198. }
  199. var m raftpb.Message
  200. if err := m.Unmarshal(b); err != nil {
  201. log.Println("etcdhttp: error unmarshaling raft message:", err)
  202. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  203. return
  204. }
  205. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  206. if m.Type == raftpb.MsgApp {
  207. // TODO(jonboulle): standardize id uint-->string process: always base 16?
  208. h.stats.SelfStats().RecvAppendReq(strconv.FormatUint(m.From, 16), int(r.ContentLength))
  209. }
  210. if err := h.server.Process(context.TODO(), m); err != nil {
  211. log.Println("etcdhttp: error processing raft message:", err)
  212. writeError(w, err)
  213. return
  214. }
  215. w.WriteHeader(http.StatusNoContent)
  216. }
  217. // parseRequest converts a received http.Request to a server Request,
  218. // performing validation of supplied fields as appropriate.
  219. // If any validation fails, an empty Request and non-nil error is returned.
  220. func parseRequest(r *http.Request, id uint64) (etcdserverpb.Request, error) {
  221. emptyReq := etcdserverpb.Request{}
  222. err := r.ParseForm()
  223. if err != nil {
  224. return emptyReq, etcdErr.NewRequestError(
  225. etcdErr.EcodeInvalidForm,
  226. err.Error(),
  227. )
  228. }
  229. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  230. return emptyReq, etcdErr.NewRequestError(
  231. etcdErr.EcodeInvalidForm,
  232. "incorrect key prefix",
  233. )
  234. }
  235. p := r.URL.Path[len(keysPrefix):]
  236. var pIdx, wIdx uint64
  237. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  238. return emptyReq, etcdErr.NewRequestError(
  239. etcdErr.EcodeIndexNaN,
  240. `invalid value for "prevIndex"`,
  241. )
  242. }
  243. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  244. return emptyReq, etcdErr.NewRequestError(
  245. etcdErr.EcodeIndexNaN,
  246. `invalid value for "waitIndex"`,
  247. )
  248. }
  249. var rec, sort, wait, dir, stream bool
  250. if rec, err = getBool(r.Form, "recursive"); err != nil {
  251. return emptyReq, etcdErr.NewRequestError(
  252. etcdErr.EcodeInvalidField,
  253. `invalid value for "recursive"`,
  254. )
  255. }
  256. if sort, err = getBool(r.Form, "sorted"); err != nil {
  257. return emptyReq, etcdErr.NewRequestError(
  258. etcdErr.EcodeInvalidField,
  259. `invalid value for "sorted"`,
  260. )
  261. }
  262. if wait, err = getBool(r.Form, "wait"); err != nil {
  263. return emptyReq, etcdErr.NewRequestError(
  264. etcdErr.EcodeInvalidField,
  265. `invalid value for "wait"`,
  266. )
  267. }
  268. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  269. if dir, err = getBool(r.Form, "dir"); err != nil {
  270. return emptyReq, etcdErr.NewRequestError(
  271. etcdErr.EcodeInvalidField,
  272. `invalid value for "dir"`,
  273. )
  274. }
  275. if stream, err = getBool(r.Form, "stream"); err != nil {
  276. return emptyReq, etcdErr.NewRequestError(
  277. etcdErr.EcodeInvalidField,
  278. `invalid value for "stream"`,
  279. )
  280. }
  281. if wait && r.Method != "GET" {
  282. return emptyReq, etcdErr.NewRequestError(
  283. etcdErr.EcodeInvalidField,
  284. `"wait" can only be used with GET requests`,
  285. )
  286. }
  287. pV := r.FormValue("prevValue")
  288. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  289. return emptyReq, etcdErr.NewRequestError(
  290. etcdErr.EcodeInvalidField,
  291. `"prevValue" cannot be empty`,
  292. )
  293. }
  294. // TTL is nullable, so leave it null if not specified
  295. // or an empty string
  296. var ttl *uint64
  297. if len(r.FormValue("ttl")) > 0 {
  298. i, err := getUint64(r.Form, "ttl")
  299. if err != nil {
  300. return emptyReq, etcdErr.NewRequestError(
  301. etcdErr.EcodeTTLNaN,
  302. `invalid value for "ttl"`,
  303. )
  304. }
  305. ttl = &i
  306. }
  307. // prevExist is nullable, so leave it null if not specified
  308. var pe *bool
  309. if _, ok := r.Form["prevExist"]; ok {
  310. bv, err := getBool(r.Form, "prevExist")
  311. if err != nil {
  312. return emptyReq, etcdErr.NewRequestError(
  313. etcdErr.EcodeInvalidField,
  314. "invalid value for prevExist",
  315. )
  316. }
  317. pe = &bv
  318. }
  319. rr := etcdserverpb.Request{
  320. ID: id,
  321. Method: r.Method,
  322. Path: p,
  323. Val: r.FormValue("value"),
  324. Dir: dir,
  325. PrevValue: pV,
  326. PrevIndex: pIdx,
  327. PrevExist: pe,
  328. Recursive: rec,
  329. Since: wIdx,
  330. Sorted: sort,
  331. Stream: stream,
  332. Wait: wait,
  333. }
  334. if pe != nil {
  335. rr.PrevExist = pe
  336. }
  337. // Null TTL is equivalent to unset Expiration
  338. // TODO(jonboulle): use fake clock instead of time module
  339. // https://github.com/coreos/etcd/issues/1021
  340. if ttl != nil {
  341. expr := time.Duration(*ttl) * time.Second
  342. rr.Expiration = time.Now().Add(expr).UnixNano()
  343. }
  344. return rr, nil
  345. }
  // getUint64 extracts a uint64 by the given key from a Form. If the key does
  // not exist in the form, or exists with no values at all, 0 is returned. If
  // the key exists but the value is badly formed, an error is returned. If
  // multiple values are present only the first is considered.
  func getUint64(form url.Values, key string) (i uint64, err error) {
  	vals, ok := form[key]
  	// A key can map to an empty slice in a hand-built url.Values; treat that
  	// like an absent key instead of indexing out of range.
  	if !ok || len(vals) == 0 {
  		return 0, nil
  	}
  	return strconv.ParseUint(vals[0], 10, 64)
  }
  // getBool extracts a bool by the given key from a Form. If the key does not
  // exist in the form, or exists with no values at all, false is returned. If
  // the key exists but the value is badly formed, an error is returned. If
  // multiple values are present only the first is considered.
  func getBool(form url.Values, key string) (b bool, err error) {
  	vals, ok := form[key]
  	// A key can map to an empty slice in a hand-built url.Values; treat that
  	// like an absent key instead of indexing out of range.
  	if !ok || len(vals) == 0 {
  		return false, nil
  	}
  	return strconv.ParseBool(vals[0])
  }
  366. // writeError logs and writes the given Error to the ResponseWriter
  367. // If Error is an etcdErr, it is rendered to the ResponseWriter
  368. // Otherwise, it is assumed to be an InternalServerError
  369. func writeError(w http.ResponseWriter, err error) {
  370. if err == nil {
  371. return
  372. }
  373. log.Println(err)
  374. if e, ok := err.(*etcdErr.Error); ok {
  375. e.Write(w)
  376. } else {
  377. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  378. }
  379. }
  380. // writeEvent serializes a single Event and writes the resulting
  381. // JSON to the given ResponseWriter, along with the appropriate
  382. // headers
  383. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  384. if ev == nil {
  385. return errors.New("cannot write empty Event!")
  386. }
  387. w.Header().Set("Content-Type", "application/json")
  388. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  389. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  390. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  391. if ev.IsCreated() {
  392. w.WriteHeader(http.StatusCreated)
  393. }
  394. return json.NewEncoder(w).Encode(ev)
  395. }
  396. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  397. defer wa.Remove()
  398. ech := wa.EventChan()
  399. var nch <-chan bool
  400. if x, ok := w.(http.CloseNotifier); ok {
  401. nch = x.CloseNotify()
  402. }
  403. w.Header().Set("Content-Type", "application/json")
  404. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  405. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  406. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  407. w.WriteHeader(http.StatusOK)
  408. // Ensure headers are flushed early, in case of long polling
  409. w.(http.Flusher).Flush()
  410. for {
  411. select {
  412. case <-nch:
  413. // Client closed connection. Nothing to do.
  414. return
  415. case <-ctx.Done():
  416. // Timed out. net/http will close the connection for us, so nothing to do.
  417. return
  418. case ev, ok := <-ech:
  419. if !ok {
  420. // If the channel is closed this may be an indication of
  421. // that notifications are much more than we are able to
  422. // send to the client in time. Then we simply end streaming.
  423. return
  424. }
  425. if err := json.NewEncoder(w).Encode(ev); err != nil {
  426. // Should never be reached
  427. log.Println("error writing event: %v", err)
  428. return
  429. }
  430. if !stream {
  431. return
  432. }
  433. w.(http.Flusher).Flush()
  434. }
  435. }
  436. }
  // allowMethod reports whether m is one of the allowed methods ms.
  // When it is not, a 405 Method Not Allowed response is written to w, with
  // an Allow header listing ms joined by commas, and false is returned.
  func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  	for i := range ms {
  		if ms[i] == m {
  			return true
  		}
  	}

  	allowed := strings.Join(ms, ",")
  	w.Header().Set("Allow", allowed)
  	http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  	return false
  }