http.go 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. package etcdhttp
  2. import (
  3. "encoding/binary"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "io"
  8. "io/ioutil"
  9. "log"
  10. "net/http"
  11. "net/url"
  12. "strconv"
  13. "strings"
  14. "time"
  15. crand "crypto/rand"
  16. "github.com/coreos/etcd/elog"
  17. etcdErr "github.com/coreos/etcd/error"
  18. "github.com/coreos/etcd/etcdserver"
  19. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  20. "github.com/coreos/etcd/raft/raftpb"
  21. "github.com/coreos/etcd/store"
  22. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  23. )
  24. const (
  25. keysPrefix = "/v2/keys"
  26. machinesPrefix = "/v2/machines"
  27. DefaultTimeout = 500 * time.Millisecond
  28. )
  29. var errClosed = errors.New("etcdhttp: client closed connection")
  30. // Handler implements the http.Handler interface and serves etcd client and
  31. // raft communication.
  32. type Handler struct {
  33. Timeout time.Duration
  34. Server *etcdserver.Server
  35. // TODO: dynamic configuration may make this outdated. take care of it.
  36. // TODO: dynamic configuration may introduce race also.
  37. Peers Peers
  38. }
  39. func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  40. // TODO: set read/write timeout?
  41. timeout := h.Timeout
  42. if timeout == 0 {
  43. timeout = DefaultTimeout
  44. }
  45. ctx, cancel := context.WithTimeout(context.Background(), timeout)
  46. defer cancel()
  47. switch {
  48. case strings.HasPrefix(r.URL.Path, "/raft"):
  49. h.serveRaft(ctx, w, r)
  50. case strings.HasPrefix(r.URL.Path, keysPrefix):
  51. h.serveKeys(ctx, w, r)
  52. case strings.HasPrefix(r.URL.Path, machinesPrefix):
  53. h.serveMachines(w, r)
  54. default:
  55. http.NotFound(w, r)
  56. }
  57. }
  58. func (h Handler) serveKeys(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  59. rr, err := parseRequest(r, genID())
  60. if err != nil {
  61. writeError(w, err)
  62. return
  63. }
  64. resp, err := h.Server.Do(ctx, rr)
  65. if err != nil {
  66. writeError(w, err)
  67. return
  68. }
  69. var ev *store.Event
  70. switch {
  71. case resp.Event != nil:
  72. ev = resp.Event
  73. case resp.Watcher != nil:
  74. if ev, err = waitForEvent(ctx, w, resp.Watcher); err != nil {
  75. http.Error(w, err.Error(), http.StatusGatewayTimeout)
  76. return
  77. }
  78. default:
  79. writeError(w, errors.New("received response with no Event/Watcher!"))
  80. return
  81. }
  82. if err = writeEvent(w, ev); err != nil {
  83. // Should never be reached
  84. log.Println("error writing event: %v", err)
  85. }
  86. }
  87. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  88. // TODO: rethink the format of machine list because it is not json format.
  89. func (h Handler) serveMachines(w http.ResponseWriter, r *http.Request) {
  90. if r.Method != "GET" && r.Method != "HEAD" {
  91. allow(w, "GET", "HEAD")
  92. return
  93. }
  94. endpoints := h.Peers.Endpoints()
  95. w.Write([]byte(strings.Join(endpoints, ", ")))
  96. }
  97. func (h Handler) serveRaft(ctx context.Context, w http.ResponseWriter, r *http.Request) {
  98. b, err := ioutil.ReadAll(r.Body)
  99. if err != nil {
  100. log.Println("etcdhttp: error reading raft message:", err)
  101. }
  102. var m raftpb.Message
  103. if err := m.Unmarshal(b); err != nil {
  104. log.Println("etcdhttp: error unmarshaling raft message:", err)
  105. }
  106. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  107. if err := h.Server.Node.Step(ctx, m); err != nil {
  108. log.Println("etcdhttp: error stepping raft messages:", err)
  109. }
  110. }
  111. // genID generates a random id that is: n < 0 < n.
  112. func genID() int64 {
  113. for {
  114. b := make([]byte, 8)
  115. if _, err := io.ReadFull(crand.Reader, b); err != nil {
  116. panic(err) // really bad stuff happened
  117. }
  118. n := int64(binary.BigEndian.Uint64(b))
  119. if n != 0 {
  120. return n
  121. }
  122. }
  123. }
  124. // parseRequest converts a received http.Request to a server Request,
  125. // performing validation of supplied fields as appropriate.
  126. // If any validation fails, an empty Request and non-nil error is returned.
  127. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  128. emptyReq := etcdserverpb.Request{}
  129. err := r.ParseForm()
  130. if err != nil {
  131. return emptyReq, etcdErr.NewRequestError(
  132. etcdErr.EcodeInvalidForm,
  133. err.Error(),
  134. )
  135. }
  136. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  137. return emptyReq, etcdErr.NewRequestError(
  138. etcdErr.EcodeInvalidForm,
  139. "incorrect key prefix",
  140. )
  141. }
  142. p := r.URL.Path[len(keysPrefix):]
  143. var pIdx, wIdx, ttl uint64
  144. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  145. return emptyReq, etcdErr.NewRequestError(
  146. etcdErr.EcodeIndexNaN,
  147. `invalid value for "prevIndex"`,
  148. )
  149. }
  150. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  151. return emptyReq, etcdErr.NewRequestError(
  152. etcdErr.EcodeIndexNaN,
  153. `invalid value for "waitIndex"`,
  154. )
  155. }
  156. if ttl, err = getUint64(r.Form, "ttl"); err != nil {
  157. return emptyReq, etcdErr.NewRequestError(
  158. etcdErr.EcodeTTLNaN,
  159. `invalid value for "ttl"`,
  160. )
  161. }
  162. var rec, sort, wait bool
  163. if rec, err = getBool(r.Form, "recursive"); err != nil {
  164. return emptyReq, etcdErr.NewRequestError(
  165. etcdErr.EcodeInvalidField,
  166. `invalid value for "recursive"`,
  167. )
  168. }
  169. if sort, err = getBool(r.Form, "sorted"); err != nil {
  170. return emptyReq, etcdErr.NewRequestError(
  171. etcdErr.EcodeInvalidField,
  172. `invalid value for "sorted"`,
  173. )
  174. }
  175. if wait, err = getBool(r.Form, "wait"); err != nil {
  176. return emptyReq, etcdErr.NewRequestError(
  177. etcdErr.EcodeInvalidField,
  178. `invalid value for "wait"`,
  179. )
  180. }
  181. // prevExists is nullable, so leave it null if not specified
  182. var pe *bool
  183. if _, ok := r.Form["prevExists"]; ok {
  184. bv, err := getBool(r.Form, "prevExists")
  185. if err != nil {
  186. return emptyReq, etcdErr.NewRequestError(
  187. etcdErr.EcodeInvalidField,
  188. "invalid value for prevExists",
  189. )
  190. }
  191. pe = &bv
  192. }
  193. rr := etcdserverpb.Request{
  194. Id: id,
  195. Method: r.Method,
  196. Path: p,
  197. Val: r.FormValue("value"),
  198. PrevValue: r.FormValue("prevValue"),
  199. PrevIndex: pIdx,
  200. PrevExists: pe,
  201. Recursive: rec,
  202. Since: wIdx,
  203. Sorted: sort,
  204. Wait: wait,
  205. }
  206. if pe != nil {
  207. rr.PrevExists = pe
  208. }
  209. if ttl > 0 {
  210. expr := time.Duration(ttl) * time.Second
  211. // TODO(jonboulle): use fake clock instead of time module
  212. // https://github.com/coreos/etcd/issues/1021
  213. rr.Expiration = time.Now().Add(expr).UnixNano()
  214. }
  215. return rr, nil
  216. }
  217. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  218. // not exist in the form, 0 is returned. If the key exists but the value is
  219. // badly formed, an error is returned. If multiple values are present only the
  220. // first is considered.
  221. func getUint64(form url.Values, key string) (i uint64, err error) {
  222. if vals, ok := form[key]; ok {
  223. i, err = strconv.ParseUint(vals[0], 10, 64)
  224. }
  225. return
  226. }
  227. // getBool extracts a bool by the given key from a Form. If the key does not
  228. // exist in the form, false is returned. If the key exists but the value is
  229. // badly formed, an error is returned. If multiple values are present only the
  230. // first is considered.
  231. func getBool(form url.Values, key string) (b bool, err error) {
  232. if vals, ok := form[key]; ok {
  233. b, err = strconv.ParseBool(vals[0])
  234. }
  235. return
  236. }
  237. // writeError logs and writes the given Error to the ResponseWriter
  238. // If Error is an etcdErr, it is rendered to the ResponseWriter
  239. // Otherwise, it is assumed to be an InternalServerError
  240. func writeError(w http.ResponseWriter, err error) {
  241. if err == nil {
  242. return
  243. }
  244. log.Println(err)
  245. if e, ok := err.(*etcdErr.Error); ok {
  246. e.Write(w)
  247. } else {
  248. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  249. }
  250. }
  251. // writeEvent serializes the given Event and writes the resulting JSON to the
  252. // given ResponseWriter
  253. func writeEvent(w http.ResponseWriter, ev *store.Event) error {
  254. if ev == nil {
  255. return errors.New("cannot write empty Event!")
  256. }
  257. w.Header().Set("Content-Type", "application/json")
  258. w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.Index()))
  259. if ev.IsCreated() {
  260. w.WriteHeader(http.StatusCreated)
  261. }
  262. return json.NewEncoder(w).Encode(ev)
  263. }
  264. // waitForEvent waits for a given Watcher to return its associated
  265. // event. It returns a non-nil error if the given Context times out
  266. // or the given ResponseWriter triggers a CloseNotify.
  267. func waitForEvent(ctx context.Context, w http.ResponseWriter, wa store.Watcher) (*store.Event, error) {
  268. // TODO(bmizerany): support streaming?
  269. defer wa.Remove()
  270. var nch <-chan bool
  271. if x, ok := w.(http.CloseNotifier); ok {
  272. nch = x.CloseNotify()
  273. }
  274. select {
  275. case ev := <-wa.EventChan():
  276. return ev, nil
  277. case <-nch:
  278. elog.TODO()
  279. return nil, errClosed
  280. case <-ctx.Done():
  281. return nil, ctx.Err()
  282. }
  283. }
  284. // allow writes response for the case that Method Not Allowed
  285. func allow(w http.ResponseWriter, m ...string) {
  286. w.Header().Set("Allow", strings.Join(m, ","))
  287. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  288. }