| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396 |
- package etcdhttp
- import (
- "encoding/json"
- "errors"
- "fmt"
- "io/ioutil"
- "log"
- "net/http"
- "net/url"
- "strconv"
- "strings"
- "time"
- etcdErr "github.com/coreos/etcd/error"
- "github.com/coreos/etcd/etcdserver"
- "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/third_party/code.google.com/p/go.net/context"
- )
- const (
- keysPrefix = "/v2/keys"
- machinesPrefix = "/v2/machines"
- raftPrefix = "/raft"
- // time to wait for response from EtcdServer requests
- defaultServerTimeout = 500 * time.Millisecond
- // time to wait for a Watch request
- defaultWatchTimeout = 5 * time.Minute
- )
- var errClosed = errors.New("etcdhttp: client closed connection")
- // NewClientHandler generates a muxed http.Handler with the given parameters to serve etcd client requests.
- func NewClientHandler(server etcdserver.Server, peers Peers, timeout time.Duration) http.Handler {
- sh := &serverHandler{
- server: server,
- peers: peers,
- timeout: timeout,
- }
- if sh.timeout == 0 {
- sh.timeout = defaultServerTimeout
- }
- mux := http.NewServeMux()
- mux.HandleFunc(keysPrefix, sh.serveKeys)
- mux.HandleFunc(keysPrefix+"/", sh.serveKeys)
- // TODO: dynamic configuration may make this outdated. take care of it.
- // TODO: dynamic configuration may introduce race also.
- mux.HandleFunc(machinesPrefix, sh.serveMachines)
- mux.HandleFunc("/", http.NotFound)
- return mux
- }
- // NewPeerHandler generates an http.Handler to handle etcd peer (raft) requests.
- func NewPeerHandler(server etcdserver.Server) http.Handler {
- sh := &serverHandler{
- server: server,
- }
- mux := http.NewServeMux()
- mux.HandleFunc(raftPrefix, sh.serveRaft)
- mux.HandleFunc("/", http.NotFound)
- return mux
- }
- // serverHandler provides http.Handlers for etcd client and raft communication.
- type serverHandler struct {
- timeout time.Duration
- server etcdserver.Server
- peers Peers
- }
- func (h serverHandler) serveKeys(w http.ResponseWriter, r *http.Request) {
- if !allowMethod(w, r.Method, "GET", "PUT", "POST", "DELETE") {
- return
- }
- ctx, cancel := context.WithTimeout(context.Background(), h.timeout)
- defer cancel()
- rr, err := parseRequest(r, etcdserver.GenID())
- if err != nil {
- writeError(w, err)
- return
- }
- resp, err := h.server.Do(ctx, rr)
- if err != nil {
- writeError(w, err)
- return
- }
- switch {
- case resp.Event != nil:
- if err := writeEvent(w, resp.Event); err != nil {
- // Should never be reached
- log.Println("error writing event: %v", err)
- }
- case resp.Watcher != nil:
- ctx, cancel := context.WithTimeout(context.Background(), defaultWatchTimeout)
- defer cancel()
- handleWatch(ctx, w, resp.Watcher, rr.Stream)
- default:
- writeError(w, errors.New("received response with no Event/Watcher!"))
- }
- }
- // serveMachines responds address list in the format '0.0.0.0, 1.1.1.1'.
- // TODO: rethink the format of machine list because it is not json format.
- func (h serverHandler) serveMachines(w http.ResponseWriter, r *http.Request) {
- if !allowMethod(w, r.Method, "GET", "HEAD") {
- return
- }
- endpoints := h.peers.Endpoints()
- w.Write([]byte(strings.Join(endpoints, ", ")))
- }
- func (h serverHandler) serveRaft(w http.ResponseWriter, r *http.Request) {
- if !allowMethod(w, r.Method, "POST") {
- return
- }
- b, err := ioutil.ReadAll(r.Body)
- if err != nil {
- log.Println("etcdhttp: error reading raft message:", err)
- http.Error(w, "error reading raft message", http.StatusBadRequest)
- return
- }
- var m raftpb.Message
- if err := m.Unmarshal(b); err != nil {
- log.Println("etcdhttp: error unmarshaling raft message:", err)
- http.Error(w, "error unmarshaling raft message", http.StatusBadRequest)
- return
- }
- log.Printf("etcdhttp: raft recv message from %#x: %+v", m.From, m)
- if err := h.server.Process(context.TODO(), m); err != nil {
- log.Println("etcdhttp: error processing raft message:", err)
- writeError(w, err)
- return
- }
- w.WriteHeader(http.StatusNoContent)
- }
- // parseRequest converts a received http.Request to a server Request,
- // performing validation of supplied fields as appropriate.
- // If any validation fails, an empty Request and non-nil error is returned.
- func parseRequest(r *http.Request, id int64) (etcdserverpb.Request, error) {
- emptyReq := etcdserverpb.Request{}
- err := r.ParseForm()
- if err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidForm,
- err.Error(),
- )
- }
- if !strings.HasPrefix(r.URL.Path, keysPrefix) {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidForm,
- "incorrect key prefix",
- )
- }
- p := r.URL.Path[len(keysPrefix):]
- var pIdx, wIdx uint64
- if pIdx, err = getUint64(r.Form, "prevIndex"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeIndexNaN,
- `invalid value for "prevIndex"`,
- )
- }
- if wIdx, err = getUint64(r.Form, "waitIndex"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeIndexNaN,
- `invalid value for "waitIndex"`,
- )
- }
- var rec, sort, wait, dir, stream bool
- if rec, err = getBool(r.Form, "recursive"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `invalid value for "recursive"`,
- )
- }
- if sort, err = getBool(r.Form, "sorted"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `invalid value for "sorted"`,
- )
- }
- if wait, err = getBool(r.Form, "wait"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `invalid value for "wait"`,
- )
- }
- // TODO(jonboulle): define what parameters dir is/isn't compatible with?
- if dir, err = getBool(r.Form, "dir"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `invalid value for "dir"`,
- )
- }
- if stream, err = getBool(r.Form, "stream"); err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `invalid value for "stream"`,
- )
- }
- if wait && r.Method != "GET" {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `"wait" can only be used with GET requests`,
- )
- }
- pV := r.FormValue("prevValue")
- if _, ok := r.Form["prevValue"]; ok && pV == "" {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- `"prevValue" cannot be empty`,
- )
- }
- // TTL is nullable, so leave it null if not specified
- // or an empty string
- var ttl *uint64
- if len(r.FormValue("ttl")) > 0 {
- i, err := getUint64(r.Form, "ttl")
- if err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeTTLNaN,
- `invalid value for "ttl"`,
- )
- }
- ttl = &i
- }
- // prevExist is nullable, so leave it null if not specified
- var pe *bool
- if _, ok := r.Form["prevExist"]; ok {
- bv, err := getBool(r.Form, "prevExist")
- if err != nil {
- return emptyReq, etcdErr.NewRequestError(
- etcdErr.EcodeInvalidField,
- "invalid value for prevExist",
- )
- }
- pe = &bv
- }
- rr := etcdserverpb.Request{
- Id: id,
- Method: r.Method,
- Path: p,
- Val: r.FormValue("value"),
- Dir: dir,
- PrevValue: pV,
- PrevIndex: pIdx,
- PrevExist: pe,
- Recursive: rec,
- Since: wIdx,
- Sorted: sort,
- Stream: stream,
- Wait: wait,
- }
- if pe != nil {
- rr.PrevExist = pe
- }
- // Null TTL is equivalent to unset Expiration
- // TODO(jonboulle): use fake clock instead of time module
- // https://github.com/coreos/etcd/issues/1021
- if ttl != nil {
- expr := time.Duration(*ttl) * time.Second
- rr.Expiration = time.Now().Add(expr).UnixNano()
- }
- return rr, nil
- }
- // getUint64 extracts a uint64 by the given key from a Form. If the key does
- // not exist in the form, 0 is returned. If the key exists but the value is
- // badly formed, an error is returned. If multiple values are present only the
- // first is considered.
- func getUint64(form url.Values, key string) (i uint64, err error) {
- if vals, ok := form[key]; ok {
- i, err = strconv.ParseUint(vals[0], 10, 64)
- }
- return
- }
- // getBool extracts a bool by the given key from a Form. If the key does not
- // exist in the form, false is returned. If the key exists but the value is
- // badly formed, an error is returned. If multiple values are present only the
- // first is considered.
- func getBool(form url.Values, key string) (b bool, err error) {
- if vals, ok := form[key]; ok {
- b, err = strconv.ParseBool(vals[0])
- }
- return
- }
- // writeError logs and writes the given Error to the ResponseWriter
- // If Error is an etcdErr, it is rendered to the ResponseWriter
- // Otherwise, it is assumed to be an InternalServerError
- func writeError(w http.ResponseWriter, err error) {
- if err == nil {
- return
- }
- log.Println(err)
- if e, ok := err.(*etcdErr.Error); ok {
- e.Write(w)
- } else {
- http.Error(w, "Internal Server Error", http.StatusInternalServerError)
- }
- }
- // writeEvent serializes a single Event and writes the resulting
- // JSON to the given ResponseWriter, along with the appropriate
- // headers
- func writeEvent(w http.ResponseWriter, ev *store.Event) error {
- if ev == nil {
- return errors.New("cannot write empty Event!")
- }
- w.Header().Set("Content-Type", "application/json")
- w.Header().Add("X-Etcd-Index", fmt.Sprint(ev.EtcdIndex))
- if ev.IsCreated() {
- w.WriteHeader(http.StatusCreated)
- }
- return json.NewEncoder(w).Encode(ev)
- }
- func handleWatch(ctx context.Context, w http.ResponseWriter, wa store.Watcher, stream bool) {
- defer wa.Remove()
- ech := wa.EventChan()
- var nch <-chan bool
- if x, ok := w.(http.CloseNotifier); ok {
- nch = x.CloseNotify()
- }
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- // Ensure headers are flushed early, in case of long polling
- w.(http.Flusher).Flush()
- for {
- select {
- case <-nch:
- // Client closed connection. Nothing to do.
- return
- case <-ctx.Done():
- // Timed out. net/http will close the connection for us, so nothing to do.
- return
- case ev, ok := <-ech:
- if !ok {
- // If the channel is closed this may be an indication of
- // that notifications are much more than we are able to
- // send to the client in time. Then we simply end streaming.
- return
- }
- if err := json.NewEncoder(w).Encode(ev); err != nil {
- // Should never be reached
- log.Println("error writing event: %v", err)
- return
- }
- if !stream {
- return
- }
- w.(http.Flusher).Flush()
- }
- }
- }
- // allowMethod verifies that the given method is one of the allowed methods,
- // and if not, it writes an error to w. A boolean is returned indicating
- // whether or not the method is allowed.
- func allowMethod(w http.ResponseWriter, m string, ms ...string) bool {
- for _, meth := range ms {
- if m == meth {
- return true
- }
- }
- w.Header().Set("Allow", strings.Join(ms, ","))
- http.Error(w, "Method Not Allowed", http.StatusMethodNotAllowed)
- return false
- }
|