Browse Source

Add version upgrade endpoint.

Ben Johnson 12 years ago
parent
commit
ddf527e092

+ 38 - 12
server/peer_server.go

@@ -17,6 +17,7 @@ import (
 	"github.com/coreos/etcd/log"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
+	"github.com/gorilla/mux"
 )
 
 type PeerServer struct {
@@ -236,25 +237,27 @@ func (s *PeerServer) startAsFollower(cluster []string) {
 func (s *PeerServer) startTransport(scheme string, tlsConf tls.Config) error {
 	log.Infof("raft server [name %s, listen on %s, advertised url %s]", s.name, s.listenHost, s.url)
 
-	raftMux := http.NewServeMux()
+	router := mux.NewRouter()
 
 	s.httpServer = &http.Server{
-		Handler:   raftMux,
+		Handler:   router,
 		TLSConfig: &tlsConf,
 		Addr:      s.listenHost,
 	}
 
 	// internal commands
-	raftMux.HandleFunc("/name", s.NameHttpHandler)
-	raftMux.HandleFunc("/version", s.VersionHttpHandler)
-	raftMux.HandleFunc("/join", s.JoinHttpHandler)
-	raftMux.HandleFunc("/remove/", s.RemoveHttpHandler)
-	raftMux.HandleFunc("/vote", s.VoteHttpHandler)
-	raftMux.HandleFunc("/log", s.GetLogHttpHandler)
-	raftMux.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
-	raftMux.HandleFunc("/snapshot", s.SnapshotHttpHandler)
-	raftMux.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
-	raftMux.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
+	router.HandleFunc("/name", s.NameHttpHandler)
+	router.HandleFunc("/version", s.VersionHttpHandler)
+	router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
+	router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
+	router.HandleFunc("/join", s.JoinHttpHandler)
+	router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
+	router.HandleFunc("/vote", s.VoteHttpHandler)
+	router.HandleFunc("/log", s.GetLogHttpHandler)
+	router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
+	router.HandleFunc("/snapshot", s.SnapshotHttpHandler)
+	router.HandleFunc("/snapshotRecovery", s.SnapshotRecoveryHttpHandler)
+	router.HandleFunc("/etcdURL", s.EtcdURLHttpHandler)
 
 	if scheme == "http" {
 		return s.listenAndServe()
@@ -283,6 +286,29 @@ func getVersion(t *transporter, versionURL url.URL) (int, error) {
 	return version, nil
 }
 
+// Upgradable checks whether all peers in a cluster support an upgrade to the next store version.
+func (s *PeerServer) Upgradable() error {
+	nextVersion := s.store.Version() + 1
+	for _, peerURL := range s.registry.PeerURLs(s.raftServer.Leader(), s.name) {
+		u, err := url.Parse(peerURL)
+		if err != nil {
+			return fmt.Errorf("PeerServer: Cannot parse URL: '%s' (%s)", peerURL, err)
+		}
+
+		t, _ := s.raftServer.Transporter().(*transporter)
+		checkURL := (&url.URL{Host: u.Host, Scheme: s.tlsConf.Scheme, Path: fmt.Sprintf("/version/%d/check", nextVersion)}).String()
+		resp, _, err := t.Get(checkURL)
+		if err != nil {
+			return fmt.Errorf("PeerServer: Cannot check version compatibility: %s", u.Host)
+		}
+		if resp.StatusCode != 200 {
+			return fmt.Errorf("PeerServer: Version %d is not compatible with peer: %s", nextVersion, u.Host)
+		}
+	}
+
+	return nil
+}
+
 func (s *PeerServer) joinCluster(cluster []string) bool {
 	for _, machine := range cluster {
 		if len(machine) == 0 {

+ 36 - 2
server/peer_server_handlers.go

@@ -7,7 +7,9 @@ import (
 
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
+	"github.com/gorilla/mux"
 )
 
 // Get all the current logs
@@ -134,9 +136,9 @@ func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request
 		return
 	}
 
-	nodeName := req.URL.Path[len("/remove/"):]
+	vars := mux.Vars(req)
 	command := &RemoveCommand{
-		Name: nodeName,
+		Name: vars["name"],
 	}
 
 	log.Debugf("[recv] Remove Request [%s]", command.Name)
@@ -157,3 +159,35 @@ func (ps *PeerServer) VersionHttpHandler(w http.ResponseWriter, req *http.Reques
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(strconv.Itoa(ps.store.Version())))
 }
+
+// Checks whether a given version is supported.
+func (ps *PeerServer) VersionCheckHttpHandler(w http.ResponseWriter, req *http.Request) {
+	log.Debugf("[recv] Get %s%s ", ps.url, req.URL.Path)
+	vars := mux.Vars(req)
+	version, _ := strconv.Atoi(vars["version"])
+	if version >= store.MinVersion() && version <= store.MaxVersion() {
+		w.WriteHeader(http.StatusOK)
+	} else {
+		w.WriteHeader(http.StatusForbidden)
+	}
+}
+
+// Upgrades the current store version to the next version.
+func (ps *PeerServer) UpgradeHttpHandler(w http.ResponseWriter, req *http.Request) {
+	log.Debugf("[recv] Get %s/version", ps.url)
+
+	// Check if upgrade is possible for all nodes.
+	if err := ps.Upgradable(); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	// Create an upgrade command from the current version.
+	c := ps.store.CommandFactory().CreateUpgradeCommand()
+	if err := ps.server.Dispatch(c, w, req); err != nil {
+		http.Error(w, err.Error(), http.StatusInternalServerError)
+		return
+	}
+
+	w.WriteHeader(http.StatusOK)
+}

+ 1 - 0
store/command_factory.go

@@ -15,6 +15,7 @@ var minVersion, maxVersion int
 // depending on the current version of the store.
 type CommandFactory interface {
 	Version() int
+	CreateUpgradeCommand() raft.Command
 	CreateSetCommand(key string, value string, expireTime time.Time) raft.Command
 	CreateCreateCommand(key string, value string, expireTime time.Time, unique bool) raft.Command
 	CreateUpdateCommand(key string, value string, expireTime time.Time) raft.Command

+ 5 - 0
store/v2/command_factory.go

@@ -20,6 +20,11 @@ func (f *CommandFactory) Version() int {
 	return 2
 }
 
+// CreateUpgradeCommand is a no-op since version 2 is the first version to support store versioning.
+func (f *CommandFactory) CreateUpgradeCommand() raft.Command {
+	return &raft.NOPCommand{}
+}
+
 // CreateSetCommand creates a version 2 command to set a key to a given value in the store.
 func (f *CommandFactory) CreateSetCommand(key string, value string, expireTime time.Time) raft.Command {
 	return &SetCommand{

+ 1 - 1
store/v2/compare_and_swap_command.go

@@ -23,7 +23,7 @@ type CompareAndSwapCommand struct {
 
 // The name of the testAndSet command in the log
 func (c *CompareAndSwapCommand) CommandName() string {
-	return "etcd:v2:compareAndSwap"
+	return "etcd:compareAndSwap"
 }
 
 // Set the key-value pair if the current value of the key equals to the given prevValue

+ 1 - 1
store/v2/create_command.go

@@ -22,7 +22,7 @@ type CreateCommand struct {
 
 // The name of the create command in the log
 func (c *CreateCommand) CommandName() string {
-	return "etcd:v2:create"
+	return "etcd:create"
 }
 
 // Create node

+ 1 - 1
store/v2/delete_command.go

@@ -18,7 +18,7 @@ type DeleteCommand struct {
 
 // The name of the delete command in the log
 func (c *DeleteCommand) CommandName() string {
-	return "etcd:v2:delete"
+	return "etcd:delete"
 }
 
 // Delete the key

+ 1 - 1
store/v2/set_command.go

@@ -21,7 +21,7 @@ type SetCommand struct {
 
 // The name of the create command in the log
 func (c *SetCommand) CommandName() string {
-	return "etcd:v2:set"
+	return "etcd:set"
 }
 
 // Create node

+ 1 - 1
store/v2/update_command.go

@@ -20,7 +20,7 @@ type UpdateCommand struct {
 
 // The name of the update command in the log
 func (c *UpdateCommand) CommandName() string {
-	return "etcd:v2:update"
+	return "etcd:update"
 }
 
 // Create node

+ 46 - 0
tests/functional/version_check_test.go

@@ -0,0 +1,46 @@
+package test
+
+import (
+	"net/http"
+	"os"
+	"testing"
+	"time"
+)
+
+// Ensure that a node can reply to a version check appropriately.
+func TestVersionCheck(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/version_check"}
+
+	process, err := os.StartProcess(EtcdBinPath, args, procAttr)
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+	defer process.Kill()
+
+	time.Sleep(time.Second)
+
+	// Check a version too small.
+	resp, _ := http.Get("http://localhost:7001/version/1/check")
+	resp.Body.Close()
+	if resp.StatusCode != http.StatusForbidden {
+		t.Fatal("Invalid version check: ", resp.StatusCode)
+	}
+
+	// Check a version too large.
+	resp, _ = http.Get("http://localhost:7001/version/3/check")
+	resp.Body.Close()
+	if resp.StatusCode != http.StatusForbidden {
+		t.Fatal("Invalid version check: ", resp.StatusCode)
+	}
+
+	// Check a version that's just right.
+	resp, _ = http.Get("http://localhost:7001/version/2/check")
+	resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		t.Fatal("Invalid version check: ", resp.StatusCode)
+	}
+}
+