Browse Source

simplify remove/join process; add tests

Xiang Li 12 years ago
parent
commit
798d52e695
3 changed files with 123 additions and 36 deletions
  1. 2 16
      command.go
  2. 104 0
      etcd_test.go
  3. 17 20
      raft_server.go

+ 2 - 16
command.go

@@ -142,10 +142,6 @@ func (c *JoinCommand) CommandName() string {
 // Join a server to the cluster
 // Join a server to the cluster
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 
 
-	if c.Name == r.Name() {
-		r.pendingJoin = false
-	}
-
 	// check if the join command is from a previous machine, who lost all its previous log.
 	// check if the join command is from a previous machine, who lost all its previous log.
 	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name))
 	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", c.Name))
 
 
@@ -220,18 +216,8 @@ func (c *RemoveCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 			debugf("server [%s] is removed", raftServer.Name())
 			debugf("server [%s] is removed", raftServer.Name())
 			os.Exit(0)
 			os.Exit(0)
 		} else {
 		} else {
-			// the node is replaying previous logs and there is a join command
-			// afterwards, we should not exit
-
-			if r.joinIndex == 0 {
-				// if the node has not sent a join command in this start
-				// it will need to send a join command after replay the logs
-				r.pendingJoin = true
-			} else {
-				// else ignore remove
-				debugf("ignore previous remove command.")
-			}
-
+			// else ignore remove
+			debugf("ignore previous remove command.")
 		}
 		}
 	}
 	}
 
 

+ 104 - 0
etcd_test.go

@@ -444,6 +444,110 @@ func TestKillRandom(t *testing.T) {
 	stop <- true
 	stop <- true
 }
 }
 
 
+// remove the node and node rejoin with previous log
+func TestRemoveNode(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+	argGroup, etcds, _ := test.CreateCluster(clusterSize, procAttr, false)
+
+	time.Sleep(time.Second)
+
+	c := etcd.NewClient()
+
+	c.SyncCluster()
+
+	rmReq, _ := http.NewRequest("DELETE", "http://127.0.0.1:7001/remove/node3", nil)
+
+	client := &http.Client{}
+	for i := 0; i < 2; i++ {
+		for i := 0; i < 2; i++ {
+			client.Do(rmReq)
+
+			etcds[2].Wait()
+
+			resp, err := c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 2 {
+				t.Fatal("cannot remove machine")
+			}
+
+			if i == 1 {
+				// rejoin with log
+				etcds[2], err = os.StartProcess("etcd", argGroup[2], procAttr)
+			} else {
+				// rejoin without log
+				etcds[2], err = os.StartProcess("etcd", append(argGroup[2], "-f"), procAttr)
+			}
+
+			if err != nil {
+				panic(err)
+			}
+
+			time.Sleep(time.Second)
+
+			resp, err = c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 3 {
+				t.Fatal("add machine fails")
+			}
+		}
+
+		// first kill the node, then remove it, then add it back
+		for i := 0; i < 2; i++ {
+			etcds[2].Kill()
+			etcds[2].Wait()
+
+			client.Do(rmReq)
+
+			resp, err := c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 2 {
+				t.Fatal("cannot remove machine")
+			}
+
+			if i == 1 {
+				// rejoin with log
+				etcds[2], err = os.StartProcess("etcd", append(argGroup[2]), procAttr)
+			} else {
+				// rejoin without log
+				etcds[2], err = os.StartProcess("etcd", append(argGroup[2], "-f"), procAttr)
+			}
+
+			if err != nil {
+				panic(err)
+			}
+
+			time.Sleep(time.Second)
+
+			resp, err = c.Get("_etcd/machines")
+
+			if err != nil {
+				panic(err)
+			}
+
+			if len(resp) != 3 {
+				t.Fatal("add machine fails")
+			}
+		}
+	}
+	test.DestroyCluster(etcds)
+
+}
+
 func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
 func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
 	procAttr := new(os.ProcAttr)
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}

+ 17 - 20
raft_server.go

@@ -16,13 +16,12 @@ import (
 
 
 type raftServer struct {
 type raftServer struct {
 	*raft.Server
 	*raft.Server
-	version     string
-	joinIndex   uint64
-	pendingJoin bool
-	name        string
-	url         string
-	tlsConf     *TLSConfig
-	tlsInfo     *TLSInfo
+	version   string
+	joinIndex uint64
+	name      string
+	url       string
+	tlsConf   *TLSConfig
+	tlsInfo   *TLSInfo
 }
 }
 
 
 var r *raftServer
 var r *raftServer
@@ -81,22 +80,20 @@ func (r *raftServer) ListenAndServe() {
 
 
 	} else {
 	} else {
 
 
-		if r.pendingJoin {
-			cluster = getMachines(nameToRaftURL)
-			for i := 0; i < len(cluster); i++ {
-				u, err := url.Parse(cluster[i])
-				if err != nil {
-					debug("rejoin cannot parse url: ", err)
-				}
-				cluster[i] = u.Host
-			}
-			ok := joinCluster(cluster)
-			if !ok {
-				fatal("cannot rejoin to the cluster")
+		// rejoin the previous cluster
+		cluster = getMachines(nameToRaftURL)
+		for i := 0; i < len(cluster); i++ {
+			u, err := url.Parse(cluster[i])
+			if err != nil {
+				debug("rejoin cannot parse url: ", err)
 			}
 			}
+			cluster[i] = u.Host
+		}
+		ok := joinCluster(cluster)
+		if !ok {
+			fatal("cannot rejoin to the cluster")
 		}
 		}
 
 
-		// rejoin the previous cluster
 		debugf("%s restart as a follower", r.name)
 		debugf("%s restart as a follower", r.name)
 	}
 	}