| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364 |
- package main
- import (
- "encoding/json"
- "fmt"
- "net/http"
- "strconv"
- "strings"
- etcdErr "github.com/coreos/etcd/error"
- "github.com/coreos/etcd/store"
- "github.com/coreos/go-raft"
- )
- //-------------------------------------------------------------------
- // Handlers to handle etcd-store related request via etcd url
- //-------------------------------------------------------------------
- func NewEtcdMuxer() *http.ServeMux {
- // external commands
- router := mux.NewRouter()
- etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
- etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
- etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
- etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
- etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
- etcdMux.HandleFunc("/test/", TestHttpHandler)
- // backward support
- etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
- etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
- etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
- etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
- return etcdMux
- }
// errorHandler is an HTTP handler function that reports a failure to its
// caller by returning an error.
type errorHandler func(w http.ResponseWriter, r *http.Request) error
- // addCorsHeader parses the request Origin header and loops through the user
- // provided allowed origins and sets the Access-Control-Allow-Origin header if
- // there is a match.
- func addCorsHeader(w http.ResponseWriter, r *http.Request) {
- val, ok := corsList["*"]
- if val && ok {
- w.Header().Add("Access-Control-Allow-Origin", "*")
- return
- }
- requestOrigin := r.Header.Get("Origin")
- val, ok = corsList[requestOrigin]
- if val && ok {
- w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
- return
- }
- }
- func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
- addCorsHeader(w, r)
- if e := fn(w, r); e != nil {
- if etcdErr, ok := e.(*etcdErr.Error); ok {
- debug("Return error: ", (*etcdErr).Error())
- etcdErr.Write(w)
- } else {
- http.Error(w, e.Error(), http.StatusInternalServerError)
- }
- }
- }
- // Multiplex GET/POST/DELETE request to corresponding handlers
- func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
- switch req.Method {
- case "GET":
- return e.GetHttpHandler(w, req)
- case "POST":
- return e.CreateHttpHandler(w, req)
- case "PUT":
- return e.UpdateHttpHandler(w, req)
- case "DELETE":
- return e.DeleteHttpHandler(w, req)
- default:
- w.WriteHeader(http.StatusMethodNotAllowed)
- return nil
- }
- return nil
- }
- //--------------------------------------
- // State sensitive handlers
- // Set/Delete will dispatch to leader
- //--------------------------------------
- func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := getNodePath(req.URL.Path)
- debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
- value := req.FormValue("value")
- expireTime, err := durationToExpireTime(req.FormValue("ttl"))
- if err != nil {
- return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
- }
- command := &CreateCommand{
- Key: key,
- Value: value,
- ExpireTime: expireTime,
- }
- if req.FormValue("incremental") == "true" {
- command.IncrementalSuffix = true
- }
- return e.dispatchEtcdCommand(command, w, req)
- }
- func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := getNodePath(req.URL.Path)
- debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
- req.ParseForm()
- value := req.Form.Get("value")
- expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
- if err != nil {
- return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
- }
- // update should give at least one option
- if value == "" && expireTime.Sub(store.Permanent) == 0 {
- return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
- }
- prevValue, valueOk := req.Form["prevValue"]
- prevIndexStr, indexOk := req.Form["prevIndex"]
- if !valueOk && !indexOk { // update without test
- command := &UpdateCommand{
- Key: key,
- Value: value,
- ExpireTime: expireTime,
- }
- return e.dispatchEtcdCommand(command, w, req)
- } else { // update with test
- var prevIndex uint64
- if indexOk {
- prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
- // bad previous index
- if err != nil {
- return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
- }
- } else {
- prevIndex = 0
- }
- command := &TestAndSetCommand{
- Key: key,
- Value: value,
- PrevValue: prevValue[0],
- PrevIndex: prevIndex,
- }
- return e.dispatchEtcdCommand(command, w, req)
- }
- }
- // Delete Handler
- func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
- key := getNodePath(req.URL.Path)
- debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
- command := &DeleteCommand{
- Key: key,
- }
- if req.FormValue("recursive") == "true" {
- command.Recursive = true
- }
- return e.dispatchEtcdCommand(command, w, req)
- }
- // Dispatch the command to leader
- func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
- return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
- }
- //--------------------------------------
- // State non-sensitive handlers
- // command with consistent option will
- // still dispatch to the leader
- //--------------------------------------
- // Handler to return the current leader's raft address
- func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
- r := e.raftServer
- leader := r.Leader()
- if leader != "" {
- w.WriteHeader(http.StatusOK)
- raftURL, _ := nameToRaftURL(leader)
- w.Write([]byte(raftURL))
- return nil
- } else {
- return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
- }
- }
- // Handler to return all the known machines in the current cluster
- func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
- machines := e.raftServer.getMachines(nameToEtcdURL)
- w.WriteHeader(http.StatusOK)
- w.Write([]byte(strings.Join(machines, ", ")))
- return nil
- }
- // Handler to return the current version of etcd
- func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
- w.WriteHeader(http.StatusOK)
- fmt.Fprintf(w, "etcd %s", releaseVersion)
- return nil
- }
- // Handler to return the basic stats of etcd
- func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
- option := req.URL.Path[len("/v1/stats/"):]
- w.WriteHeader(http.StatusOK)
- r := e.raftServer
- switch option {
- case "self":
- w.Write(r.Stats())
- case "leader":
- if r.State() == raft.Leader {
- w.Write(r.PeerStats())
- } else {
- leader := r.Leader()
- // current no leader
- if leader == "" {
- return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
- }
- hostname, _ := nameToEtcdURL(leader)
- redirect(hostname, w, req)
- }
- case "store":
- w.Write(etcdStore.JsonStats())
- }
- return nil
- }
- func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
- var err error
- var event interface{}
- r := e.raftServer
- debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
- if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
- // help client to redirect the request to the current leader
- leader := r.Leader()
- hostname, _ := nameToEtcdURL(leader)
- redirect(hostname, w, req)
- return nil
- }
- key := getNodePath(req.URL.Path)
- recursive := req.FormValue("recursive")
- if req.FormValue("wait") == "true" { // watch
- command := &WatchCommand{
- Key: key,
- }
- if recursive == "true" {
- command.Recursive = true
- }
- indexStr := req.FormValue("wait_index")
- if indexStr != "" {
- sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
- if err != nil {
- return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
- }
- command.SinceIndex = sinceIndex
- }
- event, err = command.Apply(r.Server)
- } else { //get
- command := &GetCommand{
- Key: key,
- }
- sorted := req.FormValue("sorted")
- if sorted == "true" {
- command.Sorted = true
- }
- if recursive == "true" {
- command.Recursive = true
- }
- event, err = command.Apply(r.Server)
- }
- if err != nil {
- return err
- } else {
- event, _ := event.(*store.Event)
- bytes, _ := json.Marshal(event)
- w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
- w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
- w.WriteHeader(http.StatusOK)
- w.Write(bytes)
- return nil
- }
- }
- // TestHandler
- func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
- testType := req.URL.Path[len("/test/"):]
- if testType == "speed" {
- directSet()
- w.WriteHeader(http.StatusOK)
- w.Write([]byte("speed test success"))
- return
- }
- w.WriteHeader(http.StatusBadRequest)
- }
|