http.go 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package etcdhttp
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "time"
  15. crand "crypto/rand"
  16. "github.com/coreos/etcd/elog"
  17. etcdErr "github.com/coreos/etcd/error"
  18. "github.com/coreos/etcd/etcdserver"
  19. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  20. "github.com/coreos/etcd/raft/raftpb"
  21. "github.com/coreos/etcd/store"
  22. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  23. )
  24. const (
  25. keysPrefix = "/v2/keys"
  26. machinesPrefix = "/v2/machines"
  27. raftPrefix = "/raft"
  28. DefaultTimeout = 500 * time.Millisecond
  29. )
  30. var errClosed = errors.New("etcdhttp: client closed connection")
  31. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  32. func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
  33. sh := &serverHandler{
  34. server: server,
  35. peers: peers,
  36. timeout: timeout,
  37. }
  38. if sh.timeout == 0 {
  39. sh.timeout = DefaultTimeout
  40. }
  41. mux := http.NewServeMux()
  42. mux.HandleFunc(keysPrefix, sh.serveKeys)
  43. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  44. // TODO: dynamic configuration may make this outdated. take care of it.
  45. // TODO: dynamic configuration may introduce race also.
  46. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  47. mux.HandleFunc("/", http.NotFound)
  48. return mux
  49. }
  50. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  51. func NewPeerHandler(server etcdserver.Server) http.Handler {
  52. sh := &serverHandler{
  53. server: server,
  54. }
  55. mux := http.NewServeMux()
  56. mux.HandleFunc(raftPrefix, sh.serveRaft)
  57. mux.HandleFunc("/", http.NotFound)
  58. return mux
  59. }
  60. // serverHandler provides http.Handlers for etcd client and raft communication.
  61. type serverHandler struct {
  62. timeout time.Duration
  63. server etcdserver.Server
  64. peers Peers
  65. }
  66. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  67. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  68. return
  69. }
  70. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  71. defer cancel()
  72. rr, err := parseRequest(r, genID())
  73. if err != nil {
  74. writeError(w, err)
  75. return
  76. }
  77. resp, err := h.server.Do(ctx, rr)
  78. if err != nil {
  79. writeError(w, err)
  80. return
  81. }
  82. var ev *store.Event
  83. switch {
  84. case resp.Event != nil:
  85. ev = resp.Event
  86. case resp.Watcher != nil:
  87. if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
  88. http.Error(w, err.Error(), http.StatusGatewayTimeout)
  89. return
  90. }
  91. default:
  92. writeError(w, errors.New("received response with no Event/Watcher!"))
  93. return
  94. }
  95. if err = writeEvent(w, ev); err != nil {
  96. // Should never be reached
  97. log.Println("error writing event: %v", err)
  98. }
  99. }
  100. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  101. // TODO: rethink the format of machine list because it is not json format.
  102. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  103. if !allowMethod(w, r.Method, "GET", "HEAD") {
  104. return
  105. }
  106. endpoints := h.peers.Endpoints()
  107. w.Write([]byte(strings.Join(endpoints, ", ")))
  108. }
  109. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  110. if !allowMethod(w, r.Method, "POST") {
  111. return
  112. }
  113. b, err := ioutil.ReadAll(r.Body)
  114. if err != nil {
  115. log.Println("etcdhttp: error reading raft message:", err)
  116. http.Error(w, "error reading raft message", http.StatusBadRequest)
  117. return
  118. }
  119. var m raftpb.Message
  120. if err := m.Unmarshal(b); err != nil {
  121. log.Println("etcdhttp: error unmarshaling raft message:", err)
  122. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  123. return
  124. }
  125. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  126. if err := h.server.Process(context.TODO(), m); err != nil {
  127. log.Println("etcdhttp: error processing raft message:", err)
  128. writeError(w, err)
  129. return
  130. }
  131. w.WriteHeader(http.StatusNoContent)
  132. }
  133. // genID generates a random id that is: n < 0 < n.
  134. func genID() int64 {
  135. for {
  136. b := make([]byte, 8)
  137. if _, err := io.ReadFull(crand.Reader, b); err != nil {
  138. panic(err) // really bad stuff happened
  139. }
  140. n := int64(binary.BigEndian.Uint64(b))
  141. if n != 0 {
  142. return n
  143. }
  144. }
  145. }
  146. // parseRequest converts a received http.Request to a server Request,
  147. // performing validation of supplied fields as appropriate.
  148. // If any validation fails, an empty Request and non-nil error is returned.
  149. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  150. emptyReq := etcdserverpb.Request{}
  151. err := r.ParseForm()
  152. if err != nil {
  153. return emptyReq, etcdErr.NewRequestError(
  154. etcdErr.EcodeInvalidForm,
  155. err.Error(),
  156. )
  157. }
  158. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  159. return emptyReq, etcdErr.NewRequestError(
  160. etcdErr.EcodeInvalidForm,
  161. "incorrect key prefix",
  162. )
  163. }
  164. p := r.URL.Path[len(keysPrefix):]
  165. var pIdx, wIdx, ttl uint64
  166. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  167. return emptyReq, etcdErr.NewRequestError(
  168. etcdErr.EcodeIndexNaN,
  169. `invalid value for "prevIndex"`,
  170. )
  171. }
  172. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  173. return emptyReq, etcdErr.NewRequestError(
  174. etcdErr.EcodeIndexNaN,
  175. `invalid value for "waitIndex"`,
  176. )
  177. }
  178. if ttl, err = getUint64(r.Form, "ttl"); err != nil {
  179. return emptyReq, etcdErr.NewRequestError(
  180. etcdErr.EcodeTTLNaN,
  181. `invalid value for "ttl"`,
  182. )
  183. }
  184. var rec, sort, wait bool
  185. if rec, err = getBool(r.Form, "recursive"); err != nil {
  186. return emptyReq, etcdErr.NewRequestError(
  187. etcdErr.EcodeInvalidField,
  188. `invalid value for "recursive"`,
  189. )
  190. }
  191. if sort, err = getBool(r.Form, "sorted"); err != nil {
  192. return emptyReq, etcdErr.NewRequestError(
  193. etcdErr.EcodeInvalidField,
  194. `invalid value for "sorted"`,
  195. )
  196. }
  197. if wait, err = getBool(r.Form, "wait"); err != nil {
  198. return emptyReq, etcdErr.NewRequestError(
  199. etcdErr.EcodeInvalidField,
  200. `invalid value for "wait"`,
  201. )
  202. }
  203. // prevExists is nullable, so leave it null if not specified
  204. var pe *bool
  205. if _, ok := r.Form["prevExists"]; ok {
  206. bv, err := getBool(r.Form, "prevExists")
  207. if err != nil {
  208. return emptyReq, etcdErr.NewRequestError(
  209. etcdErr.EcodeInvalidField,
  210. "invalid value for prevExists",
  211. )
  212. }
  213. pe = &bv
  214. }
  215. rr := etcdserverpb.Request{
  216. Id: id,
  217. Method: r.Method,
  218. Path: p,
  219. Val: r.FormValue("value"),
  220. PrevValue: r.FormValue("prevValue"),
  221. PrevIndex: pIdx,
  222. PrevExists: pe,
  223. Recursive: rec,
  224. Since: wIdx,
  225. Sorted: sort,
  226. Wait: wait,
  227. }
  228. if pe != nil {
  229. rr.PrevExists = pe
  230. }
  231. // TODO(jonboulle): use fake clock instead of time module
  232. // https://github.com/coreos/etcd/issues/1021
  233. if ttl > 0 {
  234. expr := time.Duration(ttl) * time.Second
  235. rr.Expiration = time.Now().Add(expr).UnixNano()
  236. }
  237. return rr, nil
  238. }
  239. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  240. // not exist in the form, 0 is returned. If the key exists but the value is
  241. // badly formed, an error is returned. If multiple values are present only the
  242. // first is considered.
  243. func getUint64(form url.Values, key string) (i uint64, err error) {
  244. if vals, ok := form[key]; ok {
  245. i, err = strconv.ParseUint(vals[0], 10, 64)
  246. }
  247. return
  248. }
  249. // getBool extracts a bool by the given key from a Form. If the key does not
  250. // exist in the form, false is returned. If the key exists but the value is
  251. // badly formed, an error is returned. If multiple values are present only the
  252. // first is considered.
  253. func getBool(form url.Values, key string) (b bool, err error) {
  254. if vals, ok := form[key]; ok {
  255. b, err = strconv.ParseBool(vals[0])
  256. }
  257. return
  258. }
  259. // writeError logs and writes the given Error to the ResponseWriter
  260. // If Error is an etcdErr, it is rendered to the ResponseWriter
  261. // Otherwise, it is assumed to be an InternalServerError
  262. func writeError(w http.ResponseWriter, err error) {
  263. if err == nil {
  264. return
  265. }
  266. log.Println(err)
  267. if e, ok := err.(*etcdErr.Error); ok {
  268. e.Write(w)
  269. } else {
  270. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  271. }
  272. }
  273. // writeEvent serializes the given Event and writes the resulting JSON to the
  274. // given ResponseWriter
  275. func writeEvent(w http.ResponseWriter, ev *store.Event) error {
  276. if ev == nil {
  277. return errors.New("cannot write empty Event!")
  278. }
  279. w.Header().Set("Content-Type", "application/json")
  280. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
  281. if ev.IsCreated() {
  282. w.WriteHeader(http.StatusCreated)
  283. }
  284. return json.NewEncoder(w).Encode(ev)
  285. }
  286. // waitForEvent waits for a given Watcher to return its associated
  287. // event. It returns a non-nil error if the given Context times out
  288. // or the given ResponseWriter triggers a CloseNotify.
  289. func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
  290. // TODO(bmizerany): support streaming?
  291. defer wa.Remove()
  292. var nch <-chan bool
  293. if x, ok := w.(http.CloseNotifier); ok {
  294. nch = x.CloseNotify()
  295. }
  296. select {
  297. case ev := <-wa.EventChan():
  298. return ev, nil
  299. case <-nch:
  300. elog.TODO()
  301. return nil, errClosed
  302. case <-ctx.Done():
  303. return nil, ctx.Err()
  304. }
  305. }
  306. // allowMethod verifies that the given method is one of the allowed methods,
  307. // and if not, it writes an error to w. A boolean is returned indicating
  308. // whether or not the method is allowed.
  309. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  310. for _, meth := range ms {
  311. if m == meth {
  312. return true
  313. }
  314. }
  315. w.Header().Set("Allow", strings.Join(ms, ","))
  316. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  317. return false
  318. }