http.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/etcdserver"
  15. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  16. "github.com/coreos/etcd/raft/raftpb"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  19. )
  20. const (
  21. keysPrefix = "/v2/keys"
  22. machinesPrefix = "/v2/machines"
  23. raftPrefix = "/raft"
  24. // time to wait for response from EtcdServer requests
  25. defaultServerTimeout = 500 * time.Millisecond
  26. // time to wait for a Watch request
  27. defaultWatchTimeout = 5 * time.Minute
  28. )
  29. var errClosed = errors.New("etcdhttp: client closed connection")
  30. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  31. func NewClientHandler(server *etcdserver.EtcdServer) http.Handler {
  32. sh := &serverHandler{
  33. server: server,
  34. clusterStore: server.ClusterStore,
  35. timer: server,
  36. timeout: defaultServerTimeout,
  37. }
  38. mux := http.NewServeMux()
  39. mux.HandleFunc(keysPrefix, sh.serveKeys)
  40. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  41. // TODO: dynamic configuration may make this outdated. take care of it.
  42. // TODO: dynamic configuration may introduce race also.
  43. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  44. mux.HandleFunc("/", http.NotFound)
  45. return mux
  46. }
  47. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  48. func NewPeerHandler(server etcdserver.Server) http.Handler {
  49. sh := &serverHandler{
  50. server: server,
  51. }
  52. mux := http.NewServeMux()
  53. mux.HandleFunc(raftPrefix, sh.serveRaft)
  54. mux.HandleFunc("/", http.NotFound)
  55. return mux
  56. }
  57. // serverHandler provides http.Handlers for etcd client and raft communication.
  58. type serverHandler struct {
  59. timeout time.Duration
  60. server etcdserver.Server
  61. timer etcdserver.RaftTimer
  62. clusterStore etcdserver.ClusterStore
  63. }
  64. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  65. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  66. return
  67. }
  68. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  69. defer cancel()
  70. rr, err := parseRequest(r, etcdserver.GenID())
  71. if err != nil {
  72. writeError(w, err)
  73. return
  74. }
  75. resp, err := h.server.Do(ctx, rr)
  76. if err != nil {
  77. writeError(w, err)
  78. return
  79. }
  80. switch {
  81. case resp.Event != nil:
  82. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  83. // Should never be reached
  84. log.Printf("error writing event: %v", err)
  85. }
  86. case resp.Watcher != nil:
  87. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  88. defer cancel()
  89. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  90. default:
  91. writeError(w, errors.New("received response with no Event/Watcher!"))
  92. }
  93. }
  94. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  95. // TODO: rethink the format of machine list because it is not json format.
  96. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  97. if !allowMethod(w, r.Method, "GET", "HEAD") {
  98. return
  99. }
  100. endpoints := h.clusterStore.Get().ClientURLs()
  101. w.Write([]byte(strings.Join(endpoints, ", ")))
  102. }
  103. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  104. if !allowMethod(w, r.Method, "POST") {
  105. return
  106. }
  107. b, err := ioutil.ReadAll(r.Body)
  108. if err != nil {
  109. log.Println("etcdhttp: error reading raft message:", err)
  110. http.Error(w, "error reading raft message", http.StatusBadRequest)
  111. return
  112. }
  113. var m raftpb.Message
  114. if err := m.Unmarshal(b); err != nil {
  115. log.Println("etcdhttp: error unmarshaling raft message:", err)
  116. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  117. return
  118. }
  119. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  120. if err := h.server.Process(context.TODO(), m); err != nil {
  121. log.Println("etcdhttp: error processing raft message:", err)
  122. writeError(w, err)
  123. return
  124. }
  125. w.WriteHeader(http.StatusNoContent)
  126. }
  127. // parseRequest converts a received http.Request to a server Request,
  128. // performing validation of supplied fields as appropriate.
  129. // If any validation fails, an empty Request and non-nil error is returned.
  130. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  131. emptyReq := etcdserverpb.Request{}
  132. err := r.ParseForm()
  133. if err != nil {
  134. return emptyReq, etcdErr.NewRequestError(
  135. etcdErr.EcodeInvalidForm,
  136. err.Error(),
  137. )
  138. }
  139. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  140. return emptyReq, etcdErr.NewRequestError(
  141. etcdErr.EcodeInvalidForm,
  142. "incorrect key prefix",
  143. )
  144. }
  145. p := r.URL.Path[len(keysPrefix):]
  146. var pIdx, wIdx uint64
  147. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  148. return emptyReq, etcdErr.NewRequestError(
  149. etcdErr.EcodeIndexNaN,
  150. `invalid value for "prevIndex"`,
  151. )
  152. }
  153. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  154. return emptyReq, etcdErr.NewRequestError(
  155. etcdErr.EcodeIndexNaN,
  156. `invalid value for "waitIndex"`,
  157. )
  158. }
  159. var rec, sort, wait, dir, stream bool
  160. if rec, err = getBool(r.Form, "recursive"); err != nil {
  161. return emptyReq, etcdErr.NewRequestError(
  162. etcdErr.EcodeInvalidField,
  163. `invalid value for "recursive"`,
  164. )
  165. }
  166. if sort, err = getBool(r.Form, "sorted"); err != nil {
  167. return emptyReq, etcdErr.NewRequestError(
  168. etcdErr.EcodeInvalidField,
  169. `invalid value for "sorted"`,
  170. )
  171. }
  172. if wait, err = getBool(r.Form, "wait"); err != nil {
  173. return emptyReq, etcdErr.NewRequestError(
  174. etcdErr.EcodeInvalidField,
  175. `invalid value for "wait"`,
  176. )
  177. }
  178. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  179. if dir, err = getBool(r.Form, "dir"); err != nil {
  180. return emptyReq, etcdErr.NewRequestError(
  181. etcdErr.EcodeInvalidField,
  182. `invalid value for "dir"`,
  183. )
  184. }
  185. if stream, err = getBool(r.Form, "stream"); err != nil {
  186. return emptyReq, etcdErr.NewRequestError(
  187. etcdErr.EcodeInvalidField,
  188. `invalid value for "stream"`,
  189. )
  190. }
  191. if wait && r.Method != "GET" {
  192. return emptyReq, etcdErr.NewRequestError(
  193. etcdErr.EcodeInvalidField,
  194. `"wait" can only be used with GET requests`,
  195. )
  196. }
  197. pV := r.FormValue("prevValue")
  198. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  199. return emptyReq, etcdErr.NewRequestError(
  200. etcdErr.EcodeInvalidField,
  201. `"prevValue" cannot be empty`,
  202. )
  203. }
  204. // TTL is nullable, so leave it null if not specified
  205. // or an empty string
  206. var ttl *uint64
  207. if len(r.FormValue("ttl")) > 0 {
  208. i, err := getUint64(r.Form, "ttl")
  209. if err != nil {
  210. return emptyReq, etcdErr.NewRequestError(
  211. etcdErr.EcodeTTLNaN,
  212. `invalid value for "ttl"`,
  213. )
  214. }
  215. ttl = &i
  216. }
  217. // prevExist is nullable, so leave it null if not specified
  218. var pe *bool
  219. if _, ok := r.Form["prevExist"]; ok {
  220. bv, err := getBool(r.Form, "prevExist")
  221. if err != nil {
  222. return emptyReq, etcdErr.NewRequestError(
  223. etcdErr.EcodeInvalidField,
  224. "invalid value for prevExist",
  225. )
  226. }
  227. pe = &bv
  228. }
  229. rr := etcdserverpb.Request{
  230. ID: id,
  231. Method: r.Method,
  232. Path: p,
  233. Val: r.FormValue("value"),
  234. Dir: dir,
  235. PrevValue: pV,
  236. PrevIndex: pIdx,
  237. PrevExist: pe,
  238. Recursive: rec,
  239. Since: wIdx,
  240. Sorted: sort,
  241. Stream: stream,
  242. Wait: wait,
  243. }
  244. if pe != nil {
  245. rr.PrevExist = pe
  246. }
  247. // Null TTL is equivalent to unset Expiration
  248. // TODO(jonboulle): use fake clock instead of time module
  249. // https://github.com/coreos/etcd/issues/1021
  250. if ttl != nil {
  251. expr := time.Duration(*ttl) * time.Second
  252. rr.Expiration = time.Now().Add(expr).UnixNano()
  253. }
  254. return rr, nil
  255. }
  256. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  257. // not exist in the form, 0 is returned. If the key exists but the value is
  258. // badly formed, an error is returned. If multiple values are present only the
  259. // first is considered.
  260. func getUint64(form url.Values, key string) (i uint64, err error) {
  261. if vals, ok := form[key]; ok {
  262. i, err = strconv.ParseUint(vals[0], 10, 64)
  263. }
  264. return
  265. }
  266. // getBool extracts a bool by the given key from a Form. If the key does not
  267. // exist in the form, false is returned. If the key exists but the value is
  268. // badly formed, an error is returned. If multiple values are present only the
  269. // first is considered.
  270. func getBool(form url.Values, key string) (b bool, err error) {
  271. if vals, ok := form[key]; ok {
  272. b, err = strconv.ParseBool(vals[0])
  273. }
  274. return
  275. }
  276. // writeError logs and writes the given Error to the ResponseWriter
  277. // If Error is an etcdErr, it is rendered to the ResponseWriter
  278. // Otherwise, it is assumed to be an InternalServerError
  279. func writeError(w http.ResponseWriter, err error) {
  280. if err == nil {
  281. return
  282. }
  283. log.Println(err)
  284. if e, ok := err.(*etcdErr.Error); ok {
  285. e.Write(w)
  286. } else {
  287. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  288. }
  289. }
  290. // writeEvent serializes a single Event and writes the resulting
  291. // JSON to the given ResponseWriter, along with the appropriate
  292. // headers
  293. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  294. if ev == nil {
  295. return errors.New("cannot write empty Event!")
  296. }
  297. w.Header().Set("Content-Type", "application/json")
  298. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  299. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  300. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  301. if ev.IsCreated() {
  302. w.WriteHeader(http.StatusCreated)
  303. }
  304. return json.NewEncoder(w).Encode(ev)
  305. }
  306. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  307. defer wa.Remove()
  308. ech := wa.EventChan()
  309. var nch <-chan bool
  310. if x, ok := w.(http.CloseNotifier); ok {
  311. nch = x.CloseNotify()
  312. }
  313. w.Header().Set("Content-Type", "application/json")
  314. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  315. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  316. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  317. w.WriteHeader(http.StatusOK)
  318. // Ensure headers are flushed early, in case of long polling
  319. w.(http.Flusher).Flush()
  320. for {
  321. select {
  322. case <-nch:
  323. // Client closed connection. Nothing to do.
  324. return
  325. case <-ctx.Done():
  326. // Timed out. net/http will close the connection for us, so nothing to do.
  327. return
  328. case ev, ok := <-ech:
  329. if !ok {
  330. // If the channel is closed this may be an indication of
  331. // that notifications are much more than we are able to
  332. // send to the client in time. Then we simply end streaming.
  333. return
  334. }
  335. if err := json.NewEncoder(w).Encode(ev); err != nil {
  336. // Should never be reached
  337. log.Println("error writing event: %v", err)
  338. return
  339. }
  340. if !stream {
  341. return
  342. }
  343. w.(http.Flusher).Flush()
  344. }
  345. }
  346. }
  347. // allowMethod verifies that the given method is one of the allowed methods,
  348. // and if not, it writes an error to w. A boolean is returned indicating
  349. // whether or not the method is allowed.
  350. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  351. for _, meth := range ms {
  352. if m == meth {
  353. return true
  354. }
  355. }
  356. w.Header().Set("Allow", strings.Join(ms, ","))
  357. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  358. return false
  359. }