Browse Source

add killleader test and leader monitor func

Xiang Li 12 years ago
parent
commit
00157ccdce
3 changed files with 227 additions and 78 deletions
  1. 62 0
      etcd_long_test.go
  2. 1 78
      etcd_test.go
  3. 164 0
      test.go

+ 62 - 0
etcd_long_test.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+)
+
+// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
+// It will print out the election time and the average election time.
+func TestKillLeader(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := createCluster(clusterSize, procAttr)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer destroyCluster(etcds)
+
+	leaderChan := make(chan string, 1)
+
+	time.Sleep(time.Second)
+
+	go leaderMonitor(clusterSize, 1, leaderChan)
+
+	var totalTime time.Duration
+
+	leader := "0.0.0.0:7001"
+
+	for i := 0; i < 200; i++ {
+		port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
+		num := port - 7001
+		fmt.Println("kill server ", num)
+		etcds[num].Kill()
+		etcds[num].Release()
+
+		start := time.Now()
+		for {
+			newLeader := <-leaderChan
+			if newLeader != leader {
+				leader = newLeader
+				break
+			}
+		}
+		take := time.Now().Sub(start)
+
+		totalTime += take
+		avgTime := totalTime / (time.Duration)(i+1)
+
+		fmt.Println("Leader election time is ", take, "with election timeout", ELECTIONTIMTOUT)
+		fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMTOUT)
+		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
+	}
+
+}

+ 1 - 78
etcd_test.go

@@ -5,7 +5,7 @@ import (
 	"github.com/coreos/go-etcd/etcd"
 	"github.com/coreos/go-etcd/etcd"
 	"math/rand"
 	"math/rand"
 	"os"
 	"os"
-	"strconv"
+	//"strconv"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -191,80 +191,3 @@ func TestMultiNodeRecovery(t *testing.T) {
 	stop <- true
 	stop <- true
 	<-stop
 	<-stop
 }
 }
-
-// Sending set commands
-func set(stop chan bool) {
-
-	stopSet := false
-	i := 0
-
-	for {
-		key := fmt.Sprintf("%s_%v", "foo", i)
-
-		result, err := etcd.Set(key, "bar", 0)
-
-		if err != nil || result.Key != "/"+key || result.Value != "bar" {
-			select {
-			case <-stop:
-				stopSet = true
-
-			default:
-				fmt.Println("Set failed!")
-				return
-			}
-		}
-
-		select {
-		case <-stop:
-			stopSet = true
-
-		default:
-		}
-
-		if stopSet {
-			break
-		}
-
-		i++
-	}
-	fmt.Println("set stop")
-	stop <- true
-}
-
-// Create a cluster of etcd nodes
-func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
-	argGroup := make([][]string, size)
-	for i := 0; i < size; i++ {
-		if i == 0 {
-			argGroup[i] = []string{"etcd", "-d=/tmp/node1"}
-		} else {
-			strI := strconv.Itoa(i + 1)
-			argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
-		}
-	}
-
-	etcds := make([]*os.Process, size)
-
-	for i, _ := range etcds {
-		var err error
-		etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr)
-		if err != nil {
-			return nil, nil, err
-		}
-	}
-
-	return argGroup, etcds, nil
-}
-
-// Destroy all the nodes in the cluster
-func destroyCluster(etcds []*os.Process) error {
-	for i, etcd := range etcds {
-		err := etcd.Kill()
-		fmt.Println("kill ", i)
-		if err != nil {
-			panic(err.Error())
-		}
-		etcd.Release()
-	}
-	return nil
-}

+ 164 - 0
test.go

@@ -0,0 +1,164 @@
+package main
+
+import (
+	"fmt"
+	"github.com/coreos/go-etcd/etcd"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"os"
+	"strconv"
+	"time"
+)
+
+var client = http.Client{Transport: &http.Transport{
+	Dial: dialTimeoutFast,
+},
+}
+
+// Sending set commands
+func set(stop chan bool) {
+
+	stopSet := false
+	i := 0
+
+	for {
+		key := fmt.Sprintf("%s_%v", "foo", i)
+
+		result, err := etcd.Set(key, "bar", 0)
+
+		if err != nil || result.Key != "/"+key || result.Value != "bar" {
+			select {
+			case <-stop:
+				stopSet = true
+
+			default:
+				fmt.Println("Set failed!")
+				return
+			}
+		}
+
+		select {
+		case <-stop:
+			stopSet = true
+
+		default:
+		}
+
+		if stopSet {
+			break
+		}
+
+		i++
+	}
+	fmt.Println("set stop")
+	stop <- true
+}
+
+// Create a cluster of etcd nodes
+func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
+	argGroup := make([][]string, size)
+	for i := 0; i < size; i++ {
+		if i == 0 {
+			argGroup[i] = []string{"etcd", "-d=/tmp/node1"}
+		} else {
+			strI := strconv.Itoa(i + 1)
+			argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
+		}
+	}
+
+	etcds := make([]*os.Process, size)
+
+	for i, _ := range etcds {
+		var err error
+		etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+
+	return argGroup, etcds, nil
+}
+
+// Destroy all the nodes in the cluster
+func destroyCluster(etcds []*os.Process) error {
+	for i, etcd := range etcds {
+		err := etcd.Kill()
+		fmt.Println("kill ", i)
+		if err != nil {
+			panic(err.Error())
+		}
+		etcd.Release()
+	}
+	return nil
+}
+
+//
+func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
+	leaderMap := make(map[int]string)
+	baseAddrFormat := "http://0.0.0.0:400%d/leader"
+	for {
+		knownLeader := "unknown"
+		dead := 0
+		var i int
+		for i = 0; i < size; i++ {
+			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
+			if err == nil {
+				leaderMap[i] = leader
+
+				if knownLeader == "unknown" {
+					knownLeader = leader
+				} else {
+					if leader != knownLeader {
+						break
+					}
+				}
+			} else {
+				dead++
+				if dead > allowDeadNum {
+					break
+				}
+			}
+		}
+		if i == size {
+			select {
+			case <-leaderChan:
+				leaderChan <- knownLeader
+			default:
+				leaderChan <- knownLeader
+			}
+
+		}
+		time.Sleep(time.Millisecond * 10)
+	}
+}
+
+func getLeader(addr string) (string, error) {
+
+	resp, err := client.Get(addr)
+
+	if err != nil {
+		return "", err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return "", fmt.Errorf("no leader")
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return "", err
+	}
+
+	return string(b), nil
+
+}
+
+// Dial with timeout
+func dialTimeoutFast(network, addr string) (net.Conn, error) {
+	return net.DialTimeout(network, addr, time.Millisecond*10)
+}