http.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/etcdserver"
  15. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  16. "github.com/coreos/etcd/raft/raftpb"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  19. )
  20. const (
  21. keysPrefix = "/v2/keys"
  22. machinesPrefix = "/v2/machines"
  23. raftPrefix = "/raft"
  24. // time to wait for response from EtcdServer requests
  25. defaultServerTimeout = 500 * time.Millisecond
  26. // time to wait for a Watch request
  27. defaultWatchTimeout = 5 * time.Minute
  28. )
  29. var errClosed = errors.New("etcdhttp: client closed connection")
  30. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  31. func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
  32. sh := &serverHandler{
  33. server: server,
  34. peers: peers,
  35. timeout: timeout,
  36. }
  37. if sh.timeout == 0 {
  38. sh.timeout = defaultServerTimeout
  39. }
  40. mux := http.NewServeMux()
  41. mux.HandleFunc(keysPrefix, sh.serveKeys)
  42. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  43. // TODO: dynamic configuration may make this outdated. take care of it.
  44. // TODO: dynamic configuration may introduce race also.
  45. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  46. mux.HandleFunc("/", http.NotFound)
  47. return mux
  48. }
  49. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  50. func NewPeerHandler(server etcdserver.Server) http.Handler {
  51. sh := &serverHandler{
  52. server: server,
  53. }
  54. mux := http.NewServeMux()
  55. mux.HandleFunc(raftPrefix, sh.serveRaft)
  56. mux.HandleFunc("/", http.NotFound)
  57. return mux
  58. }
  59. // serverHandler provides http.Handlers for etcd client and raft communication.
  60. type serverHandler struct {
  61. timeout time.Duration
  62. server etcdserver.Server
  63. peers Peers
  64. }
  65. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  66. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  67. return
  68. }
  69. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  70. defer cancel()
  71. rr, err := parseRequest(r, etcdserver.GenID())
  72. if err != nil {
  73. writeError(w, err)
  74. return
  75. }
  76. resp, err := h.server.Do(ctx, rr)
  77. if err != nil {
  78. writeError(w, err)
  79. return
  80. }
  81. switch {
  82. case resp.Event != nil:
  83. if err := writeEvent(w, resp.Event); err != nil {
  84. // Should never be reached
  85. log.Println("error writing event: %v", err)
  86. }
  87. case resp.Watcher != nil:
  88. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  89. defer cancel()
  90. handleWatch(ctx, w, resp.Watcher, rr.Stream)
  91. default:
  92. writeError(w, errors.New("received response with no Event/Watcher!"))
  93. }
  94. }
  95. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  96. // TODO: rethink the format of machine list because it is not json format.
  97. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  98. if !allowMethod(w, r.Method, "GET", "HEAD") {
  99. return
  100. }
  101. endpoints := h.peers.Endpoints()
  102. w.Write([]byte(strings.Join(endpoints, ", ")))
  103. }
  104. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  105. if !allowMethod(w, r.Method, "POST") {
  106. return
  107. }
  108. b, err := ioutil.ReadAll(r.Body)
  109. if err != nil {
  110. log.Println("etcdhttp: error reading raft message:", err)
  111. http.Error(w, "error reading raft message", http.StatusBadRequest)
  112. return
  113. }
  114. var m raftpb.Message
  115. if err := m.Unmarshal(b); err != nil {
  116. log.Println("etcdhttp: error unmarshaling raft message:", err)
  117. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  118. return
  119. }
  120. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  121. if err := h.server.Process(context.TODO(), m); err != nil {
  122. log.Println("etcdhttp: error processing raft message:", err)
  123. writeError(w, err)
  124. return
  125. }
  126. w.WriteHeader(http.StatusNoContent)
  127. }
  128. // parseRequest converts a received http.Request to a server Request,
  129. // performing validation of supplied fields as appropriate.
  130. // If any validation fails, an empty Request and non-nil error is returned.
  131. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  132. emptyReq := etcdserverpb.Request{}
  133. err := r.ParseForm()
  134. if err != nil {
  135. return emptyReq, etcdErr.NewRequestError(
  136. etcdErr.EcodeInvalidForm,
  137. err.Error(),
  138. )
  139. }
  140. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  141. return emptyReq, etcdErr.NewRequestError(
  142. etcdErr.EcodeInvalidForm,
  143. "incorrect key prefix",
  144. )
  145. }
  146. p := r.URL.Path[len(keysPrefix):]
  147. var pIdx, wIdx uint64
  148. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  149. return emptyReq, etcdErr.NewRequestError(
  150. etcdErr.EcodeIndexNaN,
  151. `invalid value for "prevIndex"`,
  152. )
  153. }
  154. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  155. return emptyReq, etcdErr.NewRequestError(
  156. etcdErr.EcodeIndexNaN,
  157. `invalid value for "waitIndex"`,
  158. )
  159. }
  160. var rec, sort, wait, dir, stream bool
  161. if rec, err = getBool(r.Form, "recursive"); err != nil {
  162. return emptyReq, etcdErr.NewRequestError(
  163. etcdErr.EcodeInvalidField,
  164. `invalid value for "recursive"`,
  165. )
  166. }
  167. if sort, err = getBool(r.Form, "sorted"); err != nil {
  168. return emptyReq, etcdErr.NewRequestError(
  169. etcdErr.EcodeInvalidField,
  170. `invalid value for "sorted"`,
  171. )
  172. }
  173. if wait, err = getBool(r.Form, "wait"); err != nil {
  174. return emptyReq, etcdErr.NewRequestError(
  175. etcdErr.EcodeInvalidField,
  176. `invalid value for "wait"`,
  177. )
  178. }
  179. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  180. if dir, err = getBool(r.Form, "dir"); err != nil {
  181. return emptyReq, etcdErr.NewRequestError(
  182. etcdErr.EcodeInvalidField,
  183. `invalid value for "dir"`,
  184. )
  185. }
  186. if stream, err = getBool(r.Form, "stream"); err != nil {
  187. return emptyReq, etcdErr.NewRequestError(
  188. etcdErr.EcodeInvalidField,
  189. `invalid value for "stream"`,
  190. )
  191. }
  192. if wait && r.Method != "GET" {
  193. return emptyReq, etcdErr.NewRequestError(
  194. etcdErr.EcodeInvalidField,
  195. `"wait" can only be used with GET requests`,
  196. )
  197. }
  198. pV := r.FormValue("prevValue")
  199. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  200. return emptyReq, etcdErr.NewRequestError(
  201. etcdErr.EcodeInvalidField,
  202. `"prevValue" cannot be empty`,
  203. )
  204. }
  205. // TTL is nullable, so leave it null if not specified
  206. // or an empty string
  207. var ttl *uint64
  208. if len(r.FormValue("ttl")) > 0 {
  209. i, err := getUint64(r.Form, "ttl")
  210. if err != nil {
  211. return emptyReq, etcdErr.NewRequestError(
  212. etcdErr.EcodeTTLNaN,
  213. `invalid value for "ttl"`,
  214. )
  215. }
  216. ttl = &i
  217. }
  218. // prevExist is nullable, so leave it null if not specified
  219. var pe *bool
  220. if _, ok := r.Form["prevExist"]; ok {
  221. bv, err := getBool(r.Form, "prevExist")
  222. if err != nil {
  223. return emptyReq, etcdErr.NewRequestError(
  224. etcdErr.EcodeInvalidField,
  225. "invalid value for prevExist",
  226. )
  227. }
  228. pe = &bv
  229. }
  230. rr := etcdserverpb.Request{
  231. Id: id,
  232. Method: r.Method,
  233. Path: p,
  234. Val: r.FormValue("value"),
  235. Dir: dir,
  236. PrevValue: pV,
  237. PrevIndex: pIdx,
  238. PrevExist: pe,
  239. Recursive: rec,
  240. Since: wIdx,
  241. Sorted: sort,
  242. Stream: stream,
  243. Wait: wait,
  244. }
  245. if pe != nil {
  246. rr.PrevExist = pe
  247. }
  248. // Null TTL is equivalent to unset Expiration
  249. // TODO(jonboulle): use fake clock instead of time module
  250. // https://github.com/coreos/etcd/issues/1021
  251. if ttl != nil {
  252. expr := time.Duration(*ttl) * time.Second
  253. rr.Expiration = time.Now().Add(expr).UnixNano()
  254. }
  255. return rr, nil
  256. }
  257. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  258. // not exist in the form, 0 is returned. If the key exists but the value is
  259. // badly formed, an error is returned. If multiple values are present only the
  260. // first is considered.
  261. func getUint64(form url.Values, key string) (i uint64, err error) {
  262. if vals, ok := form[key]; ok {
  263. i, err = strconv.ParseUint(vals[0], 10, 64)
  264. }
  265. return
  266. }
  267. // getBool extracts a bool by the given key from a Form. If the key does not
  268. // exist in the form, false is returned. If the key exists but the value is
  269. // badly formed, an error is returned. If multiple values are present only the
  270. // first is considered.
  271. func getBool(form url.Values, key string) (b bool, err error) {
  272. if vals, ok := form[key]; ok {
  273. b, err = strconv.ParseBool(vals[0])
  274. }
  275. return
  276. }
  277. // writeError logs and writes the given Error to the ResponseWriter
  278. // If Error is an etcdErr, it is rendered to the ResponseWriter
  279. // Otherwise, it is assumed to be an InternalServerError
  280. func writeError(w http.ResponseWriter, err error) {
  281. if err == nil {
  282. return
  283. }
  284. log.Println(err)
  285. if e, ok := err.(*etcdErr.Error); ok {
  286. e.Write(w)
  287. } else {
  288. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  289. }
  290. }
  291. // writeEvent serializes a single Event and writes the resulting
  292. // JSON to the given ResponseWriter, along with the appropriate
  293. // headers
  294. func writeEvent(w http.ResponseWriter, ev *store.Event) error {
  295. if ev == nil {
  296. return errors.New("cannot write empty Event!")
  297. }
  298. w.Header().Set("Content-Type", "application/json")
  299. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  300. if ev.IsCreated() {
  301. w.WriteHeader(http.StatusCreated)
  302. }
  303. return json.NewEncoder(w).Encode(ev)
  304. }
  305. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
  306. defer wa.Remove()
  307. ech := wa.EventChan()
  308. var nch <-chan bool
  309. if x, ok := w.(http.CloseNotifier); ok {
  310. nch = x.CloseNotify()
  311. }
  312. w.Header().Set("Content-Type", "application/json")
  313. w.WriteHeader(http.StatusOK)
  314. // Ensure headers are flushed early, in case of long polling
  315. w.(http.Flusher).Flush()
  316. for {
  317. select {
  318. case <-nch:
  319. // Client closed connection. Nothing to do.
  320. return
  321. case <-ctx.Done():
  322. // Timed out. net/http will close the connection for us, so nothing to do.
  323. return
  324. case ev, ok := <-ech:
  325. if !ok {
  326. // If the channel is closed this may be an indication of
  327. // that notifications are much more than we are able to
  328. // send to the client in time. Then we simply end streaming.
  329. return
  330. }
  331. if err := json.NewEncoder(w).Encode(ev); err != nil {
  332. // Should never be reached
  333. log.Println("error writing event: %v", err)
  334. return
  335. }
  336. if !stream {
  337. return
  338. }
  339. w.(http.Flusher).Flush()
  340. }
  341. }
  342. }
  343. // allowMethod verifies that the given method is one of the allowed methods,
  344. // and if not, it writes an error to w. A boolean is returned indicating
  345. // whether or not the method is allowed.
  346. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  347. for _, meth := range ms {
  348. if m == meth {
  349. return true
  350. }
  351. }
  352. w.Header().Set("Allow", strings.Join(ms, ","))
  353. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  354. return false
  355. }