Xiang Li 12 years ago
parent
commit
c3533d6ac2
4 changed files with 75 additions and 24 deletions
  1. 7 17
      etcd_handlers.go
  2. 14 6
      etcd_long_test.go
  3. 40 0
      machines.go
  4. 14 1
      test.go

+ 7 - 17
etcd_handlers.go

@@ -6,6 +6,7 @@ import (
 	"github.com/coreos/go-raft"
 	"net/http"
 	"strconv"
+	"strings"
 )
 
 //-------------------------------------------------------------------
@@ -120,6 +121,7 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 
 // Dispatch the command to leader
 func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
+
 	if raftServer.State() == raft.Leader {
 		if body, err := raftServer.Do(c); err != nil {
 
@@ -181,6 +183,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 
 		if etcd {
 			etcdAddr, _ := nameToEtcdURL(leader)
+			if etcdAddr == "" {
+				panic(leader)
+			}
 			url = etcdAddr + path
 		} else {
 			raftAddr, _ := nameToRaftURL(leader)
@@ -222,25 +227,10 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Handler to return all the known machines in the current cluster
 func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
-	peers := raftServer.Peers()
-
-	// Add itself to the machine list first
-	// Since peer map does not contain the server itself
-	machines := info.EtcdURL
-
-	// Add all peers to the list and separate by comma
-	// We do not use json here since we accept machines list
-	// in the command line separate by comma.
-
-	for peerName, _ := range peers {
-		if addr, ok := nameToEtcdURL(peerName); ok {
-			machines = machines + "," + addr
-		}
-	}
+	machines := getMachines()
 
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(machines))
-
+	w.Write([]byte(strings.Join(machines, ", ")))
 }
 
 // Handler to return the current version of etcd

+ 14 - 6
etcd_long_test.go

@@ -26,11 +26,13 @@ func TestKillLeader(t *testing.T) {
 
 	defer destroyCluster(etcds)
 
+	stop := make(chan bool)
 	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
 
 	time.Sleep(time.Second)
 
-	go leaderMonitor(clusterSize, 1, leaderChan)
+	go monitor(clusterSize, 1, leaderChan, all, stop)
 
 	var totalTime time.Duration
 
@@ -61,6 +63,7 @@ func TestKillLeader(t *testing.T) {
 		fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout)
 		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
 	}
+	stop<-true
 }
 
 // TestKillRandom kills random machines in the cluster and
@@ -78,16 +81,19 @@ func TestKillRandom(t *testing.T) {
 
 	defer destroyCluster(etcds)
 
+	stop := make(chan bool)
 	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
 
 	time.Sleep(3 * time.Second)
 
-	go leaderMonitor(clusterSize, 4, leaderChan)
+
+	go monitor(clusterSize, 4, leaderChan, all, stop)
 
 	toKill := make(map[int]bool)
 
-	for i := 0; i < 20; i++ {
-		fmt.Printf("TestKillRandom Round[%d/20]\n", i)
+	for i := 0; i < 200; i++ {
+		fmt.Printf("TestKillRandom Round[%d/200]\n", i)
 
 		j := 0
 		for {
@@ -109,6 +115,8 @@ func TestKillRandom(t *testing.T) {
 			etcds[num].Release()
 		}
 
+		time.Sleep(ElectionTimeout)
+
 		<-leaderChan
 
 		for num, _ := range toKill {
@@ -116,10 +124,10 @@ func TestKillRandom(t *testing.T) {
 		}
 
 		toKill = make(map[int]bool)
+		<-all
 	}
 
-	<-leaderChan
-
+	stop<-true
 }
 
 func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {

+ 40 - 0
machines.go

@@ -6,3 +6,43 @@ func machineNum() int {
 
 	return len(response)
 }
+
+// getMachines gets the current machines in the cluster
+func getMachines() []string {
+
+	peers := raftServer.Peers()
+
+	machines := make([]string, len(peers)+1)
+
+	leader, _ := nameToEtcdURL(raftServer.Leader())
+
+ 	i := 0
+
+	if leader != "" {
+
+		// Add leader at the first of the machines list
+		// Add server itself to the machine list
+		// Since peer map does not contain the server itself
+		if leader == info.EtcdURL {
+			machines[i] = info.EtcdURL
+			i++
+		} else {
+			machines[i] = leader
+			i++
+			machines[i] = info.EtcdURL
+			i++ 
+		}
+	}
+
+	// Add all peers to the slice
+	for peerName, _ := range peers {
+		if machine, ok := nameToEtcdURL(peerName); ok {
+			// do not add leader twice
+			if machine != leader {
+				machines[i] = machine
+				i++
+			}
+		}
+	}
+	return machines
+}

+ 14 - 1
test.go

@@ -118,7 +118,7 @@ func destroyCluster(etcds []*os.Process) error {
 }
 
 //
-func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
+func monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
 	leaderMap := make(map[int]string)
 	baseAddrFormat := "http://0.0.0.0:400%d"
 
@@ -131,6 +131,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
 
 			if err == nil {
+				//fmt.Printf("leader:[%d]->%s\n", i, leader)
 				leaderMap[i] = leader
 
 				if knownLeader == "unknown" {
@@ -143,6 +144,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 				}
 
 			} else {
+				//fmt.Printf("dead: [%d]\n", i)
 				dead++
 				if dead > allowDeadNum {
 					break
@@ -152,7 +154,10 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 		}
 
 		if i == size {
+			//fmt.Println("leader found")
 			select {
+			case <- stop:
+				return
 			case <-leaderChan:
 				leaderChan <- knownLeader
 			default:
@@ -160,6 +165,14 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 			}
 
 		}
+		if dead == 0 {
+			select {
+			case <-all:
+				all <- true
+			default:
+				all <- true
+			}
+		}
 
 		time.Sleep(time.Millisecond * 10)
 	}