http.go 11 KB


  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/etcdserver"
  15. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  16. "github.com/coreos/etcd/raft/raftpb"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  19. )
  20. const (
  21. keysPrefix = "/v2/keys"
  22. membersPrefix = "/v2/members"
  23. deprecatedMachinesPrefix = "/v2/machines"
  24. raftPrefix = "/raft"
  25. // time to wait for response from EtcdServer requests
  26. defaultServerTimeout = 500 * time.Millisecond
  27. // time to wait for a Watch request
  28. defaultWatchTimeout = 5 * time.Minute
  29. )
  30. var errClosed = errors.New("etcdhttp: client closed connection")
  31. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  32. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  33. sh := &serverHandler{
  34. server: server,
  35. clusterStore: server.ClusterStore,
  36. timer: server,
  37. timeout: defaultServerTimeout,
  38. }
  39. mux := http.NewServeMux()
  40. mux.HandleFunc(keysPrefix, sh.serveKeys)
  41. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  42. // TODO: dynamic configuration may make this outdated. take care of it.
  43. // TODO: dynamic configuration may introduce race also.
  44. // TODO: add serveMembers
  45. mux.HandleFunc(membersPrefix, sh.serveMachines)
  46. mux.HandleFunc(deprecatedMachinesPrefix, sh.serveMachines)
  47. mux.HandleFunc("/", http.NotFound)
  48. return mux
  49. }
  50. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  51. func NewPeerHandler(server etcdserver.Server) http.Handler {
  52. sh := &serverHandler{
  53. server: server,
  54. }
  55. mux := http.NewServeMux()
  56. mux.HandleFunc(raftPrefix, sh.serveRaft)
  57. mux.HandleFunc("/", http.NotFound)
  58. return mux
  59. }
  60. // serverHandler provides http.Handlers for etcd client and raft communication.
  61. type serverHandler struct {
  62. timeout time.Duration
  63. server etcdserver.Server
  64. timer etcdserver.RaftTimer
  65. clusterStore etcdserver.ClusterStore
  66. }
  67. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  68. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  69. return
  70. }
  71. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  72. defer cancel()
  73. rr, err := parseRequest(r, etcdserver.GenID())
  74. if err != nil {
  75. writeError(w, err)
  76. return
  77. }
  78. resp, err := h.server.Do(ctx, rr)
  79. if err != nil {
  80. writeError(w, err)
  81. return
  82. }
  83. switch {
  84. case resp.Event != nil:
  85. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  86. // Should never be reached
  87. log.Printf("error writing event: %v", err)
  88. }
  89. case resp.Watcher != nil:
  90. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  91. defer cancel()
  92. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  93. default:
  94. writeError(w, errors.New("received response with no Event/Watcher!"))
  95. }
  96. }
  97. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  98. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  99. if !allowMethod(w, r.Method, "GET", "HEAD") {
  100. return
  101. }
  102. endpoints := h.clusterStore.Get().ClientURLs()
  103. w.Write([]byte(strings.Join(endpoints, ", ")))
  104. }
  105. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  106. if !allowMethod(w, r.Method, "POST") {
  107. return
  108. }
  109. b, err := ioutil.ReadAll(r.Body)
  110. if err != nil {
  111. log.Println("etcdhttp: error reading raft message:", err)
  112. http.Error(w, "error reading raft message", http.StatusBadRequest)
  113. return
  114. }
  115. var m raftpb.Message
  116. if err := m.Unmarshal(b); err != nil {
  117. log.Println("etcdhttp: error unmarshaling raft message:", err)
  118. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  119. return
  120. }
  121. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  122. if err := h.server.Process(context.TODO(), m); err != nil {
  123. log.Println("etcdhttp: error processing raft message:", err)
  124. writeError(w, err)
  125. return
  126. }
  127. w.WriteHeader(http.StatusNoContent)
  128. }
  129. // parseRequest converts a received http.Request to a server Request,
  130. // performing validation of supplied fields as appropriate.
  131. // If any validation fails, an empty Request and non-nil error is returned.
  132. func parseRequest(r *http.Request, id uint64) (etcdserverpb.Request, error) {
  133. emptyReq := etcdserverpb.Request{}
  134. err := r.ParseForm()
  135. if err != nil {
  136. return emptyReq, etcdErr.NewRequestError(
  137. etcdErr.EcodeInvalidForm,
  138. err.Error(),
  139. )
  140. }
  141. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  142. return emptyReq, etcdErr.NewRequestError(
  143. etcdErr.EcodeInvalidForm,
  144. "incorrect key prefix",
  145. )
  146. }
  147. p := r.URL.Path[len(keysPrefix):]
  148. var pIdx, wIdx uint64
  149. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  150. return emptyReq, etcdErr.NewRequestError(
  151. etcdErr.EcodeIndexNaN,
  152. `invalid value for "prevIndex"`,
  153. )
  154. }
  155. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  156. return emptyReq, etcdErr.NewRequestError(
  157. etcdErr.EcodeIndexNaN,
  158. `invalid value for "waitIndex"`,
  159. )
  160. }
  161. var rec, sort, wait, dir, stream bool
  162. if rec, err = getBool(r.Form, "recursive"); err != nil {
  163. return emptyReq, etcdErr.NewRequestError(
  164. etcdErr.EcodeInvalidField,
  165. `invalid value for "recursive"`,
  166. )
  167. }
  168. if sort, err = getBool(r.Form, "sorted"); err != nil {
  169. return emptyReq, etcdErr.NewRequestError(
  170. etcdErr.EcodeInvalidField,
  171. `invalid value for "sorted"`,
  172. )
  173. }
  174. if wait, err = getBool(r.Form, "wait"); err != nil {
  175. return emptyReq, etcdErr.NewRequestError(
  176. etcdErr.EcodeInvalidField,
  177. `invalid value for "wait"`,
  178. )
  179. }
  180. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  181. if dir, err = getBool(r.Form, "dir"); err != nil {
  182. return emptyReq, etcdErr.NewRequestError(
  183. etcdErr.EcodeInvalidField,
  184. `invalid value for "dir"`,
  185. )
  186. }
  187. if stream, err = getBool(r.Form, "stream"); err != nil {
  188. return emptyReq, etcdErr.NewRequestError(
  189. etcdErr.EcodeInvalidField,
  190. `invalid value for "stream"`,
  191. )
  192. }
  193. if wait && r.Method != "GET" {
  194. return emptyReq, etcdErr.NewRequestError(
  195. etcdErr.EcodeInvalidField,
  196. `"wait" can only be used with GET requests`,
  197. )
  198. }
  199. pV := r.FormValue("prevValue")
  200. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  201. return emptyReq, etcdErr.NewRequestError(
  202. etcdErr.EcodeInvalidField,
  203. `"prevValue" cannot be empty`,
  204. )
  205. }
  206. // TTL is nullable, so leave it null if not specified
  207. // or an empty string
  208. var ttl *uint64
  209. if len(r.FormValue("ttl")) > 0 {
  210. i, err := getUint64(r.Form, "ttl")
  211. if err != nil {
  212. return emptyReq, etcdErr.NewRequestError(
  213. etcdErr.EcodeTTLNaN,
  214. `invalid value for "ttl"`,
  215. )
  216. }
  217. ttl = &i
  218. }
  219. // prevExist is nullable, so leave it null if not specified
  220. var pe *bool
  221. if _, ok := r.Form["prevExist"]; ok {
  222. bv, err := getBool(r.Form, "prevExist")
  223. if err != nil {
  224. return emptyReq, etcdErr.NewRequestError(
  225. etcdErr.EcodeInvalidField,
  226. "invalid value for prevExist",
  227. )
  228. }
  229. pe = &bv
  230. }
  231. rr := etcdserverpb.Request{
  232. ID: id,
  233. Method: r.Method,
  234. Path: p,
  235. Val: r.FormValue("value"),
  236. Dir: dir,
  237. PrevValue: pV,
  238. PrevIndex: pIdx,
  239. PrevExist: pe,
  240. Recursive: rec,
  241. Since: wIdx,
  242. Sorted: sort,
  243. Stream: stream,
  244. Wait: wait,
  245. }
  246. if pe != nil {
  247. rr.PrevExist = pe
  248. }
  249. // Null TTL is equivalent to unset Expiration
  250. // TODO(jonboulle): use fake clock instead of time module
  251. // https://github.com/coreos/etcd/issues/1021
  252. if ttl != nil {
  253. expr := time.Duration(*ttl) * time.Second
  254. rr.Expiration = time.Now().Add(expr).UnixNano()
  255. }
  256. return rr, nil
  257. }
  258. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  259. // not exist in the form, 0 is returned. If the key exists but the value is
  260. // badly formed, an error is returned. If multiple values are present only the
  261. // first is considered.
  262. func getUint64(form url.Values, key string) (i uint64, err error) {
  263. if vals, ok := form[key]; ok {
  264. i, err = strconv.ParseUint(vals[0], 10, 64)
  265. }
  266. return
  267. }
  268. // getBool extracts a bool by the given key from a Form. If the key does not
  269. // exist in the form, false is returned. If the key exists but the value is
  270. // badly formed, an error is returned. If multiple values are present only the
  271. // first is considered.
  272. func getBool(form url.Values, key string) (b bool, err error) {
  273. if vals, ok := form[key]; ok {
  274. b, err = strconv.ParseBool(vals[0])
  275. }
  276. return
  277. }
  278. // writeError logs and writes the given Error to the ResponseWriter
  279. // If Error is an etcdErr, it is rendered to the ResponseWriter
  280. // Otherwise, it is assumed to be an InternalServerError
  281. func writeError(w http.ResponseWriter, err error) {
  282. if err == nil {
  283. return
  284. }
  285. log.Println(err)
  286. if e, ok := err.(*etcdErr.Error); ok {
  287. e.Write(w)
  288. } else {
  289. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  290. }
  291. }
  292. // writeEvent serializes a single Event and writes the resulting
  293. // JSON to the given ResponseWriter, along with the appropriate
  294. // headers
  295. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  296. if ev == nil {
  297. return errors.New("cannot write empty Event!")
  298. }
  299. w.Header().Set("Content-Type", "application/json")
  300. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  301. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  302. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  303. if ev.IsCreated() {
  304. w.WriteHeader(http.StatusCreated)
  305. }
  306. return json.NewEncoder(w).Encode(ev)
  307. }
  308. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  309. defer wa.Remove()
  310. ech := wa.EventChan()
  311. var nch <-chan bool
  312. if x, ok := w.(http.CloseNotifier); ok {
  313. nch = x.CloseNotify()
  314. }
  315. w.Header().Set("Content-Type", "application/json")
  316. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  317. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  318. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  319. w.WriteHeader(http.StatusOK)
  320. // Ensure headers are flushed early, in case of long polling
  321. w.(http.Flusher).Flush()
  322. for {
  323. select {
  324. case <-nch:
  325. // Client closed connection. Nothing to do.
  326. return
  327. case <-ctx.Done():
  328. // Timed out. net/http will close the connection for us, so nothing to do.
  329. return
  330. case ev, ok := <-ech:
  331. if !ok {
  332. // If the channel is closed this may be an indication of
  333. // that notifications are much more than we are able to
  334. // send to the client in time. Then we simply end streaming.
  335. return
  336. }
  337. if err := json.NewEncoder(w).Encode(ev); err != nil {
  338. // Should never be reached
  339. log.Println("error writing event: %v", err)
  340. return
  341. }
  342. if !stream {
  343. return
  344. }
  345. w.(http.Flusher).Flush()
  346. }
  347. }
  348. }
  349. // allowMethod verifies that the given method is one of the allowed methods,
  350. // and if not, it writes an error to w. A boolean is returned indicating
  351. // whether or not the method is allowed.
  352. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  353. for _, meth := range ms {
  354. if m == meth {
  355. return true
  356. }
  357. }
  358. w.Header().Set("Allow", strings.Join(ms, ","))
  359. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  360. return false
  361. }