http.go 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/etcdserver"
  15. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  16. "github.com/coreos/etcd/raft/raftpb"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  19. )
  20. const (
  21. keysPrefix = "/v2/keys"
  22. machinesPrefix = "/v2/machines"
  23. raftPrefix = "/raft"
  24. // time to wait for response from EtcdServer requests
  25. defaultServerTimeout = 500 * time.Millisecond
  26. // time to wait for a Watch request
  27. defaultWatchTimeout = 5 * time.Minute
  28. )
  29. var errClosed = errors.New("etcdhttp: client closed connection")
  30. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  31. func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
  32. sh := &serverHandler{
  33. server: server,
  34. peers: peers,
  35. timeout: timeout,
  36. }
  37. if sh.timeout == 0 {
  38. sh.timeout = defaultServerTimeout
  39. }
  40. mux := http.NewServeMux()
  41. mux.HandleFunc(keysPrefix, sh.serveKeys)
  42. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  43. // TODO: dynamic configuration may make this outdated. take care of it.
  44. // TODO: dynamic configuration may introduce race also.
  45. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  46. mux.HandleFunc("/", http.NotFound)
  47. return mux
  48. }
  49. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  50. func NewPeerHandler(server etcdserver.Server) http.Handler {
  51. sh := &serverHandler{
  52. server: server,
  53. }
  54. mux := http.NewServeMux()
  55. mux.HandleFunc(raftPrefix, sh.serveRaft)
  56. mux.HandleFunc("/", http.NotFound)
  57. return mux
  58. }
  59. // serverHandler provides http.Handlers for etcd client and raft communication.
  60. type serverHandler struct {
  61. timeout time.Duration
  62. server etcdserver.Server
  63. peers Peers
  64. }
  65. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  66. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  67. return
  68. }
  69. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  70. defer cancel()
  71. rr, err := parseRequest(r, etcdserver.GenID())
  72. if err != nil {
  73. writeError(w, err)
  74. return
  75. }
  76. resp, err := h.server.Do(ctx, rr)
  77. if err != nil {
  78. writeError(w, err)
  79. return
  80. }
  81. switch {
  82. case resp.Event != nil:
  83. if err := writeEvent(w, resp.Event); err != nil {
  84. // Should never be reached
  85. log.Println("error writing event: %v", err)
  86. }
  87. case resp.Watcher != nil:
  88. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  89. defer cancel()
  90. handleWatch(ctx, w, resp.Watcher, rr.Stream)
  91. default:
  92. writeError(w, errors.New("received response with no Event/Watcher!"))
  93. }
  94. }
  95. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  96. // TODO: rethink the format of machine list because it is not json format.
  97. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  98. if !allowMethod(w, r.Method, "GET", "HEAD") {
  99. return
  100. }
  101. endpoints := h.peers.Endpoints()
  102. w.Write([]byte(strings.Join(endpoints, ", ")))
  103. }
  104. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  105. if !allowMethod(w, r.Method, "POST") {
  106. return
  107. }
  108. b, err := ioutil.ReadAll(r.Body)
  109. if err != nil {
  110. log.Println("etcdhttp: error reading raft message:", err)
  111. http.Error(w, "error reading raft message", http.StatusBadRequest)
  112. return
  113. }
  114. var m raftpb.Message
  115. if err := m.Unmarshal(b); err != nil {
  116. log.Println("etcdhttp: error unmarshaling raft message:", err)
  117. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  118. return
  119. }
  120. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  121. if err := h.server.Process(context.TODO(), m); err != nil {
  122. log.Println("etcdhttp: error processing raft message:", err)
  123. writeError(w, err)
  124. return
  125. }
  126. w.WriteHeader(http.StatusNoContent)
  127. }
  128. // parseRequest converts a received http.Request to a server Request,
  129. // performing validation of supplied fields as appropriate.
  130. // If any validation fails, an empty Request and non-nil error is returned.
  131. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  132. emptyReq := etcdserverpb.Request{}
  133. err := r.ParseForm()
  134. if err != nil {
  135. return emptyReq, etcdErr.NewRequestError(
  136. etcdErr.EcodeInvalidForm,
  137. err.Error(),
  138. )
  139. }
  140. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  141. return emptyReq, etcdErr.NewRequestError(
  142. etcdErr.EcodeInvalidForm,
  143. "incorrect key prefix",
  144. )
  145. }
  146. p := r.URL.Path[len(keysPrefix):]
  147. var pIdx, wIdx, ttl uint64
  148. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  149. return emptyReq, etcdErr.NewRequestError(
  150. etcdErr.EcodeIndexNaN,
  151. `invalid value for "prevIndex"`,
  152. )
  153. }
  154. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  155. return emptyReq, etcdErr.NewRequestError(
  156. etcdErr.EcodeIndexNaN,
  157. `invalid value for "waitIndex"`,
  158. )
  159. }
  160. if ttl, err = getUint64(r.Form, "ttl"); err != nil {
  161. return emptyReq, etcdErr.NewRequestError(
  162. etcdErr.EcodeTTLNaN,
  163. `invalid value for "ttl"`,
  164. )
  165. }
  166. var rec, sort, wait, stream bool
  167. if rec, err = getBool(r.Form, "recursive"); err != nil {
  168. return emptyReq, etcdErr.NewRequestError(
  169. etcdErr.EcodeInvalidField,
  170. `invalid value for "recursive"`,
  171. )
  172. }
  173. if sort, err = getBool(r.Form, "sorted"); err != nil {
  174. return emptyReq, etcdErr.NewRequestError(
  175. etcdErr.EcodeInvalidField,
  176. `invalid value for "sorted"`,
  177. )
  178. }
  179. if wait, err = getBool(r.Form, "wait"); err != nil {
  180. return emptyReq, etcdErr.NewRequestError(
  181. etcdErr.EcodeInvalidField,
  182. `invalid value for "wait"`,
  183. )
  184. }
  185. if stream, err = getBool(r.Form, "stream"); err != nil {
  186. return emptyReq, etcdErr.NewRequestError(
  187. etcdErr.EcodeInvalidField,
  188. `invalid value for "stream"`,
  189. )
  190. }
  191. if wait && r.Method != "GET" {
  192. return emptyReq, etcdErr.NewRequestError(
  193. etcdErr.EcodeInvalidField,
  194. `"wait" can only be used with GET requests`,
  195. )
  196. }
  197. pV := r.FormValue("prevValue")
  198. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  199. return emptyReq, etcdErr.NewRequestError(
  200. etcdErr.EcodeInvalidField,
  201. `"prevValue" cannot be empty`,
  202. )
  203. }
  204. // prevExist is nullable, so leave it null if not specified
  205. var pe *bool
  206. if _, ok := r.Form["prevExist"]; ok {
  207. bv, err := getBool(r.Form, "prevExist")
  208. if err != nil {
  209. return emptyReq, etcdErr.NewRequestError(
  210. etcdErr.EcodeInvalidField,
  211. "invalid value for prevExist",
  212. )
  213. }
  214. pe = &bv
  215. }
  216. rr := etcdserverpb.Request{
  217. Id: id,
  218. Method: r.Method,
  219. Path: p,
  220. Val: r.FormValue("value"),
  221. PrevValue: pV,
  222. PrevIndex: pIdx,
  223. PrevExist: pe,
  224. Recursive: rec,
  225. Since: wIdx,
  226. Sorted: sort,
  227. Stream: stream,
  228. Wait: wait,
  229. }
  230. if pe != nil {
  231. rr.PrevExist = pe
  232. }
  233. // TODO(jonboulle): use fake clock instead of time module
  234. // https://github.com/coreos/etcd/issues/1021
  235. if ttl > 0 {
  236. expr := time.Duration(ttl) * time.Second
  237. rr.Expiration = time.Now().Add(expr).UnixNano()
  238. }
  239. return rr, nil
  240. }
  241. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  242. // not exist in the form, 0 is returned. If the key exists but the value is
  243. // badly formed, an error is returned. If multiple values are present only the
  244. // first is considered.
  245. func getUint64(form url.Values, key string) (i uint64, err error) {
  246. if vals, ok := form[key]; ok {
  247. i, err = strconv.ParseUint(vals[0], 10, 64)
  248. }
  249. return
  250. }
  251. // getBool extracts a bool by the given key from a Form. If the key does not
  252. // exist in the form, false is returned. If the key exists but the value is
  253. // badly formed, an error is returned. If multiple values are present only the
  254. // first is considered.
  255. func getBool(form url.Values, key string) (b bool, err error) {
  256. if vals, ok := form[key]; ok {
  257. b, err = strconv.ParseBool(vals[0])
  258. }
  259. return
  260. }
  261. // writeError logs and writes the given Error to the ResponseWriter
  262. // If Error is an etcdErr, it is rendered to the ResponseWriter
  263. // Otherwise, it is assumed to be an InternalServerError
  264. func writeError(w http.ResponseWriter, err error) {
  265. if err == nil {
  266. return
  267. }
  268. log.Println(err)
  269. if e, ok := err.(*etcdErr.Error); ok {
  270. e.Write(w)
  271. } else {
  272. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  273. }
  274. }
  275. // writeEvent serializes a single Event and writes the resulting
  276. // JSON to the given ResponseWriter, along with the appropriate
  277. // headers
  278. func writeEvent(w http.ResponseWriter, ev *store.Event) error {
  279. if ev == nil {
  280. return errors.New("cannot write empty Event!")
  281. }
  282. w.Header().Set("Content-Type", "application/json")
  283. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  284. if ev.IsCreated() {
  285. w.WriteHeader(http.StatusCreated)
  286. }
  287. return json.NewEncoder(w).Encode(ev)
  288. }
  289. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
  290. defer wa.Remove()
  291. ech := wa.EventChan()
  292. var nch <-chan bool
  293. if x, ok := w.(http.CloseNotifier); ok {
  294. nch = x.CloseNotify()
  295. }
  296. w.Header().Set("Content-Type", "application/json")
  297. w.WriteHeader(http.StatusOK)
  298. // Ensure headers are flushed early, in case of long polling
  299. w.(http.Flusher).Flush()
  300. for {
  301. select {
  302. case <-nch:
  303. // Client closed connection. Nothing to do.
  304. return
  305. case <-ctx.Done():
  306. // Timed out. net/http will close the connection for us, so nothing to do.
  307. return
  308. case ev, ok := <-ech:
  309. if !ok {
  310. // If the channel is closed this may be an indication of
  311. // that notifications are much more than we are able to
  312. // send to the client in time. Then we simply end streaming.
  313. return
  314. }
  315. if err := json.NewEncoder(w).Encode(ev); err != nil {
  316. // Should never be reached
  317. log.Println("error writing event: %v", err)
  318. return
  319. }
  320. if !stream {
  321. return
  322. }
  323. w.(http.Flusher).Flush()
  324. }
  325. }
  326. }
  327. // allowMethod verifies that the given method is one of the allowed methods,
  328. // and if not, it writes an error to w. A boolean is returned indicating
  329. // whether or not the method is allowed.
  330. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  331. for _, meth := range ms {
  332. if m == meth {
  333. return true
  334. }
  335. }
  336. w.Header().Set("Allow", strings.Join(ms, ","))
  337. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  338. return false
  339. }