| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- /*
- Copyright 2013 CoreOS Inc.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- package main
- import (
- "fmt"
- "net/http"
- "strconv"
- "strings"
- etcdErr "github.com/coreos/etcd/error"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/mod"
- "github.com/coreos/go-raft"
- )
- //-------------------------------------------------------------------
- // Handlers to handle etcd-store related request via etcd url
- //-------------------------------------------------------------------
- func NewEtcdMuxer() *http.ServeMux {
- // external commands
- etcdMux := http.NewServeMux()
- etcdMux.Handle("/"+version+"/keys/", errorHandler(Multiplexer))
- etcdMux.Handle("/"+version+"/watch/", errorHandler(WatchHttpHandler))
- etcdMux.Handle("/"+version+"/leader", errorHandler(LeaderHttpHandler))
- etcdMux.Handle("/"+version+"/machines", errorHandler(MachinesHttpHandler))
- etcdMux.Handle("/"+version+"/stats/", errorHandler(StatsHttpHandler))
- etcdMux.Handle("/version", errorHandler(VersionHttpHandler))
- etcdMux.HandleFunc("/test/", TestHttpHandler)
- // TODO: Use a mux in 0.2 that can handle this
- etcdMux.Handle("/etcd/mod/dashboard/", *mod.ServeMux)
- return etcdMux
- }
- type errorHandler func(http.ResponseWriter, *http.Request) error
- // addCorsHeader parses the request Origin header and loops through the user
- // provided allowed origins and sets the Access-Control-Allow-Origin header if
- // there is a match.
- func addCorsHeader(w http.ResponseWriter, r *http.Request) {
- val, ok := corsList["*"]
- if val && ok {
- w.Header().Add("Access-Control-Allow-Origin", "*")
- return
- }
- requestOrigin := r.Header.Get("Origin")
- val, ok = corsList[requestOrigin]
- if val && ok {
- w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
- return
- }
- }
- func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- addCorsHeader(w, r)
- if e := fn(w, r); e != nil {
- if etcdErr, ok := e.(etcdErr.Error); ok {
- debug("Return error: ", etcdErr.Error())
- etcdErr.Write(w)
- } else {
- http.Error(w, e.Error(), http.StatusInternalServerError)
- }
- }
- }
- // Multiplex GET/POST/DELETE request to corresponding handlers
- func Multiplexer(w http.ResponseWriter, req *http.Request) error {
- switch req.Method {
- case "GET":
- return GetHttpHandler(w, req)
- case "POST":
- return SetHttpHandler(w, req)
- case "PUT":
- return SetHttpHandler(w, req)
- case "DELETE":
- return DeleteHttpHandler(w, req)
- default:
- w.WriteHeader(http.StatusMethodNotAllowed)
- return nil
- }
- }
- //--------------------------------------
- // State sensitive handlers
- // Set/Delete will dispatch to leader
- //--------------------------------------
- // Set Command Handler
- func SetHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := req.URL.Path[len("/v1/keys/"):]
- if store.CheckKeyword(key) {
- return etcdErr.NewError(400, "Set")
- }
- debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
- req.ParseForm()
- value := req.Form.Get("value")
- if len(value) == 0 {
- return etcdErr.NewError(200, "Set")
- }
- strDuration := req.Form.Get("ttl")
- expireTime, err := durationToExpireTime(strDuration)
- if err != nil {
- return etcdErr.NewError(202, "Set")
- }
- if prevValueArr, ok := req.Form["prevValue"]; ok && len(prevValueArr) > 0 {
- command := &TestAndSetCommand{
- Key: key,
- Value: value,
- PrevValue: prevValueArr[0],
- ExpireTime: expireTime,
- }
- return dispatch(command, w, req, true)
- } else {
- command := &SetCommand{
- Key: key,
- Value: value,
- ExpireTime: expireTime,
- }
- return dispatch(command, w, req, true)
- }
- }
- // Delete Handler
- func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := req.URL.Path[len("/v1/keys/"):]
- debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
- command := &DeleteCommand{
- Key: key,
- }
- return dispatch(command, w, req, true)
- }
- // Dispatch the command to leader
- func dispatch(c Command, w http.ResponseWriter, req *http.Request, etcd bool) error {
- if r.State() == raft.Leader {
- if body, err := r.Do(c); err != nil {
- return err
- } else {
- if body == nil {
- return etcdErr.NewError(300, "Empty result from raft")
- } else {
- body, _ := body.([]byte)
- w.WriteHeader(http.StatusOK)
- w.Write(body)
- return nil
- }
- }
- } else {
- leader := r.Leader()
- // current no leader
- if leader == "" {
- return etcdErr.NewError(300, "")
- }
- redirect(leader, etcd, w, req)
- return nil
- }
- return etcdErr.NewError(300, "")
- }
- //--------------------------------------
- // State non-sensitive handlers
- // will not dispatch to leader
- // TODO: add sensitive version for these
- // command?
- //--------------------------------------
- // Handler to return the current leader's raft address
- func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
- leader := r.Leader()
- if leader != "" {
- w.WriteHeader(http.StatusOK)
- raftURL, _ := nameToRaftURL(leader)
- w.Write([]byte(raftURL))
- return nil
- } else {
- return etcdErr.NewError(301, "")
- }
- }
- // Handler to return all the known machines in the current cluster
- func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
- machines := getMachines(nameToEtcdURL)
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(strings.Join(machines, ", ")))
- return nil
- }
- // Handler to return the current version of etcd
- func VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
- w.WriteHeader(http.StatusOK)
- fmt.Fprintf(w, "etcd %s", releaseVersion)
- return nil
- }
- // Handler to return the basic stats of etcd
- func StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
- option := req.URL.Path[len("/v1/stats/"):]
- switch option {
- case "self":
- w.WriteHeader(http.StatusOK)
- w.Write(r.Stats())
- case "leader":
- if r.State() == raft.Leader {
- w.Write(r.PeerStats())
- } else {
- leader := r.Leader()
- // current no leader
- if leader == "" {
- return etcdErr.NewError(300, "")
- }
- redirect(leader, true, w, req)
- }
- case "store":
- w.WriteHeader(http.StatusOK)
- w.Write(etcdStore.Stats())
- }
- return nil
- }
- // Get Handler
- func GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := req.URL.Path[len("/v1/keys/"):]
- debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
- command := &GetCommand{
- Key: key,
- }
- if body, err := command.Apply(r.Server); err != nil {
- return err
- } else {
- body, _ := body.([]byte)
- w.WriteHeader(http.StatusOK)
- w.Write(body)
- return nil
- }
- }
- // Watch handler
- func WatchHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := req.URL.Path[len("/v1/watch/"):]
- command := &WatchCommand{
- Key: key,
- }
- if req.Method == "GET" {
- debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
- command.SinceIndex = 0
- } else if req.Method == "POST" {
- // watch from a specific index
- debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
- content := req.FormValue("index")
- sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
- if err != nil {
- return etcdErr.NewError(203, "Watch From Index")
- }
- command.SinceIndex = sinceIndex
- } else {
- w.WriteHeader(http.StatusMethodNotAllowed)
- return nil
- }
- if body, err := command.Apply(r.Server); err != nil {
- return etcdErr.NewError(500, key)
- } else {
- w.WriteHeader(http.StatusOK)
- body, _ := body.([]byte)
- w.Write(body)
- return nil
- }
- }
- // TestHandler
- func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
- testType := req.URL.Path[len("/test/"):]
- if testType == "speed" {
- directSet()
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("speed test success"))
- return
- }
- w.WriteHeader(http.StatusBadRequest)
- }
|