Browse Source

Merge pull request #134 from xiangli-cmu/removePeer

add remove peer
Xiang Li 12 years ago
parent
commit
41b2175fe0
6 changed files with 241 additions and 37 deletions
  1. 59 3
      command.go
  2. 1 1
      etcd_handlers.go
  3. 104 0
      etcd_test.go
  4. 4 4
      machines.go
  5. 18 0
      raft_handlers.go
  6. 55 29
      raft_server.go

+ 59 - 3
command.go

@@ -1,11 +1,13 @@
 package main
 
 import (
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	etcdErr "github.com/coreos/etcd/error"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
+	"os"
 	"path"
 	"time"
 )
@@ -143,15 +145,18 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	// check if the join command is from a previous machine, who lost all its previous log.
 	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name))
 
+	b := make([]byte, 8)
+	binary.PutUvarint(b, raftServer.CommitIndex())
+
 	if response != nil {
-		return []byte("join success"), nil
+		return b, nil
 	}
 
 	// check machine number in the cluster
 	num := machineNum()
 	if num == maxClusterSize {
 		debug("Reject join request from ", c.Name)
-		return []byte("join fail"), etcdErr.NewError(103, "")
+		return []byte{0}, etcdErr.NewError(103, "")
 	}
 
 	addNameToURL(c.Name, c.RaftVersion, c.RaftURL, c.EtcdURL)
@@ -164,9 +169,60 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	value := fmt.Sprintf("raft=%s&etcd=%s&raftVersion=%s", c.RaftURL, c.EtcdURL, c.RaftVersion)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 
-	return []byte("join success"), err
+	return b, err
 }
 
 func (c *JoinCommand) NodeName() string {
 	return c.Name
 }
+
+// RemoveCommand
+type RemoveCommand struct {
+	Name string `json:"name"`
+}
+
+// The name of the remove command in the log
+func (c *RemoveCommand) CommandName() string {
+	return commandName("remove")
+}
+
+// Remove a server from the cluster
+func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
+
+	// remove machine in etcd storage
+	key := path.Join("_etcd/machines", c.Name)
+
+	_, err := etcdStore.Delete(key, raftServer.CommitIndex())
+
+	if err != nil {
+		return []byte{0}, err
+	}
+
+	// remove peer in raft
+	err = raftServer.RemovePeer(c.Name)
+
+	if err != nil {
+		return []byte{0}, err
+	}
+
+	if c.Name == raftServer.Name() {
+		// the removed node is this node
+
+		// if the node is not replaying the previous logs
+		// and the node has sent out a join request in this
+		// start. It is sure that this node received a new remove
+		// command and need to be removed
+		if raftServer.CommitIndex() > r.joinIndex && r.joinIndex != 0 {
+			debugf("server [%s] is removed", raftServer.Name())
+			os.Exit(0)
+		} else {
+			// else ignore remove
+			debugf("ignore previous remove command.")
+		}
+	}
+
+	b := make([]byte, 8)
+	binary.PutUvarint(b, raftServer.CommitIndex())
+
+	return b, err
+}

+ 1 - 1
etcd_handlers.go

@@ -189,7 +189,7 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) error {
 
 // Handler to return all the known machines in the current cluster
 func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) error {
-	machines := getMachines()
+	machines := getMachines(nameToEtcdURL)
 
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(strings.Join(machines, ", ")))

+ 104 - 0
etcd_test.go

@@ -444,6 +444,110 @@ func TestKillRandom(t *testing.T) {
 	stop <- true
 }
 
+// remove the node and node rejoin with previous log
+func TestRemoveNode(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+	argGroup, etcds, _ := test.CreateCluster(clusterSize, procAttr, false)
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient()
+
+	c.SyncCluster()
+
+	rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
+
+	client := &http.Client{}
+	for i := 0; i < 2; i++ {
+		for i := 0; i < 2; i++ {
+			client.Do(rmReq)
+
+			etcds[2].Wait()
+
+			resp, err := c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 2 {
+				t.Fatal("cannot remove machine")
+			}
+
+			if i == 1 {
+				// rejoin with log
+				etcds[2], err = os.StartProcess("etcd", argGroup[2], procAttr)
+			} else {
+				// rejoin without log
+				etcds[2], err = os.StartProcess("etcd", append(argGroup[2], "-f"), procAttr)
+			}
+
+			if err != nil {
+				panic(err)
+			}
+
+			time.Sleep(time.Second)
+
+			resp, err = c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 3 {
+				t.Fatal("add machine fails")
+			}
+		}
+
+		// first kill the node, then remove it, then add it back
+		for i := 0; i < 2; i++ {
+			etcds[2].Kill()
+			etcds[2].Wait()
+
+			client.Do(rmReq)
+
+			resp, err := c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 2 {
+				t.Fatal("cannot remove machine")
+			}
+
+			if i == 1 {
+				// rejoin with log
+				etcds[2], err = os.StartProcess("etcd", append(argGroup[2]), procAttr)
+			} else {
+				// rejoin without log
+				etcds[2], err = os.StartProcess("etcd", append(argGroup[2], "-f"), procAttr)
+			}
+
+			if err != nil {
+				panic(err)
+			}
+
+			time.Sleep(time.Second)
+
+			resp, err = c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 3 {
+				t.Fatal("add machine fails")
+			}
+		}
+	}
+	test.DestroyCluster(etcds)
+
+}
+
 func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

+ 4 - 4
machines.go

@@ -8,14 +8,14 @@ func machineNum() int {
 }
 
 // getMachines gets the current machines in the cluster
-func getMachines() []string {
+func getMachines(toURL func(string) (string, bool)) []string {
 
 	peers := r.Peers()
 
 	machines := make([]string, len(peers)+1)
 
-	leader, ok := nameToEtcdURL(r.Leader())
-	self := e.url
+	leader, ok := toURL(r.Leader())
+	self, _ := toURL(r.Name())
 	i := 1
 
 	if ok {
@@ -30,7 +30,7 @@ func getMachines() []string {
 
 	// Add all peers to the slice
 	for peerName, _ := range peers {
-		if machine, ok := nameToEtcdURL(peerName); ok {
+		if machine, ok := toURL(peerName); ok {
 			// do not add leader twice
 			if machine != leader {
 				machines[i] = machine

+ 18 - 0
raft_handlers.go

@@ -107,6 +107,24 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) error {
 	}
 }
 
+// Response to remove request
+func RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
+	if req.Method != "DELETE" {
+		w.WriteHeader(http.StatusMethodNotAllowed)
+		return
+	}
+
+	nodeName := req.URL.Path[len("/remove/"):]
+	command := &RemoveCommand{
+		Name: nodeName,
+	}
+
+	debugf("[recv] Remove Request [%s]", command.Name)
+
+	dispatch(command, w, req, false)
+
+}
+
 // Response to the name request
 func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debugf("[recv] Get %s/name/ ", r.url)

+ 55 - 29
raft_server.go

@@ -3,6 +3,7 @@ package main
 import (
 	"bytes"
 	"crypto/tls"
+	"encoding/binary"
 	"encoding/json"
 	"fmt"
 	etcdErr "github.com/coreos/etcd/error"
@@ -15,11 +16,12 @@ import (
 
 type raftServer struct {
 	*raft.Server
-	version string
-	name    string
-	url     string
-	tlsConf *TLSConfig
-	tlsInfo *TLSInfo
+	version   string
+	joinIndex uint64
+	name      string
+	url       string
+	tlsConf   *TLSConfig
+	tlsInfo   *TLSInfo
 }
 
 var r *raftServer
@@ -77,7 +79,21 @@ func (r *raftServer) ListenAndServe() {
 		}
 
 	} else {
+
 		// rejoin the previous cluster
+		cluster = getMachines(nameToRaftURL)
+		for i := 0; i < len(cluster); i++ {
+			u, err := url.Parse(cluster[i])
+			if err != nil {
+				debug("rejoin cannot parse url: ", err)
+			}
+			cluster[i] = u.Host
+		}
+		ok := joinCluster(cluster)
+		if !ok {
+			warn("the whole cluster dies! restart the cluster")
+		}
+
 		debugf("%s restart as a follower", r.name)
 	}
 
@@ -105,26 +121,10 @@ func startAsLeader() {
 func startAsFollower() {
 	// start as a follower in a existing cluster
 	for i := 0; i < retryTimes; i++ {
-
-		for _, machine := range cluster {
-
-			if len(machine) == 0 {
-				continue
-			}
-
-			err := joinCluster(r.Server, machine, r.tlsConf.Scheme)
-			if err == nil {
-				debugf("%s success join to the cluster via machine %s", r.name, machine)
-				return
-
-			} else {
-				if _, ok := err.(etcdErr.Error); ok {
-					fatal(err)
-				}
-				debugf("cannot join to cluster via machine %s %s", machine, err)
-			}
+		ok := joinCluster(cluster)
+		if ok {
+			return
 		}
-
 		warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
 		time.Sleep(time.Second * RetryInterval)
 	}
@@ -149,6 +149,7 @@ func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
 	raftMux.HandleFunc("/name", NameHttpHandler)
 	raftMux.HandleFunc("/version", RaftVersionHttpHandler)
 	raftMux.Handle("/join", errorHandler(JoinHttpHandler))
+	raftMux.HandleFunc("/remove/", RemoveHttpHandler)
 	raftMux.HandleFunc("/vote", VoteHttpHandler)
 	raftMux.HandleFunc("/log", GetLogHttpHandler)
 	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
@@ -180,15 +181,37 @@ func getVersion(t transporter, versionURL url.URL) (string, error) {
 	return string(body), nil
 }
 
-// Send join requests to the leader.
-func joinCluster(s *raft.Server, raftURL string, scheme string) error {
+func joinCluster(cluster []string) bool {
+	for _, machine := range cluster {
+
+		if len(machine) == 0 {
+			continue
+		}
+
+		err := joinByMachine(r.Server, machine, r.tlsConf.Scheme)
+		if err == nil {
+			debugf("%s success join to the cluster via machine %s", r.name, machine)
+			return true
+
+		} else {
+			if _, ok := err.(etcdErr.Error); ok {
+				fatal(err)
+			}
+			debugf("cannot join to cluster via machine %s %s", machine, err)
+		}
+	}
+	return false
+}
+
+// Send join requests to machine.
+func joinByMachine(s *raft.Server, machine string, scheme string) error {
 	var b bytes.Buffer
 
 	// t must be ok
 	t, _ := r.Transporter().(transporter)
 
 	// Our version must match the leaders version
-	versionURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/version"}
+	versionURL := url.URL{Host: machine, Scheme: scheme, Path: "/version"}
 	version, err := getVersion(t, versionURL)
 	if err != nil {
 		return fmt.Errorf("Unable to join: %v", err)
@@ -202,9 +225,9 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 
 	json.NewEncoder(&b).Encode(newJoinCommand())
 
-	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
+	joinURL := url.URL{Host: machine, Scheme: scheme, Path: "/join"}
 
-	debugf("Send Join Request to %s", raftURL)
+	debugf("Send Join Request to %s", joinURL.String())
 
 	resp, err := t.Post(joinURL.String(), &b)
 
@@ -215,6 +238,8 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 		if resp != nil {
 			defer resp.Body.Close()
 			if resp.StatusCode == http.StatusOK {
+				b, _ := ioutil.ReadAll(resp.Body)
+				r.joinIndex, _ = binary.Uvarint(b)
 				return nil
 			}
 			if resp.StatusCode == http.StatusTemporaryRedirect {
@@ -244,6 +269,7 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 // Register commands to raft server
 func registerCommands() {
 	raft.RegisterCommand(&JoinCommand{})
+	raft.RegisterCommand(&RemoveCommand{})
 	raft.RegisterCommand(&SetCommand{})
 	raft.RegisterCommand(&GetCommand{})
 	raft.RegisterCommand(&DeleteCommand{})