http.go 11 KB


  1. package etcdhttp
  2. import (
  3. "encoding/json"
  4. "errors"
  5. "fmt"
  6. "io/ioutil"
  7. "log"
  8. "net/http"
  9. "net/url"
  10. "strconv"
  11. "strings"
  12. "time"
  13. etcdErr "github.com/coreos/etcd/error"
  14. "github.com/coreos/etcd/etcdserver"
  15. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  16. "github.com/coreos/etcd/raft/raftpb"
  17. "github.com/coreos/etcd/store"
  18. "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
  19. )
  20. const (
  21. keysPrefix = "/v2/keys"
  22. machinesPrefix = "/v2/machines"
  23. raftPrefix = "/raft"
  24. // time to wait for response from EtcdServer requests
  25. defaultServerTimeout = 500 * time.Millisecond
  26. // time to wait for a Watch request
  27. defaultWatchTimeout = 5 * time.Minute
  28. )
  29. var errClosed = errors.New("etcdhttp: client closed connection")
  30. // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
  31. func NewClientHandler(server *etcdserver.EtcdServer, timeout time.Duration) http.Handler {
  32. sh := &serverHandler{
  33. server: server,
  34. clusterStore: server.ClusterStore,
  35. timer: server,
  36. timeout: timeout,
  37. }
  38. if sh.timeout == 0 {
  39. sh.timeout = defaultServerTimeout
  40. }
  41. mux := http.NewServeMux()
  42. mux.HandleFunc(keysPrefix, sh.serveKeys)
  43. mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
  44. // TODO: dynamic configuration may make this outdated. take care of it.
  45. // TODO: dynamic configuration may introduce race also.
  46. mux.HandleFunc(machinesPrefix, sh.serveMachines)
  47. mux.HandleFunc("/", http.NotFound)
  48. return mux
  49. }
  50. // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
  51. func NewPeerHandler(server etcdserver.Server) http.Handler {
  52. sh := &serverHandler{
  53. server: server,
  54. }
  55. mux := http.NewServeMux()
  56. mux.HandleFunc(raftPrefix, sh.serveRaft)
  57. mux.HandleFunc("/", http.NotFound)
  58. return mux
  59. }
  60. // serverHandler provides http.Handlers for etcd client and raft communication.
  61. type serverHandler struct {
  62. timeout time.Duration
  63. server etcdserver.Server
  64. timer etcdserver.RaftTimer
  65. clusterStore etcdserver.ClusterStore
  66. }
  67. func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
  68. if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
  69. return
  70. }
  71. ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
  72. defer cancel()
  73. rr, err := parseRequest(r, etcdserver.GenID())
  74. if err != nil {
  75. writeError(w, err)
  76. return
  77. }
  78. resp, err := h.server.Do(ctx, rr)
  79. if err != nil {
  80. writeError(w, err)
  81. return
  82. }
  83. switch {
  84. case resp.Event != nil:
  85. if err := writeEvent(w, resp.Event, h.timer); err != nil {
  86. // Should never be reached
  87. log.Printf("error writing event: %v", err)
  88. }
  89. case resp.Watcher != nil:
  90. ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
  91. defer cancel()
  92. handleWatch(ctx, w, resp.Watcher, rr.Stream, h.timer)
  93. default:
  94. writeError(w, errors.New("received response with no Event/Watcher!"))
  95. }
  96. }
  97. // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
  98. // TODO: rethink the format of machine list because it is not json format.
  99. func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
  100. if !allowMethod(w, r.Method, "GET", "HEAD") {
  101. return
  102. }
  103. endpoints := h.clusterStore.Get().ClientURLs()
  104. w.Write([]byte(strings.Join(endpoints, ", ")))
  105. }
  106. func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
  107. if !allowMethod(w, r.Method, "POST") {
  108. return
  109. }
  110. b, err := ioutil.ReadAll(r.Body)
  111. if err != nil {
  112. log.Println("etcdhttp: error reading raft message:", err)
  113. http.Error(w, "error reading raft message", http.StatusBadRequest)
  114. return
  115. }
  116. var m raftpb.Message
  117. if err := m.Unmarshal(b); err != nil {
  118. log.Println("etcdhttp: error unmarshaling raft message:", err)
  119. http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
  120. return
  121. }
  122. log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
  123. if err := h.server.Process(context.TODO(), m); err != nil {
  124. log.Println("etcdhttp: error processing raft message:", err)
  125. writeError(w, err)
  126. return
  127. }
  128. w.WriteHeader(http.StatusNoContent)
  129. }
  130. // parseRequest converts a received http.Request to a server Request,
  131. // performing validation of supplied fields as appropriate.
  132. // If any validation fails, an empty Request and non-nil error is returned.
  133. func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
  134. emptyReq := etcdserverpb.Request{}
  135. err := r.ParseForm()
  136. if err != nil {
  137. return emptyReq, etcdErr.NewRequestError(
  138. etcdErr.EcodeInvalidForm,
  139. err.Error(),
  140. )
  141. }
  142. if !strings.HasPrefix(r.URL.Path, keysPrefix) {
  143. return emptyReq, etcdErr.NewRequestError(
  144. etcdErr.EcodeInvalidForm,
  145. "incorrect key prefix",
  146. )
  147. }
  148. p := r.URL.Path[len(keysPrefix):]
  149. var pIdx, wIdx uint64
  150. if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
  151. return emptyReq, etcdErr.NewRequestError(
  152. etcdErr.EcodeIndexNaN,
  153. `invalid value for "prevIndex"`,
  154. )
  155. }
  156. if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
  157. return emptyReq, etcdErr.NewRequestError(
  158. etcdErr.EcodeIndexNaN,
  159. `invalid value for "waitIndex"`,
  160. )
  161. }
  162. var rec, sort, wait, dir, stream bool
  163. if rec, err = getBool(r.Form, "recursive"); err != nil {
  164. return emptyReq, etcdErr.NewRequestError(
  165. etcdErr.EcodeInvalidField,
  166. `invalid value for "recursive"`,
  167. )
  168. }
  169. if sort, err = getBool(r.Form, "sorted"); err != nil {
  170. return emptyReq, etcdErr.NewRequestError(
  171. etcdErr.EcodeInvalidField,
  172. `invalid value for "sorted"`,
  173. )
  174. }
  175. if wait, err = getBool(r.Form, "wait"); err != nil {
  176. return emptyReq, etcdErr.NewRequestError(
  177. etcdErr.EcodeInvalidField,
  178. `invalid value for "wait"`,
  179. )
  180. }
  181. // TODO(jonboulle): define what parameters dir is/isn't compatible with?
  182. if dir, err = getBool(r.Form, "dir"); err != nil {
  183. return emptyReq, etcdErr.NewRequestError(
  184. etcdErr.EcodeInvalidField,
  185. `invalid value for "dir"`,
  186. )
  187. }
  188. if stream, err = getBool(r.Form, "stream"); err != nil {
  189. return emptyReq, etcdErr.NewRequestError(
  190. etcdErr.EcodeInvalidField,
  191. `invalid value for "stream"`,
  192. )
  193. }
  194. if wait && r.Method != "GET" {
  195. return emptyReq, etcdErr.NewRequestError(
  196. etcdErr.EcodeInvalidField,
  197. `"wait" can only be used with GET requests`,
  198. )
  199. }
  200. pV := r.FormValue("prevValue")
  201. if _, ok := r.Form["prevValue"]; ok && pV == "" {
  202. return emptyReq, etcdErr.NewRequestError(
  203. etcdErr.EcodeInvalidField,
  204. `"prevValue" cannot be empty`,
  205. )
  206. }
  207. // TTL is nullable, so leave it null if not specified
  208. // or an empty string
  209. var ttl *uint64
  210. if len(r.FormValue("ttl")) > 0 {
  211. i, err := getUint64(r.Form, "ttl")
  212. if err != nil {
  213. return emptyReq, etcdErr.NewRequestError(
  214. etcdErr.EcodeTTLNaN,
  215. `invalid value for "ttl"`,
  216. )
  217. }
  218. ttl = &i
  219. }
  220. // prevExist is nullable, so leave it null if not specified
  221. var pe *bool
  222. if _, ok := r.Form["prevExist"]; ok {
  223. bv, err := getBool(r.Form, "prevExist")
  224. if err != nil {
  225. return emptyReq, etcdErr.NewRequestError(
  226. etcdErr.EcodeInvalidField,
  227. "invalid value for prevExist",
  228. )
  229. }
  230. pe = &bv
  231. }
  232. rr := etcdserverpb.Request{
  233. ID: id,
  234. Method: r.Method,
  235. Path: p,
  236. Val: r.FormValue("value"),
  237. Dir: dir,
  238. PrevValue: pV,
  239. PrevIndex: pIdx,
  240. PrevExist: pe,
  241. Recursive: rec,
  242. Since: wIdx,
  243. Sorted: sort,
  244. Stream: stream,
  245. Wait: wait,
  246. }
  247. if pe != nil {
  248. rr.PrevExist = pe
  249. }
  250. // Null TTL is equivalent to unset Expiration
  251. // TODO(jonboulle): use fake clock instead of time module
  252. // https://github.com/coreos/etcd/issues/1021
  253. if ttl != nil {
  254. expr := time.Duration(*ttl) * time.Second
  255. rr.Expiration = time.Now().Add(expr).UnixNano()
  256. }
  257. return rr, nil
  258. }
  259. // getUint64 extracts a uint64 by the given key from a Form. If the key does
  260. // not exist in the form, 0 is returned. If the key exists but the value is
  261. // badly formed, an error is returned. If multiple values are present only the
  262. // first is considered.
  263. func getUint64(form url.Values, key string) (i uint64, err error) {
  264. if vals, ok := form[key]; ok {
  265. i, err = strconv.ParseUint(vals[0], 10, 64)
  266. }
  267. return
  268. }
  269. // getBool extracts a bool by the given key from a Form. If the key does not
  270. // exist in the form, false is returned. If the key exists but the value is
  271. // badly formed, an error is returned. If multiple values are present only the
  272. // first is considered.
  273. func getBool(form url.Values, key string) (b bool, err error) {
  274. if vals, ok := form[key]; ok {
  275. b, err = strconv.ParseBool(vals[0])
  276. }
  277. return
  278. }
  279. // writeError logs and writes the given Error to the ResponseWriter
  280. // If Error is an etcdErr, it is rendered to the ResponseWriter
  281. // Otherwise, it is assumed to be an InternalServerError
  282. func writeError(w http.ResponseWriter, err error) {
  283. if err == nil {
  284. return
  285. }
  286. log.Println(err)
  287. if e, ok := err.(*etcdErr.Error); ok {
  288. e.Write(w)
  289. } else {
  290. http.Error(w, "Internal Server Error", http.StatusInternalServerError)
  291. }
  292. }
  293. // writeEvent serializes a single Event and writes the resulting
  294. // JSON to the given ResponseWriter, along with the appropriate
  295. // headers
  296. func writeEvent(w http.ResponseWriter, ev *store.Event, rt etcdserver.RaftTimer) error {
  297. if ev == nil {
  298. return errors.New("cannot write empty Event!")
  299. }
  300. w.Header().Set("Content-Type", "application/json")
  301. w.Header().Set("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
  302. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  303. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  304. if ev.IsCreated() {
  305. w.WriteHeader(http.StatusCreated)
  306. }
  307. return json.NewEncoder(w).Encode(ev)
  308. }
  309. func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool, rt etcdserver.RaftTimer) {
  310. defer wa.Remove()
  311. ech := wa.EventChan()
  312. var nch <-chan bool
  313. if x, ok := w.(http.CloseNotifier); ok {
  314. nch = x.CloseNotify()
  315. }
  316. w.Header().Set("Content-Type", "application/json")
  317. w.Header().Set("X-Etcd-Index", fmt.Sprint(wa.StartIndex()))
  318. w.Header().Set("X-Raft-Index", fmt.Sprint(rt.Index()))
  319. w.Header().Set("X-Raft-Term", fmt.Sprint(rt.Term()))
  320. w.WriteHeader(http.StatusOK)
  321. // Ensure headers are flushed early, in case of long polling
  322. w.(http.Flusher).Flush()
  323. for {
  324. select {
  325. case <-nch:
  326. // Client closed connection. Nothing to do.
  327. return
  328. case <-ctx.Done():
  329. // Timed out. net/http will close the connection for us, so nothing to do.
  330. return
  331. case ev, ok := <-ech:
  332. if !ok {
  333. // If the channel is closed this may be an indication of
  334. // that notifications are much more than we are able to
  335. // send to the client in time. Then we simply end streaming.
  336. return
  337. }
  338. if err := json.NewEncoder(w).Encode(ev); err != nil {
  339. // Should never be reached
  340. log.Println("error writing event: %v", err)
  341. return
  342. }
  343. if !stream {
  344. return
  345. }
  346. w.(http.Flusher).Flush()
  347. }
  348. }
  349. }
  350. // allowMethod verifies that the given method is one of the allowed methods,
  351. // and if not, it writes an error to w. A boolean is returned indicating
  352. // whether or not the method is allowed.
  353. func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
  354. for _, meth := range ms {
  355. if m == meth {
  356. return true
  357. }
  358. }
  359. w.Header().Set("Allow", strings.Join(ms, ","))
  360. http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
  361. return false
  362. }