// http.go (9.9 KB)

  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/etcdserver"
  15. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  16. "github.com/coreos/etcd/raft/raftpb"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  19. )
  // URL path prefixes under which the handlers in this file are registered.
  const (
  	keysPrefix     = "/v2/keys"
  	machinesPrefix = "/v2/machines"
  	raftPrefix     = "/raft"
  )

  const (
  	// defaultServerTimeout is how long to wait for a response from
  	// EtcdServer requests when no explicit timeout is configured.
  	defaultServerTimeout = 500 * time.Millisecond

  	// defaultWatchTimeout is how long to wait for a Watch request.
  	defaultWatchTimeout = 5 * time.Minute
  )

  // errClosed is the error value describing a client that closed its connection.
  var errClosed = errors.New("etcdhttp: client closed connection")
  30. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  31. func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
  32. sh := &serverHandler{
  33. server: server,
  34. peers: peers,
  35. timeout: timeout,
  36. }
  37. if sh.timeout == 0 {
  38. sh.timeout = defaultServerTimeout
  39. }
  40. mux := http.NewServeMux()
  41. mux.HandleFunc(keysPrefix, sh.serveKeys)
  42. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  43. // TODO: dynamic configuration may make this outdated. take care of it.
  44. // TODO: dynamic configuration may introduce race also.
  45. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  46. mux.HandleFunc("/", http.NotFound)
  47. return mux
  48. }
  49. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  50. func NewPeerHandler(server etcdserver.Server) http.Handler {
  51. sh := &serverHandler{
  52. server: server,
  53. }
  54. mux := http.NewServeMux()
  55. mux.HandleFunc(raftPrefix, sh.serveRaft)
  56. mux.HandleFunc("/", http.NotFound)
  57. return mux
  58. }
  59. // serverHandler provides http.Handlers for etcd client and raft communication.
  60. type serverHandler struct {
  61. timeout time.Duration
  62. server etcdserver.Server
  63. peers Peers
  64. }
  65. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  66. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  67. return
  68. }
  69. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  70. defer cancel()
  71. rr, err := parseRequest(r, etcdserver.GenID())
  72. if err != nil {
  73. writeError(w, err)
  74. return
  75. }
  76. resp, err := h.server.Do(ctx, rr)
  77. if err != nil {
  78. writeError(w, err)
  79. return
  80. }
  81. switch {
  82. case resp.Event != nil:
  83. if err := writeEvent(w, resp.Event); err != nil {
  84. // Should never be reached
  85. log.Println("error writing event: %v", err)
  86. }
  87. case resp.Watcher != nil:
  88. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  89. defer cancel()
  90. handleWatch(ctx, w, resp.Watcher, rr.Stream)
  91. default:
  92. writeError(w, errors.New("received response with no Event/Watcher!"))
  93. }
  94. }
  95. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  96. // TODO: rethink the format of machine list because it is not json format.
  97. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  98. if !allowMethod(w, r.Method, "GET", "HEAD") {
  99. return
  100. }
  101. endpoints := h.peers.Endpoints()
  102. w.Write([]byte(strings.Join(endpoints, ", ")))
  103. }
  104. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  105. if !allowMethod(w, r.Method, "POST") {
  106. return
  107. }
  108. b, err := ioutil.ReadAll(r.Body)
  109. if err != nil {
  110. log.Println("etcdhttp: error reading raft message:", err)
  111. http.Error(w, "error reading raft message", http.StatusBadRequest)
  112. return
  113. }
  114. var m raftpb.Message
  115. if err := m.Unmarshal(b); err != nil {
  116. log.Println("etcdhttp: error unmarshaling raft message:", err)
  117. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  118. return
  119. }
  120. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  121. if err := h.server.Process(context.TODO(), m); err != nil {
  122. log.Println("etcdhttp: error processing raft message:", err)
  123. writeError(w, err)
  124. return
  125. }
  126. w.WriteHeader(http.StatusNoContent)
  127. }
  128. // parseRequest converts a received http.Request to a server Request,
  129. // performing validation of supplied fields as appropriate.
  130. // If any validation fails, an empty Request and non-nil error is returned.
  131. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  132. emptyReq := etcdserverpb.Request{}
  133. err := r.ParseForm()
  134. if err != nil {
  135. return emptyReq, etcdErr.NewRequestError(
  136. etcdErr.EcodeInvalidForm,
  137. err.Error(),
  138. )
  139. }
  140. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  141. return emptyReq, etcdErr.NewRequestError(
  142. etcdErr.EcodeInvalidForm,
  143. "incorrect key prefix",
  144. )
  145. }
  146. p := r.URL.Path[len(keysPrefix):]
  147. var pIdx, wIdx, ttl uint64
  148. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  149. return emptyReq, etcdErr.NewRequestError(
  150. etcdErr.EcodeIndexNaN,
  151. `invalid value for "prevIndex"`,
  152. )
  153. }
  154. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  155. return emptyReq, etcdErr.NewRequestError(
  156. etcdErr.EcodeIndexNaN,
  157. `invalid value for "waitIndex"`,
  158. )
  159. }
  160. if ttl, err = getUint64(r.Form, "ttl"); err != nil {
  161. return emptyReq, etcdErr.NewRequestError(
  162. etcdErr.EcodeTTLNaN,
  163. `invalid value for "ttl"`,
  164. )
  165. }
  166. var rec, sort, wait, stream bool
  167. if rec, err = getBool(r.Form, "recursive"); err != nil {
  168. return emptyReq, etcdErr.NewRequestError(
  169. etcdErr.EcodeInvalidField,
  170. `invalid value for "recursive"`,
  171. )
  172. }
  173. if sort, err = getBool(r.Form, "sorted"); err != nil {
  174. return emptyReq, etcdErr.NewRequestError(
  175. etcdErr.EcodeInvalidField,
  176. `invalid value for "sorted"`,
  177. )
  178. }
  179. if wait, err = getBool(r.Form, "wait"); err != nil {
  180. return emptyReq, etcdErr.NewRequestError(
  181. etcdErr.EcodeInvalidField,
  182. `invalid value for "wait"`,
  183. )
  184. }
  185. if stream, err = getBool(r.Form, "stream"); err != nil {
  186. return emptyReq, etcdErr.NewRequestError(
  187. etcdErr.EcodeInvalidField,
  188. `invalid value for "stream"`,
  189. )
  190. }
  191. if wait && r.Method != "GET" {
  192. return emptyReq, etcdErr.NewRequestError(
  193. etcdErr.EcodeInvalidField,
  194. `"wait" can only be used with GET requests`,
  195. )
  196. }
  197. // prevExist is nullable, so leave it null if not specified
  198. var pe *bool
  199. if _, ok := r.Form["prevExist"]; ok {
  200. bv, err := getBool(r.Form, "prevExist")
  201. if err != nil {
  202. return emptyReq, etcdErr.NewRequestError(
  203. etcdErr.EcodeInvalidField,
  204. "invalid value for prevExist",
  205. )
  206. }
  207. pe = &bv
  208. }
  209. rr := etcdserverpb.Request{
  210. Id: id,
  211. Method: r.Method,
  212. Path: p,
  213. Val: r.FormValue("value"),
  214. PrevValue: r.FormValue("prevValue"),
  215. PrevIndex: pIdx,
  216. PrevExist: pe,
  217. Recursive: rec,
  218. Since: wIdx,
  219. Sorted: sort,
  220. Stream: stream,
  221. Wait: wait,
  222. }
  223. if pe != nil {
  224. rr.PrevExist = pe
  225. }
  226. // TODO(jonboulle): use fake clock instead of time module
  227. // https://github.com/coreos/etcd/issues/1021
  228. if ttl > 0 {
  229. expr := time.Duration(ttl) * time.Second
  230. rr.Expiration = time.Now().Add(expr).UnixNano()
  231. }
  232. return rr, nil
  233. }
  // getUint64 extracts a uint64 by the given key from a Form.
  //
  // If the key does not exist in the form, or exists with no values at all,
  // 0 and a nil error are returned. If the key exists but the value is badly
  // formed (not a base-10 unsigned integer that fits in 64 bits), the error
  // from strconv.ParseUint is returned. If multiple values are present only
  // the first is considered.
  func getUint64(form url.Values, key string) (i uint64, err error) {
  	vals, ok := form[key]
  	// Guard the empty slice too: indexing vals[0] would panic otherwise.
  	if !ok || len(vals) == 0 {
  		return 0, nil
  	}
  	return strconv.ParseUint(vals[0], 10, 64)
  }
  // getBool extracts a bool by the given key from a Form.
  //
  // If the key does not exist in the form, or exists with no values at all,
  // false and a nil error are returned. If the key exists but the value is
  // badly formed, the error from strconv.ParseBool is returned. If multiple
  // values are present only the first is considered.
  func getBool(form url.Values, key string) (b bool, err error) {
  	vals, ok := form[key]
  	// Guard the empty slice too: indexing vals[0] would panic otherwise.
  	if !ok || len(vals) == 0 {
  		return false, nil
  	}
  	return strconv.ParseBool(vals[0])
  }
  254. // writeError logs and writes the given Error to the ResponseWriter
  255. // If Error is an etcdErr, it is rendered to the ResponseWriter
  256. // Otherwise, it is assumed to be an InternalServerError
  257. func writeError(w http.ResponseWriter, err error) {
  258. if err == nil {
  259. return
  260. }
  261. log.Println(err)
  262. if e, ok := err.(*etcdErr.Error); ok {
  263. e.Write(w)
  264. } else {
  265. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  266. }
  267. }
  268. // writeEvent serializes a single Event and writes the resulting
  269. // JSON to the given ResponseWriter, along with the appropriate
  270. // headers
  271. func writeEvent(w http.ResponseWriter, ev *store.Event) error {
  272. if ev == nil {
  273. return errors.New("cannot write empty Event!")
  274. }
  275. w.Header().Set("Content-Type", "application/json")
  276. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  277. if ev.IsCreated() {
  278. w.WriteHeader(http.StatusCreated)
  279. }
  280. return json.NewEncoder(w).Encode(ev)
  281. }
  282. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
  283. defer wa.Remove()
  284. ech := wa.EventChan()
  285. var nch <-chan bool
  286. if x, ok := w.(http.CloseNotifier); ok {
  287. nch = x.CloseNotify()
  288. }
  289. w.Header().Set("Content-Type", "application/json")
  290. w.WriteHeader(http.StatusOK)
  291. // Ensure headers are flushed early, in case of long polling
  292. w.(http.Flusher).Flush()
  293. for {
  294. select {
  295. case <-nch:
  296. // Client closed connection. Nothing to do.
  297. return
  298. case <-ctx.Done():
  299. // Timed out. net/http will close the connection for us, so nothing to do.
  300. return
  301. case ev, ok := <-ech:
  302. if !ok {
  303. // If the channel is closed this may be an indication of
  304. // that notifications are much more than we are able to
  305. // send to the client in time. Then we simply end streaming.
  306. return
  307. }
  308. if err := json.NewEncoder(w).Encode(ev); err != nil {
  309. // Should never be reached
  310. log.Println("error writing event: %v", err)
  311. return
  312. }
  313. if !stream {
  314. return
  315. }
  316. w.(http.Flusher).Flush()
  317. }
  318. }
  319. }
  // allowMethod reports whether method m is one of the allowed methods ms.
  // When it is not, a 405 Method Not Allowed response is written to w with an
  // Allow header listing ms, and false is returned. Nothing is written to w
  // when the method is allowed.
  func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  	permitted := false
  	for i := 0; i < len(ms) && !permitted; i++ {
  		permitted = ms[i] == m
  	}
  	if permitted {
  		return true
  	}

  	w.Header().Set("Allow", strings.Join(ms, ","))
  	http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  	return false
  }