Browse Source

Refactor v2 routes.

Ben Johnson 12 years ago
parent
commit
d3b064c2e9

+ 76 - 2
server/server.go

@@ -6,10 +6,12 @@ import (
 	"net/http"
 	"net/url"
 	"strings"
+	"time"
 
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/server/v1"
+	"github.com/coreos/etcd/server/v2"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 	"github.com/gorilla/mux"
@@ -44,12 +46,24 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
 		peerServer: peerServer,
 	}
 
-	// Install the routes for each version of the API.
+	// Install the routes.
+	s.handleFunc("/version", s.GetVersionHandler).Methods("GET")
 	s.installV1()
+	s.installV2()
 
 	return s
 }
 
+// The current state of the server in the cluster.
+func (s *Server) State() string {
+	return s.peerServer.State()
+}
+
+// The node name of the leader in the cluster.
+func (s *Server) Leader() string {
+	return s.peerServer.Leader()
+}
+
 // The current Raft committed index.
 func (s *Server) CommitIndex() uint64 {
 	return s.peerServer.CommitIndex()
@@ -65,6 +79,11 @@ func (s *Server) URL() string {
 	return s.url
 }
 
+// Retrives the Peer URL for a given node name.
+func (s *Server) PeerURL(name string) (string, bool) {
+	return s.registry.PeerURL(name)
+}
+
 // Returns a reference to the Store.
 func (s *Server) Store() *store.Store {
 	return s.store
@@ -77,7 +96,21 @@ func (s *Server) installV1() {
 	s.handleFuncV1("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST")
 	s.handleFunc("/v1/leader", s.GetLeaderHandler).Methods("GET")
 	s.handleFunc("/v1/machines", s.GetMachinesHandler).Methods("GET")
-	s.handleFunc("/v1/stats", s.GetStatsHandler).Methods("GET")
+	s.handleFunc("/v1/stats/self", s.GetStatsHandler).Methods("GET")
+	s.handleFunc("/v1/stats/leader", s.GetLeaderStatsHandler).Methods("GET")
+	s.handleFunc("/v1/stats/store", s.GetStoreStatsHandler).Methods("GET")
+}
+
+func (s *Server) installV2() {
+	s.handleFuncV2("/v2/keys/{key:.*}", v2.GetKeyHandler).Methods("GET")
+	s.handleFuncV2("/v2/keys/{key:.*}", v2.CreateKeyHandler).Methods("POST")
+	s.handleFuncV2("/v2/keys/{key:.*}", v2.UpdateKeyHandler).Methods("PUT")
+	s.handleFuncV2("/v2/keys/{key:.*}", v2.DeleteKeyHandler).Methods("DELETE")
+	s.handleFunc("/v2/leader", s.GetLeaderHandler).Methods("GET")
+	s.handleFunc("/v2/machines", s.GetMachinesHandler).Methods("GET")
+	s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET")
+	s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET")
+	s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET")
 }
 
 // Adds a v1 server handler to the router.
@@ -87,6 +120,13 @@ func (s *Server) handleFuncV1(path string, f func(http.ResponseWriter, *http.Req
 	})
 }
 
+// Adds a v2 server handler to the router.
+func (s *Server) handleFuncV2(path string, f func(http.ResponseWriter, *http.Request, v2.Server) error) *mux.Route {
+	return s.handleFunc(path, func(w http.ResponseWriter, req *http.Request) error {
+		return f(w, req, s)
+	})
+}
+
 // Adds a server handler to the router.
 func (s *Server) handleFunc(path string, f func(http.ResponseWriter, *http.Request) error) *mux.Route {
 	r := s.Handler.(*mux.Router)
@@ -181,6 +221,13 @@ func (s *Server) OriginAllowed(origin string) bool {
 	return s.corsOrigins["*"] || s.corsOrigins[origin]
 }
 
+// Handler to return the current version of etcd.
+func (s *Server) GetVersionHandler(w http.ResponseWriter, req *http.Request) error {
+	w.WriteHeader(http.StatusOK)
+	fmt.Fprintf(w, "etcd %s", releaseVersion)
+	return nil
+}
+
 // Handler to return the current leader's raft address
 func (s *Server) GetLeaderHandler(w http.ResponseWriter, req *http.Request) error {
 	leader := s.peerServer.Leader()
@@ -228,3 +275,30 @@ func (s *Server) GetStoreStatsHandler(w http.ResponseWriter, req *http.Request)
   w.Write(s.store.JsonStats())
   return nil
 }
+
+// Executes a speed test to evaluate the performance of update replication.
+func (s *Server) SpeedTestHandler(w http.ResponseWriter, req *http.Request) error {
+	count := 1000
+	c := make(chan bool, count)
+	for i := 0; i < count; i++ {
+		go func() {
+			for j := 0; j < 10; j++ {
+				c := &store.UpdateCommand{
+					Key: "foo",
+					Value: "bar",
+					ExpireTime: time.Unix(0, 0),
+				}
+				s.peerServer.Do(c)
+			}
+			c <- true
+		}()
+	}
+
+	for i := 0; i < count; i++ {
+		<-c
+	}
+
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte("speed test success"))
+	return nil
+}

+ 29 - 0
server/v2/create_key_handler.go

@@ -0,0 +1,29 @@
+package v2
+
+import (
+    "net/http"
+
+    etcdErr "github.com/coreos/etcd/error"
+    "github.com/coreos/etcd/store"
+    "github.com/gorilla/mux"
+)
+
+func CreateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
+    vars := mux.Vars(req)
+    key := "/" + vars["key"]
+
+    value := req.FormValue("value")
+    expireTime, err := store.TTL(req.FormValue("ttl"))
+    if err != nil {
+        return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
+    }
+
+    c := &store.CreateCommand{
+        Key:        key,
+        Value:      value,
+        ExpireTime: expireTime,
+        IncrementalSuffix: (req.FormValue("incremental") == "true"),
+    }
+
+    return s.Dispatch(c, w, req)
+}

+ 20 - 0
server/v2/delete_key_handler.go

@@ -0,0 +1,20 @@
+package v2
+
+import (
+    "net/http"
+
+    "github.com/coreos/etcd/store"
+    "github.com/gorilla/mux"
+)
+
+func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
+    vars := mux.Vars(req)
+    key := "/" + vars["key"]
+
+    c := &store.DeleteCommand{
+        Key: key,
+        Recursive: (req.FormValue("recursive") == "true"),
+    }
+
+    return s.Dispatch(c, w, req)
+}

+ 69 - 0
server/v2/get_key_handler.go

@@ -0,0 +1,69 @@
+package v2
+
+import (
+    "encoding/json"
+    "fmt"
+    "net/http"
+    "strconv"
+
+    etcdErr "github.com/coreos/etcd/error"
+    "github.com/coreos/etcd/log"
+    "github.com/coreos/etcd/store"
+    "github.com/coreos/go-raft"
+    "github.com/gorilla/mux"
+)
+
+func GetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
+    var err error
+    var event *store.Event
+
+    vars := mux.Vars(req)
+    key := "/" + vars["key"]
+
+    // Help client to redirect the request to the current leader
+    if req.FormValue("consistent") == "true" && s.State() != raft.Leader {
+        leader := s.Leader()
+        hostname, _ := s.PeerURL(leader)
+        url := hostname + req.URL.Path
+        log.Debugf("Redirect to %s", url)
+        http.Redirect(w, req, url, http.StatusTemporaryRedirect)
+        return nil
+    }
+
+    recursive := (req.FormValue("recursive") == "true")
+    sorted := (req.FormValue("sorted") == "true")
+
+    if req.FormValue("wait") == "true" { // watch
+        // Create a command to watch from a given index (default 0).
+        var sinceIndex uint64 = 0
+        if req.Method == "POST" {
+            sinceIndex, err = strconv.ParseUint(string(req.FormValue("wait_index")), 10, 64)
+            if err != nil {
+                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
+            }
+        }
+
+        // Start the watcher on the store.
+        c, err := s.Store().Watch(key, recursive, sinceIndex, s.CommitIndex(), s.Term())
+        if err != nil {
+            return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
+        }
+        event = <-c
+
+    } else { //get
+        // Retrieve the key from the store.
+        event, err = s.Store().Get(key, recursive, sorted, s.CommitIndex(), s.Term())
+        if err != nil {
+            return err
+        }
+    }
+
+    w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
+    w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
+    w.WriteHeader(http.StatusOK)
+
+    b, _ := json.Marshal(event)
+    w.Write(b)
+
+    return nil
+}

+ 0 - 336
server/v2/handlers.go

@@ -1,336 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"net/http"
-	"strconv"
-	"strings"
-
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-raft"
-)
-
-func NewEtcdMuxer() *http.ServeMux {
-	// external commands
-	router := mux.NewRouter()
-	etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
-	etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
-	etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
-	etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
-	etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
-	etcdMux.HandleFunc("/test/", TestHttpHandler)
-
-	// backward support
-	etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
-	etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
-	etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
-	etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
-
-	return etcdMux
-}
-
-type errorHandler func(http.ResponseWriter, *http.Request) error
-
-// addCorsHeader parses the request Origin header and loops through the user
-// provided allowed origins and sets the Access-Control-Allow-Origin header if
-// there is a match.
-func addCorsHeader(w http.ResponseWriter, r *http.Request) {
-	val, ok := corsList["*"]
-	if val && ok {
-		w.Header().Add("Access-Control-Allow-Origin", "*")
-		return
-	}
-
-	requestOrigin := r.Header.Get("Origin")
-	val, ok = corsList[requestOrigin]
-	if val && ok {
-		w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
-		return
-	}
-}
-
-func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	addCorsHeader(w, r)
-	if e := fn(w, r); e != nil {
-		if etcdErr, ok := e.(*etcdErr.Error); ok {
-			debug("Return error: ", (*etcdErr).Error())
-			etcdErr.Write(w)
-		} else {
-			http.Error(w, e.Error(), http.StatusInternalServerError)
-		}
-	}
-}
-
-// Multiplex GET/POST/DELETE request to corresponding handlers
-func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
-
-	switch req.Method {
-	case "GET":
-		return e.GetHttpHandler(w, req)
-	case "POST":
-		return e.CreateHttpHandler(w, req)
-	case "PUT":
-		return e.UpdateHttpHandler(w, req)
-	case "DELETE":
-		return e.DeleteHttpHandler(w, req)
-	default:
-		w.WriteHeader(http.StatusMethodNotAllowed)
-		return nil
-	}
-
-	return nil
-}
-
-//--------------------------------------
-// State sensitive handlers
-// Set/Delete will dispatch to leader
-//--------------------------------------
-
-func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := getNodePath(req.URL.Path)
-
-	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	value := req.FormValue("value")
-
-	expireTime, err := store.TTL(req.FormValue("ttl"))
-
-	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
-	}
-
-	command := &CreateCommand{
-		Key:        key,
-		Value:      value,
-		ExpireTime: expireTime,
-	}
-
-	if req.FormValue("incremental") == "true" {
-		command.IncrementalSuffix = true
-	}
-
-	return e.dispatchEtcdCommand(command, w, req)
-
-}
-
-func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := getNodePath(req.URL.Path)
-
-	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	req.ParseForm()
-
-	value := req.Form.Get("value")
-
-	expireTime, err := store.TTL(req.Form.Get("ttl"))
-
-	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
-	}
-
-	// update should give at least one option
-	if value == "" && expireTime.Sub(store.Permanent) == 0 {
-		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
-	}
-
-	prevValue, valueOk := req.Form["prevValue"]
-
-	prevIndexStr, indexOk := req.Form["prevIndex"]
-
-	if !valueOk && !indexOk { // update without test
-		command := &UpdateCommand{
-			Key:        key,
-			Value:      value,
-			ExpireTime: expireTime,
-		}
-
-		return e.dispatchEtcdCommand(command, w, req)
-
-	} else { // update with test
-		var prevIndex uint64
-
-		if indexOk {
-			prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
-
-			// bad previous index
-			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
-			}
-		} else {
-			prevIndex = 0
-		}
-
-		command := &TestAndSetCommand{
-			Key:       key,
-			Value:     value,
-			PrevValue: prevValue[0],
-			PrevIndex: prevIndex,
-		}
-
-		return e.dispatchEtcdCommand(command, w, req)
-	}
-
-}
-
-// Delete Handler
-func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := getNodePath(req.URL.Path)
-
-	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	command := &DeleteCommand{
-		Key: key,
-	}
-
-	if req.FormValue("recursive") == "true" {
-		command.Recursive = true
-	}
-
-	return e.dispatchEtcdCommand(command, w, req)
-}
-
-// Dispatch the command to leader
-func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
-	return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
-}
-
-//--------------------------------------
-// State non-sensitive handlers
-// command with consistent option will
-// still dispatch to the leader
-//--------------------------------------
-
-// Handler to return the current version of etcd
-func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	w.WriteHeader(http.StatusOK)
-	fmt.Fprintf(w, "etcd %s", releaseVersion)
-
-	return nil
-}
-
-func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	var err error
-	var event interface{}
-
-	r := e.raftServer
-
-	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
-		// help client to redirect the request to the current leader
-		leader := r.Leader()
-		hostname, _ := nameToEtcdURL(leader)
-		redirect(hostname, w, req)
-		return nil
-	}
-
-	key := getNodePath(req.URL.Path)
-
-	recursive := req.FormValue("recursive")
-
-	if req.FormValue("wait") == "true" { // watch
-		command := &WatchCommand{
-			Key: key,
-		}
-
-		if recursive == "true" {
-			command.Recursive = true
-		}
-
-		indexStr := req.FormValue("wait_index")
-		if indexStr != "" {
-			sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
-
-			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
-			}
-
-			command.SinceIndex = sinceIndex
-		}
-
-		event, err = command.Apply(r.Server)
-
-	} else { //get
-
-		command := &GetCommand{
-			Key: key,
-		}
-
-		sorted := req.FormValue("sorted")
-		if sorted == "true" {
-			command.Sorted = true
-		}
-
-		if recursive == "true" {
-			command.Recursive = true
-		}
-
-		event, err = command.Apply(r.Server)
-	}
-
-	if err != nil {
-		return err
-
-	} else {
-		event, _ := event.(*store.Event)
-		bytes, _ := json.Marshal(event)
-
-		w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
-		w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
-		w.WriteHeader(http.StatusOK)
-
-		w.Write(bytes)
-
-		return nil
-	}
-
-}
-
-func getNodePath(urlPath string) string {
-	pathPrefixLen := len("/" + version + "/keys")
-	return urlPath[pathPrefixLen:]
-}
-
-
-//--------------------------------------
-// Testing
-//--------------------------------------
-
-// TestHandler
-func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
-	testType := req.URL.Path[len("/test/"):]
-
-	if testType == "speed" {
-		directSet()
-		w.WriteHeader(http.StatusOK)
-		w.Write([]byte("speed test success"))
-
-		return
-	}
-
-	w.WriteHeader(http.StatusBadRequest)
-}
-
-func directSet() {
-	c := make(chan bool, 1000)
-	for i := 0; i < 1000; i++ {
-		go send(c)
-	}
-
-	for i := 0; i < 1000; i++ {
-		<-c
-	}
-}
-
-func send(c chan bool) {
-	for i := 0; i < 10; i++ {
-		command := &UpdateCommand{}
-		command.Key = "foo"
-		command.Value = "bar"
-		command.ExpireTime = time.Unix(0, 0)
-		//r.Do(command)
-	}
-	c <- true
-}

+ 64 - 0
server/v2/update_key_handler.go

@@ -0,0 +1,64 @@
+package v2
+
+import (
+    "net/http"
+    "strconv"
+
+    etcdErr "github.com/coreos/etcd/error"
+    "github.com/coreos/etcd/store"
+    "github.com/coreos/go-raft"
+    "github.com/gorilla/mux"
+)
+
+func UpdateKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
+    vars := mux.Vars(req)
+    key := "/" + vars["key"]
+
+    req.ParseForm()
+
+    value := req.Form.Get("value")
+    expireTime, err := store.TTL(req.Form.Get("ttl"))
+    if err != nil {
+        return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
+    }
+
+    // Update should give at least one option
+    if value == "" && expireTime.Sub(store.Permanent) == 0 {
+        return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
+    }
+
+    prevValue, valueOk := req.Form["prevValue"]
+    prevIndexStr, indexOk := req.Form["prevIndex"]
+
+    var c raft.Command
+    if !valueOk && !indexOk { // update without test
+        c = &store.UpdateCommand{
+            Key:        key,
+            Value:      value,
+            ExpireTime: expireTime,
+        }
+
+    } else { // update with test
+        var prevIndex uint64
+
+        if indexOk {
+            prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
+
+            // bad previous index
+            if err != nil {
+                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
+            }
+        } else {
+            prevIndex = 0
+        }
+
+        c = &store.TestAndSetCommand{
+            Key:       key,
+            Value:     value,
+            PrevValue: prevValue[0],
+            PrevIndex: prevIndex,
+        }
+    }
+
+    return s.Dispatch(c, w, req)
+}

+ 18 - 0
server/v2/v2.go

@@ -0,0 +1,18 @@
+package v2
+
+import (
+  "net/http"
+  "github.com/coreos/etcd/store"
+  "github.com/coreos/go-raft"
+)
+
+// The Server interface provides all the methods required for the v2 API.
+type Server interface {
+  State() string
+  Leader() string
+  CommitIndex() uint64
+  Term() uint64
+  PeerURL(string) (string, bool)
+  Store() *store.Store
+  Dispatch(raft.Command, http.ResponseWriter, *http.Request) error
+}