Browse Source

Refactor v2 API into server/v2.

Ben Johnson 12 years ago
parent
commit
594c2cab47

+ 0 - 52
command.go

@@ -107,32 +107,6 @@ func (c *TestAndSetCommand) Apply(server *raft.Server) (interface{}, error) {
 	return e, nil
 	return e, nil
 }
 }
 
 
-// Get command
-type GetCommand struct {
-	Key       string `json:"key"`
-	Recursive bool   `json:"recursive"`
-	Sorted    bool   `json:"sorted"`
-}
-
-// The name of the get command in the log
-func (c *GetCommand) CommandName() string {
-	return commandName("get")
-}
-
-// Get the value of key
-func (c *GetCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-
-	e, err := s.Get(c.Key, c.Recursive, c.Sorted, server.CommitIndex(), server.Term())
-
-	if err != nil {
-		debug(err)
-		return nil, err
-	}
-
-	return e, nil
-}
-
 // Delete command
 // Delete command
 type DeleteCommand struct {
 type DeleteCommand struct {
 	Key       string `json:"key"`
 	Key       string `json:"key"`
@@ -158,32 +132,6 @@ func (c *DeleteCommand) Apply(server *raft.Server) (interface{}, error) {
 	return e, nil
 	return e, nil
 }
 }
 
 
-// Watch command
-type WatchCommand struct {
-	Key        string `json:"key"`
-	SinceIndex uint64 `json:"sinceIndex"`
-	Recursive  bool   `json:"recursive"`
-}
-
-// The name of the watch command in the log
-func (c *WatchCommand) CommandName() string {
-	return commandName("watch")
-}
-
-func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
-	s, _ := server.StateMachine().(*store.Store)
-
-	eventChan, err := s.Watch(c.Key, c.Recursive, c.SinceIndex, server.CommitIndex(), server.Term())
-
-	if err != nil {
-		return nil, err
-	}
-
-	e := <-eventChan
-
-	return e, nil
-}
-
 // JoinCommand
 // JoinCommand
 type JoinCommand struct {
 type JoinCommand struct {
 	RaftVersion string `json:"raftVersion"`
 	RaftVersion string `json:"raftVersion"`

+ 6 - 7
etcd.go

@@ -3,14 +3,13 @@ package main
 import (
 import (
 	"crypto/tls"
 	"crypto/tls"
 	"flag"
 	"flag"
-	"fmt"
 	"io/ioutil"
 	"io/ioutil"
-	"net/url"
 	"os"
 	"os"
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
+	"github.com/coreos/etcd/server"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
 )
 )
 
 
@@ -154,8 +153,6 @@ func main() {
 		raft.SetLogLevel(raft.Debug)
 		raft.SetLogLevel(raft.Debug)
 	}
 	}
 
 
-	parseCorsFlag()
-
 	if machines != "" {
 	if machines != "" {
 		cluster = strings.Split(machines, ",")
 		cluster = strings.Split(machines, ",")
 	} else if machinesFile != "" {
 	} else if machinesFile != "" {
@@ -204,10 +201,12 @@ func main() {
 	r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
 	r := newRaftServer(info.Name, info.RaftURL, info.RaftListenHost, &raftTLSConfig, &info.RaftTLS)
 	snapConf = r.newSnapshotConf()
 	snapConf = r.newSnapshotConf()
 
 
-	e = newEtcdServer(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
+	s := server.New(info.Name, info.EtcdURL, info.EtcdListenHost, &etcdTLSConfig, &info.EtcdTLS, r)
+	if err := e.AllowOrigins(cors); err != nil {
+		panic(err)
+	}
 
 
 	r.ListenAndServe()
 	r.ListenAndServe()
-	e.ListenAndServe()
-
+	s.ListenAndServe()
 }
 }
 
 

+ 0 - 364
etcd_handlers.go

@@ -1,364 +0,0 @@
-package main
-
-import (
-	"encoding/json"
-	"fmt"
-	"net/http"
-	"strconv"
-	"strings"
-
-	etcdErr "github.com/coreos/etcd/error"
-	"github.com/coreos/etcd/store"
-	"github.com/coreos/go-raft"
-)
-
-//-------------------------------------------------------------------
-// Handlers to handle etcd-store related request via etcd url
-//-------------------------------------------------------------------
-
-func NewEtcdMuxer() *http.ServeMux {
-	// external commands
-	router := mux.NewRouter()
-	etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
-	etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
-	etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
-	etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
-	etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
-	etcdMux.HandleFunc("/test/", TestHttpHandler)
-
-	// backward support
-	etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
-	etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
-	etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
-	etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
-
-	return etcdMux
-}
-
-type errorHandler func(http.ResponseWriter, *http.Request) error
-
-// addCorsHeader parses the request Origin header and loops through the user
-// provided allowed origins and sets the Access-Control-Allow-Origin header if
-// there is a match.
-func addCorsHeader(w http.ResponseWriter, r *http.Request) {
-	val, ok := corsList["*"]
-	if val && ok {
-		w.Header().Add("Access-Control-Allow-Origin", "*")
-		return
-	}
-
-	requestOrigin := r.Header.Get("Origin")
-	val, ok = corsList[requestOrigin]
-	if val && ok {
-		w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
-		return
-	}
-}
-
-func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
-	addCorsHeader(w, r)
-	if e := fn(w, r); e != nil {
-		if etcdErr, ok := e.(*etcdErr.Error); ok {
-			debug("Return error: ", (*etcdErr).Error())
-			etcdErr.Write(w)
-		} else {
-			http.Error(w, e.Error(), http.StatusInternalServerError)
-		}
-	}
-}
-
-// Multiplex GET/POST/DELETE request to corresponding handlers
-func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
-
-	switch req.Method {
-	case "GET":
-		return e.GetHttpHandler(w, req)
-	case "POST":
-		return e.CreateHttpHandler(w, req)
-	case "PUT":
-		return e.UpdateHttpHandler(w, req)
-	case "DELETE":
-		return e.DeleteHttpHandler(w, req)
-	default:
-		w.WriteHeader(http.StatusMethodNotAllowed)
-		return nil
-	}
-
-	return nil
-}
-
-//--------------------------------------
-// State sensitive handlers
-// Set/Delete will dispatch to leader
-//--------------------------------------
-
-func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := getNodePath(req.URL.Path)
-
-	debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	value := req.FormValue("value")
-
-	expireTime, err := durationToExpireTime(req.FormValue("ttl"))
-
-	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
-	}
-
-	command := &CreateCommand{
-		Key:        key,
-		Value:      value,
-		ExpireTime: expireTime,
-	}
-
-	if req.FormValue("incremental") == "true" {
-		command.IncrementalSuffix = true
-	}
-
-	return e.dispatchEtcdCommand(command, w, req)
-
-}
-
-func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := getNodePath(req.URL.Path)
-
-	debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	req.ParseForm()
-
-	value := req.Form.Get("value")
-
-	expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
-
-	if err != nil {
-		return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
-	}
-
-	// update should give at least one option
-	if value == "" && expireTime.Sub(store.Permanent) == 0 {
-		return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
-	}
-
-	prevValue, valueOk := req.Form["prevValue"]
-
-	prevIndexStr, indexOk := req.Form["prevIndex"]
-
-	if !valueOk && !indexOk { // update without test
-		command := &UpdateCommand{
-			Key:        key,
-			Value:      value,
-			ExpireTime: expireTime,
-		}
-
-		return e.dispatchEtcdCommand(command, w, req)
-
-	} else { // update with test
-		var prevIndex uint64
-
-		if indexOk {
-			prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
-
-			// bad previous index
-			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
-			}
-		} else {
-			prevIndex = 0
-		}
-
-		command := &TestAndSetCommand{
-			Key:       key,
-			Value:     value,
-			PrevValue: prevValue[0],
-			PrevIndex: prevIndex,
-		}
-
-		return e.dispatchEtcdCommand(command, w, req)
-	}
-
-}
-
-// Delete Handler
-func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	key := getNodePath(req.URL.Path)
-
-	debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	command := &DeleteCommand{
-		Key: key,
-	}
-
-	if req.FormValue("recursive") == "true" {
-		command.Recursive = true
-	}
-
-	return e.dispatchEtcdCommand(command, w, req)
-}
-
-// Dispatch the command to leader
-func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
-	return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
-}
-
-//--------------------------------------
-// State non-sensitive handlers
-// command with consistent option will
-// still dispatch to the leader
-//--------------------------------------
-
-// Handler to return the current leader's raft address
-func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	r := e.raftServer
-
-	leader := r.Leader()
-
-	if leader != "" {
-		w.WriteHeader(http.StatusOK)
-		raftURL, _ := nameToRaftURL(leader)
-		w.Write([]byte(raftURL))
-
-		return nil
-	} else {
-		return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
-	}
-}
-
-// Handler to return all the known machines in the current cluster
-func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	machines := e.raftServer.getMachines(nameToEtcdURL)
-
-	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(strings.Join(machines, ", ")))
-
-	return nil
-}
-
-// Handler to return the current version of etcd
-func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	w.WriteHeader(http.StatusOK)
-	fmt.Fprintf(w, "etcd %s", releaseVersion)
-
-	return nil
-}
-
-// Handler to return the basic stats of etcd
-func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	option := req.URL.Path[len("/v1/stats/"):]
-	w.WriteHeader(http.StatusOK)
-
-	r := e.raftServer
-
-	switch option {
-	case "self":
-		w.Write(r.Stats())
-	case "leader":
-		if r.State() == raft.Leader {
-			w.Write(r.PeerStats())
-		} else {
-			leader := r.Leader()
-			// current no leader
-			if leader == "" {
-				return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
-			}
-			hostname, _ := nameToEtcdURL(leader)
-			redirect(hostname, w, req)
-		}
-	case "store":
-		w.Write(etcdStore.JsonStats())
-	}
-
-	return nil
-}
-
-func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	var err error
-	var event interface{}
-
-	r := e.raftServer
-
-	debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
-
-	if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
-		// help client to redirect the request to the current leader
-		leader := r.Leader()
-		hostname, _ := nameToEtcdURL(leader)
-		redirect(hostname, w, req)
-		return nil
-	}
-
-	key := getNodePath(req.URL.Path)
-
-	recursive := req.FormValue("recursive")
-
-	if req.FormValue("wait") == "true" { // watch
-		command := &WatchCommand{
-			Key: key,
-		}
-
-		if recursive == "true" {
-			command.Recursive = true
-		}
-
-		indexStr := req.FormValue("wait_index")
-		if indexStr != "" {
-			sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
-
-			if err != nil {
-				return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
-			}
-
-			command.SinceIndex = sinceIndex
-		}
-
-		event, err = command.Apply(r.Server)
-
-	} else { //get
-
-		command := &GetCommand{
-			Key: key,
-		}
-
-		sorted := req.FormValue("sorted")
-		if sorted == "true" {
-			command.Sorted = true
-		}
-
-		if recursive == "true" {
-			command.Recursive = true
-		}
-
-		event, err = command.Apply(r.Server)
-	}
-
-	if err != nil {
-		return err
-
-	} else {
-		event, _ := event.(*store.Event)
-		bytes, _ := json.Marshal(event)
-
-		w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
-		w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
-		w.WriteHeader(http.StatusOK)
-
-		w.Write(bytes)
-
-		return nil
-	}
-
-}
-
-// TestHandler
-func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
-	testType := req.URL.Path[len("/test/"):]
-
-	if testType == "speed" {
-		directSet()
-		w.WriteHeader(http.StatusOK)
-		w.Write([]byte("speed test success"))
-
-		return
-	}
-
-	w.WriteHeader(http.StatusBadRequest)
-}

+ 46 - 14
server/server.go

@@ -3,12 +3,20 @@ package server
 import (
 import (
 	"github.com/gorilla/mux"
 	"github.com/gorilla/mux"
 	"net/http"
 	"net/http"
+	"net/url"
 )
 )
 
 
-// The Server provides an HTTP interface to the underlying data store.
-type Server struct {
+// The Server provides an HTTP interface to the underlying store.
+type Server interface {
+    CommitIndex() uint64 
+    Term() uint64 
+    Dispatch(Command, http.ResponseWriter, *http.Request)
+}
+
+// This is the default implementation of the Server interface.
+type server struct {
 	http.Server
 	http.Server
-	raftServer  *raftServer
+	raftServer  *raft.Server
 	name        string
 	name        string
 	url         string
 	url         string
 	tlsConf     *TLSConfig
 	tlsConf     *TLSConfig
@@ -17,8 +25,8 @@ type Server struct {
 }
 }
 
 
 // Creates a new Server.
 // Creates a new Server.
-func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raftServer) *Server {
-	s := &etcdServer{
+func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsInfo *TLSInfo, raftServer *raft.Server) *Server {
+	s := &server{
 		Server: http.Server{
 		Server: http.Server{
 			Handler:   mux.NewRouter(),
 			Handler:   mux.NewRouter(),
 			TLSConfig: &tlsConf.Server,
 			TLSConfig: &tlsConf.Server,
@@ -31,20 +39,44 @@ func New(name string, urlStr string, listenHost string, tlsConf *TLSConfig, tlsI
 		raftServer: raftServer,
 		raftServer: raftServer,
 	}
 	}
 
 
-	// TODO: Move to main.go.
 	// Install the routes for each version of the API.
 	// Install the routes for each version of the API.
-	// v1.Install(s)
-	// v2.Install(s)
+	s.installV1()
 
 
 	return s
 	return s
 }
 }
 
 
+// The current Raft committed index.
+func (s *server) CommitIndex() uint64 {
+	return c.raftServer.CommitIndex()
+}
+
+// The current Raft term.
+func (s *server) Term() uint64 {
+	return c.raftServer.Term()
+}
+
+// Executes a command against the Raft server.
+func (s *server) Do(c Command, localOnly bool) (interface{}, error) {
+	return c.raftServer.Do(s.RaftServer().Server)
+}
+
+func (s *server) installV1() {
+	s.handleFunc("/v1/keys/{key:.*}", v1.GetKeyHandler).Methods("GET")
+	s.handleFunc("/v1/keys/{key:.*}", v1.SetKeyHandler).Methods("POST", "PUT")
+	s.handleFunc("/v1/keys/{key:.*}", v1.DeleteKeyHandler).Methods("DELETE")
+
+	s.handleFunc("/v1/watch/{key:.*}", v1.WatchKeyHandler).Methods("GET", "POST")
+}
+
 // Adds a server handler to the router.
 // Adds a server handler to the router.
-func (s *Server) HandleFunc(path string, f func(http.ResponseWriter, *http.Request, *server.Server) error) *mux.Route {
+func (s *server) handleFunc(path string, f func(http.ResponseWriter, *http.Request, Server) error) *mux.Route {
 	r := s.Handler.(*mux.Router)
 	r := s.Handler.(*mux.Router)
 
 
 	// Wrap the standard HandleFunc interface to pass in the server reference.
 	// Wrap the standard HandleFunc interface to pass in the server reference.
 	return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
 	return r.HandleFunc(path, func(w http.ResponseWriter, req *http.Request) {
+		// Log request.
+		debugf("[recv] %s %s [%s]", req.Method, s.url, req.URL.Path, req.RemoteAddr)
+
 		// Write CORS header.
 		// Write CORS header.
 		if s.OriginAllowed("*") {
 		if s.OriginAllowed("*") {
 			w.Header().Add("Access-Control-Allow-Origin", "*")
 			w.Header().Add("Access-Control-Allow-Origin", "*")
@@ -65,7 +97,7 @@ func (s *Server) HandleFunc(path string, f func(http.ResponseWriter, *http.Reque
 }
 }
 
 
 // Start to listen and response etcd client command
 // Start to listen and response etcd client command
-func (s *Server) ListenAndServe() {
+func (s *server) ListenAndServe() {
 	infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url)
 	infof("etcd server [name %s, listen on %s, advertised url %s]", s.name, s.Server.Addr, s.url)
 
 
 	if s.tlsConf.Scheme == "http" {
 	if s.tlsConf.Scheme == "http" {
@@ -76,7 +108,7 @@ func (s *Server) ListenAndServe() {
 }
 }
 
 
 // Sets a comma-delimited list of origins that are allowed.
 // Sets a comma-delimited list of origins that are allowed.
-func (s *Server) AllowOrigins(origins string) error {
+func (s *server) AllowOrigins(origins string) error {
 	// Construct a lookup of all origins.
 	// Construct a lookup of all origins.
 	m := make(map[string]bool)
 	m := make(map[string]bool)
 	for _, v := range strings.Split(cors, ",") {
 	for _, v := range strings.Split(cors, ",") {
@@ -87,12 +119,12 @@ func (s *Server) AllowOrigins(origins string) error {
 		}
 		}
 		m[v] = true
 		m[v] = true
 	}
 	}
-	s.origins = m
+	s.corsOrigins = m
 
 
 	return nil
 	return nil
 }
 }
 
 
 // Determines whether the server will allow a given CORS origin.
 // Determines whether the server will allow a given CORS origin.
-func (s *Server) OriginAllowed(origin string) {
-	return s.origins["*"] || s.origins[origin]
+func (s *server) OriginAllowed(origin string) {
+	return s.corsOrigins["*"] || s.corsOrigins[origin]
 }
 }

+ 12 - 10
server/v1/delete_key_handler.go

@@ -1,13 +1,15 @@
 package v1
 package v1
 
 
-func deleteKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error {
-	key := req.URL.Path[len("/v1/keys/"):]
-
-	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
-
-	command := &DeleteCommand{
-		Key: key,
-	}
-
-	return dispatchEtcdCommandV1(command, w, req)
+import (
+    "encoding/json"
+    "github.com/coreos/etcd/store"
+    "net/http"
+)
+
+// Removes a key from the store.
+func DeleteKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
+    vars := mux.Vars(req)
+    key := "/" + vars["key"]
+	command := &DeleteCommand{Key: key}
+	return s.Dispatch(command, w, req)
 }
 }

+ 19 - 19
server/v1/dispatch.go

@@ -1,40 +1,40 @@
 package v1
 package v1
 
 
 // Dispatch the command to leader.
 // Dispatch the command to leader.
-func dispatchCommand(c Command, w http.ResponseWriter, req *http.Request) error {
-	return dispatch(c, w, req, nameToEtcdURL)
+func dispatchCommand(c Command, w http.ResponseWriter, req *http.Request, s *server.Server) error {
+	return dispatch(c, w, req, s, nameToEtcdURL)
 }
 }
 
 
 // Dispatches a command to a given URL.
 // Dispatches a command to a given URL.
-func dispatch(c Command, w http.ResponseWriter, req *http.Request, toURL func(name string) (string, bool)) error {
-	r := e.raftServer
+func dispatch(c Command, w http.ResponseWriter, req *http.Request, s *server.Server, toURL func(name string) (string, bool)) error {
+	r := s.raftServer
 	if r.State() == raft.Leader {
 	if r.State() == raft.Leader {
-		if event, err := r.Do(c); err != nil {
+		event, err := r.Do(c)
+		if err != nil {
 			return err
 			return err
-		} else {
-			if event == nil {
-				return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
-			}
-
-			event, _ := event.(*store.Event)
+		}
 
 
-			response := eventToResponse(event)
-			bytes, _ := json.Marshal(response)
+		if event == nil {
+			return etcdErr.NewError(300, "Empty result from raft", store.UndefIndex, store.UndefTerm)
+		}
 
 
-			w.WriteHeader(http.StatusOK)
-			w.Write(bytes)
-			return nil
+		event, _ := event.(*store.Event)
+		response := eventToResponse(event)
+		b, _ := json.Marshal(response)
+		w.WriteHeader(http.StatusOK)
+		w.Write(b)
 
 
-		}
+		return nil
 
 
 	} else {
 	} else {
 		leader := r.Leader()
 		leader := r.Leader()
-		// current no leader
+
+		// No leader available.
 		if leader == "" {
 		if leader == "" {
 			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 			return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
 		}
 		}
-		url, _ := toURL(leader)
 
 
+		url, _ := toURL(leader)
 		redirect(url, w, req)
 		redirect(url, w, req)
 
 
 		return nil
 		return nil

+ 3 - 6
server/v1/get_key_handler.go

@@ -7,15 +7,12 @@ import (
 )
 )
 
 
 // Retrieves the value for a given key.
 // Retrieves the value for a given key.
-func getKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error {
+func GetKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error {
 	vars := mux.Vars(req)
 	vars := mux.Vars(req)
 	key := "/" + vars["key"]
 	key := "/" + vars["key"]
 
 
-	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
-
-	// Execute the command.
-	command := &GetCommand{Key: key}
-	event, err := command.Apply(e.raftServer.Server)
+	// Retrieve the key from the store.
+	event, err := s.Store().Get(key, false, false, s.CommitIndex(), s.Term())
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}

+ 0 - 15
server/v1/install.go

@@ -1,15 +0,0 @@
-package v1
-
-import (
-	"github.com/coreos/etcd/server"
-	"github.com/gorilla/mux"
-)
-
-// Installs the routes for version 1 of the API on to a server.
-func Install(s *server.Server) {
-	s.HandleFunc("/v1/keys/{key:.*}", getKeyHandler).Methods("GET")
-	s.HandleFunc("/v1/keys/{key:.*}", setKeyHandler).Methods("POST", "PUT")
-	s.HandleFunc("/v1/keys/{key:.*}", deleteKeyHandler).Methods("DELETE")
-
-	s.HandleFunc("/v1/watch/{key:.*}", watchKeyHandler).Methods("GET", "POST")
-}

+ 2 - 4
server/v1/set_key_handler.go

@@ -7,12 +7,10 @@ import (
 )
 )
 
 
 // Sets the value for a given key.
 // Sets the value for a given key.
-func setKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error {
+func SetKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
 	vars := mux.Vars(req)
 	vars := mux.Vars(req)
 	key := "/" + vars["key"]
 	key := "/" + vars["key"]
 
 
-	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
-
 	req.ParseForm()
 	req.ParseForm()
 
 
 	// Parse non-blank value.
 	// Parse non-blank value.
@@ -46,5 +44,5 @@ func setKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) erro
 		}
 		}
 	}
 	}
 
 
-	return dispatchEtcdCommand(command, w, req)
+	return s.Dispatch(command, w, req)
 }
 }

+ 0 - 38
server/v1/util.go

@@ -1,38 +0,0 @@
-package v1
-
-// Converts an event object into a response object.
-func eventToResponse(event *store.Event) interface{} {
-	if !event.Dir {
-		response := &store.Response{
-			Action:     event.Action,
-			Key:        event.Key,
-			Value:      event.Value,
-			PrevValue:  event.PrevValue,
-			Index:      event.Index,
-			TTL:        event.TTL,
-			Expiration: event.Expiration,
-		}
-
-		if response.Action == store.Create || response.Action == store.Update {
-			response.Action = "set"
-			if response.PrevValue == "" {
-				response.NewKey = true
-			}
-		}
-
-		return response
-	} else {
-		responses := make([]*store.Response, len(event.KVPairs))
-
-		for i, kv := range event.KVPairs {
-			responses[i] = &store.Response{
-				Action: event.Action,
-				Key:    kv.Key,
-				Value:  kv.Value,
-				Dir:    kv.Dir,
-				Index:  event.Index,
-			}
-		}
-		return responses
-	}
-}

+ 50 - 0
server/v1/v1.go

@@ -0,0 +1,50 @@
+package v1
+
+import (
+	"github.com/coreos/etcd/server"
+	"github.com/gorilla/mux"
+)
+
+// The Server interface provides all the methods required for the v1 API.
+type Server interface {
+    CommitIndex() uint64 
+    Term() uint64 
+    Dispatch(http.ResponseWriter, *http.Request, Command) 
+}
+
+// Converts an event object into a response object.
+func eventToResponse(event *store.Event) interface{} {
+    if !event.Dir {
+        response := &store.Response{
+            Action:     event.Action,
+            Key:        event.Key,
+            Value:      event.Value,
+            PrevValue:  event.PrevValue,
+            Index:      event.Index,
+            TTL:        event.TTL,
+            Expiration: event.Expiration,
+        }
+
+        if response.Action == store.Create || response.Action == store.Update {
+            response.Action = "set"
+            if response.PrevValue == "" {
+                response.NewKey = true
+            }
+        }
+
+        return response
+    } else {
+        responses := make([]*store.Response, len(event.KVPairs))
+
+        for i, kv := range event.KVPairs {
+            responses[i] = &store.Response{
+                Action: event.Action,
+                Key:    kv.Key,
+                Value:  kv.Value,
+                Dir:    kv.Dir,
+                Index:  event.Index,
+            }
+        }
+        return responses
+    }
+}

+ 7 - 8
server/v1/watch_key_handler.go

@@ -7,27 +7,26 @@ import (
 )
 )
 
 
 // Watches a given key prefix for changes.
 // Watches a given key prefix for changes.
-func watchKeyHandler(w http.ResponseWriter, req *http.Request, e *etcdServer) error {
+func WatchKeyHandler(w http.ResponseWriter, req *http.Request, s Server) error {
+	var err error
 	vars := mux.Vars(req)
 	vars := mux.Vars(req)
 	key := "/" + vars["key"]
 	key := "/" + vars["key"]
 
 
-	debugf("[recv] %s %s/watch/%s [%s]", req.Method, e.url, key, req.RemoteAddr)
-
 	// Create a command to watch from a given index (default 0).
 	// Create a command to watch from a given index (default 0).
-	command := &WatchCommand{Key: key}
+	sinceIndex := 0
 	if req.Method == "POST" {
 	if req.Method == "POST" {
-		sinceIndex, err := strconv.ParseUint(string(req.FormValue("index")), 10, 64)
+		sinceIndex, err = strconv.ParseUint(string(req.FormValue("index")), 10, 64)
 		if err != nil {
 		if err != nil {
 			return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm)
 			return etcdErr.NewError(203, "Watch From Index", store.UndefIndex, store.UndefTerm)
 		}
 		}
-		command.SinceIndex = sinceIndex
 	}
 	}
 
 
-	// Apply the command and write the response.
-	event, err := command.Apply(e.raftServer.Server)
+	// Start the watcher on the store.
+	c, err := s.Store().Watch(key, false, sinceIndex, s.CommitIndex(), s.Term())
 	if err != nil {
 	if err != nil {
 		return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
 		return etcdErr.NewError(500, key, store.UndefIndex, store.UndefTerm)
 	}
 	}
+	event := <-c
 
 
 	event, _ := event.(*store.Event)
 	event, _ := event.(*store.Event)
 	response := eventToResponse(event)
 	response := eventToResponse(event)

+ 364 - 0
server/v2/handlers.go

@@ -0,0 +1,364 @@
+package main
+
+import (
+    "encoding/json"
+    "fmt"
+    "net/http"
+    "strconv"
+    "strings"
+
+    etcdErr "github.com/coreos/etcd/error"
+    "github.com/coreos/etcd/store"
+    "github.com/coreos/go-raft"
+)
+
+//-------------------------------------------------------------------
+// Handlers to handle etcd-store related request via etcd url
+//-------------------------------------------------------------------
+
+func NewEtcdMuxer() *http.ServeMux {
+    // external commands
+    router := mux.NewRouter()
+    etcdMux.Handle("/v2/keys/", errorHandler(e.Multiplexer))
+    etcdMux.Handle("/v2/leader", errorHandler(e.LeaderHttpHandler))
+    etcdMux.Handle("/v2/machines", errorHandler(e.MachinesHttpHandler))
+    etcdMux.Handle("/v2/stats/", errorHandler(e.StatsHttpHandler))
+    etcdMux.Handle("/version", errorHandler(e.VersionHttpHandler))
+    etcdMux.HandleFunc("/test/", TestHttpHandler)
+
+    // backward support
+    etcdMux.Handle("/v1/keys/", errorHandler(e.MultiplexerV1))
+    etcdMux.Handle("/v1/leader", errorHandler(e.LeaderHttpHandler))
+    etcdMux.Handle("/v1/machines", errorHandler(e.MachinesHttpHandler))
+    etcdMux.Handle("/v1/stats/", errorHandler(e.StatsHttpHandler))
+
+    return etcdMux
+}
+
+type errorHandler func(http.ResponseWriter, *http.Request) error
+
+// addCorsHeader parses the request Origin header and loops through the user
+// provided allowed origins and sets the Access-Control-Allow-Origin header if
+// there is a match.
+func addCorsHeader(w http.ResponseWriter, r *http.Request) {
+    val, ok := corsList["*"]
+    if val && ok {
+        w.Header().Add("Access-Control-Allow-Origin", "*")
+        return
+    }
+
+    requestOrigin := r.Header.Get("Origin")
+    val, ok = corsList[requestOrigin]
+    if val && ok {
+        w.Header().Add("Access-Control-Allow-Origin", requestOrigin)
+        return
+    }
+}
+
+func (fn errorHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+    addCorsHeader(w, r)
+    if e := fn(w, r); e != nil {
+        if etcdErr, ok := e.(*etcdErr.Error); ok {
+            debug("Return error: ", (*etcdErr).Error())
+            etcdErr.Write(w)
+        } else {
+            http.Error(w, e.Error(), http.StatusInternalServerError)
+        }
+    }
+}
+
+// Multiplex GET/POST/DELETE request to corresponding handlers
+func (e *etcdServer) Multiplexer(w http.ResponseWriter, req *http.Request) error {
+
+    switch req.Method {
+    case "GET":
+        return e.GetHttpHandler(w, req)
+    case "POST":
+        return e.CreateHttpHandler(w, req)
+    case "PUT":
+        return e.UpdateHttpHandler(w, req)
+    case "DELETE":
+        return e.DeleteHttpHandler(w, req)
+    default:
+        w.WriteHeader(http.StatusMethodNotAllowed)
+        return nil
+    }
+
+    return nil
+}
+
+//--------------------------------------
+// State sensitive handlers
+// Set/Delete will dispatch to leader
+//--------------------------------------
+
+func (e *etcdServer) CreateHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    key := getNodePath(req.URL.Path)
+
+    debugf("recv.post[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+
+    value := req.FormValue("value")
+
+    expireTime, err := durationToExpireTime(req.FormValue("ttl"))
+
+    if err != nil {
+        return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Create", store.UndefIndex, store.UndefTerm)
+    }
+
+    command := &CreateCommand{
+        Key:        key,
+        Value:      value,
+        ExpireTime: expireTime,
+    }
+
+    if req.FormValue("incremental") == "true" {
+        command.IncrementalSuffix = true
+    }
+
+    return e.dispatchEtcdCommand(command, w, req)
+
+}
+
+func (e *etcdServer) UpdateHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    key := getNodePath(req.URL.Path)
+
+    debugf("recv.put[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+
+    req.ParseForm()
+
+    value := req.Form.Get("value")
+
+    expireTime, err := durationToExpireTime(req.Form.Get("ttl"))
+
+    if err != nil {
+        return etcdErr.NewError(etcdErr.EcodeTTLNaN, "Update", store.UndefIndex, store.UndefTerm)
+    }
+
+    // update should give at least one option
+    if value == "" && expireTime.Sub(store.Permanent) == 0 {
+        return etcdErr.NewError(etcdErr.EcodeValueOrTTLRequired, "Update", store.UndefIndex, store.UndefTerm)
+    }
+
+    prevValue, valueOk := req.Form["prevValue"]
+
+    prevIndexStr, indexOk := req.Form["prevIndex"]
+
+    if !valueOk && !indexOk { // update without test
+        command := &UpdateCommand{
+            Key:        key,
+            Value:      value,
+            ExpireTime: expireTime,
+        }
+
+        return e.dispatchEtcdCommand(command, w, req)
+
+    } else { // update with test
+        var prevIndex uint64
+
+        if indexOk {
+            prevIndex, err = strconv.ParseUint(prevIndexStr[0], 10, 64)
+
+            // bad previous index
+            if err != nil {
+                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Update", store.UndefIndex, store.UndefTerm)
+            }
+        } else {
+            prevIndex = 0
+        }
+
+        command := &TestAndSetCommand{
+            Key:       key,
+            Value:     value,
+            PrevValue: prevValue[0],
+            PrevIndex: prevIndex,
+        }
+
+        return e.dispatchEtcdCommand(command, w, req)
+    }
+
+}
+
+// Delete Handler
+func (e *etcdServer) DeleteHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    key := getNodePath(req.URL.Path)
+
+    debugf("recv.delete[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+
+    command := &DeleteCommand{
+        Key: key,
+    }
+
+    if req.FormValue("recursive") == "true" {
+        command.Recursive = true
+    }
+
+    return e.dispatchEtcdCommand(command, w, req)
+}
+
+// Dispatch the command to leader
+func (e *etcdServer) dispatchEtcdCommand(c Command, w http.ResponseWriter, req *http.Request) error {
+    return e.raftServer.dispatch(c, w, req, nameToEtcdURL)
+}
+
+//--------------------------------------
+// State non-sensitive handlers
+// command with consistent option will
+// still dispatch to the leader
+//--------------------------------------
+
+// Handler to return the current leader's raft address
+func (e *etcdServer) LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    r := e.raftServer
+
+    leader := r.Leader()
+
+    if leader != "" {
+        w.WriteHeader(http.StatusOK)
+        raftURL, _ := nameToRaftURL(leader)
+        w.Write([]byte(raftURL))
+
+        return nil
+    } else {
+        return etcdErr.NewError(etcdErr.EcodeLeaderElect, "", store.UndefIndex, store.UndefTerm)
+    }
+}
+
+// Handler to return all the known machines in the current cluster
+func (e *etcdServer) MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    machines := e.raftServer.getMachines(nameToEtcdURL)
+
+    w.WriteHeader(http.StatusOK)
+    w.Write([]byte(strings.Join(machines, ", ")))
+
+    return nil
+}
+
+// Handler to return the current version of etcd
+func (e *etcdServer) VersionHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    w.WriteHeader(http.StatusOK)
+    fmt.Fprintf(w, "etcd %s", releaseVersion)
+
+    return nil
+}
+
+// Handler to return the basic stats of etcd
+func (e *etcdServer) StatsHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    option := req.URL.Path[len("/v1/stats/"):]
+    w.WriteHeader(http.StatusOK)
+
+    r := e.raftServer
+
+    switch option {
+    case "self":
+        w.Write(r.Stats())
+    case "leader":
+        if r.State() == raft.Leader {
+            w.Write(r.PeerStats())
+        } else {
+            leader := r.Leader()
+            // current no leader
+            if leader == "" {
+                return etcdErr.NewError(300, "", store.UndefIndex, store.UndefTerm)
+            }
+            hostname, _ := nameToEtcdURL(leader)
+            redirect(hostname, w, req)
+        }
+    case "store":
+        w.Write(etcdStore.JsonStats())
+    }
+
+    return nil
+}
+
+func (e *etcdServer) GetHttpHandler(w http.ResponseWriter, req *http.Request) error {
+    var err error
+    var event interface{}
+
+    r := e.raftServer
+
+    debugf("recv.get[%v] [%v%v]\n", req.RemoteAddr, req.Host, req.URL)
+
+    if req.FormValue("consistent") == "true" && r.State() != raft.Leader {
+        // help client to redirect the request to the current leader
+        leader := r.Leader()
+        hostname, _ := nameToEtcdURL(leader)
+        redirect(hostname, w, req)
+        return nil
+    }
+
+    key := getNodePath(req.URL.Path)
+
+    recursive := req.FormValue("recursive")
+
+    if req.FormValue("wait") == "true" { // watch
+        command := &WatchCommand{
+            Key: key,
+        }
+
+        if recursive == "true" {
+            command.Recursive = true
+        }
+
+        indexStr := req.FormValue("wait_index")
+        if indexStr != "" {
+            sinceIndex, err := strconv.ParseUint(indexStr, 10, 64)
+
+            if err != nil {
+                return etcdErr.NewError(etcdErr.EcodeIndexNaN, "Watch From Index", store.UndefIndex, store.UndefTerm)
+            }
+
+            command.SinceIndex = sinceIndex
+        }
+
+        event, err = command.Apply(r.Server)
+
+    } else { //get
+
+        command := &GetCommand{
+            Key: key,
+        }
+
+        sorted := req.FormValue("sorted")
+        if sorted == "true" {
+            command.Sorted = true
+        }
+
+        if recursive == "true" {
+            command.Recursive = true
+        }
+
+        event, err = command.Apply(r.Server)
+    }
+
+    if err != nil {
+        return err
+
+    } else {
+        event, _ := event.(*store.Event)
+        bytes, _ := json.Marshal(event)
+
+        w.Header().Add("X-Etcd-Index", fmt.Sprint(event.Index))
+        w.Header().Add("X-Etcd-Term", fmt.Sprint(event.Term))
+        w.WriteHeader(http.StatusOK)
+
+        w.Write(bytes)
+
+        return nil
+    }
+
+}
+
+// TestHandler
+func TestHttpHandler(w http.ResponseWriter, req *http.Request) {
+    testType := req.URL.Path[len("/test/"):]
+
+    if testType == "speed" {
+        directSet()
+        w.WriteHeader(http.StatusOK)
+        w.Write([]byte("speed test success"))
+
+        return
+    }
+
+    w.WriteHeader(http.StatusBadRequest)
+}