Browse Source

Merge pull request #38 from xiangli-cmu/master

More tests
Xiang Li 12 years ago
parent
commit
9cbe9c52f9
5 changed files with 293 additions and 74 deletions
  1. 6 1
      command.go
  2. 62 0
      etcd_long_test.go
  3. 60 73
      etcd_test.go
  4. 1 0
      store/keywords.go
  5. 164 0
      test.go

+ 6 - 1
command.go

@@ -2,9 +2,10 @@ package main
 
 
 import (
 import (
 	"encoding/json"
 	"encoding/json"
-	//"errors"
+	"fmt"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
+	"path"
 	"time"
 	"time"
 )
 )
 
 
@@ -120,6 +121,10 @@ func (c *JoinCommand) CommandName() string {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	err := raftServer.AddPeer(c.Name)
 	err := raftServer.AddPeer(c.Name)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
+	nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex())
+	key := path.Join("machines", nodeName)
+	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
+	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 	return []byte("join success"), err
 	return []byte("join success"), err
 }
 }
 
 

+ 62 - 0
etcd_long_test.go

@@ -0,0 +1,62 @@
+package main
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+)
+
+// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
+// It will print out the election time and the average election time.
+func TestKillLeader(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := createCluster(clusterSize, procAttr)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer destroyCluster(etcds)
+
+	leaderChan := make(chan string, 1)
+
+	time.Sleep(time.Second)
+
+	go leaderMonitor(clusterSize, 1, leaderChan)
+
+	var totalTime time.Duration
+
+	leader := "0.0.0.0:7001"
+
+	for i := 0; i < 200; i++ {
+		port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
+		num := port - 7001
+		fmt.Println("kill server ", num)
+		etcds[num].Kill()
+		etcds[num].Release()
+
+		start := time.Now()
+		for {
+			newLeader := <-leaderChan
+			if newLeader != leader {
+				leader = newLeader
+				break
+			}
+		}
+		take := time.Now().Sub(start)
+
+		totalTime += take
+		avgTime := totalTime / (time.Duration)(i+1)
+
+		fmt.Println("Leader election time is ", take, "with election timeout", ELECTIONTIMTOUT)
+		fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMTOUT)
+		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
+	}
+
+}

+ 60 - 73
etcd_test.go

@@ -5,7 +5,7 @@ import (
 	"github.com/coreos/go-etcd/etcd"
 	"github.com/coreos/go-etcd/etcd"
 	"math/rand"
 	"math/rand"
 	"os"
 	"os"
-	"strconv"
+	//"strconv"
 	"testing"
 	"testing"
 	"time"
 	"time"
 )
 )
@@ -14,7 +14,7 @@ import (
 func TestSingleNode(t *testing.T) {
 func TestSingleNode(t *testing.T) {
 	procAttr := new(os.ProcAttr)
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-	args := []string{"etcd", "-i", "-v", "-d=/tmp/node1"}
+	args := []string{"etcd", "-i", "-d=/tmp/node1"}
 
 
 	process, err := os.StartProcess("etcd", args, procAttr)
 	process, err := os.StartProcess("etcd", args, procAttr)
 	if err != nil {
 	if err != nil {
@@ -49,6 +49,62 @@ func TestSingleNode(t *testing.T) {
 	}
 	}
 }
 }
 
 
+// This test creates a single node and then set a value to it.
+// Then this test kills the node and restart it and tries to get the value again.
+func TestSingleNodeRecovery(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+	args := []string{"etcd", "-d=/tmp/node1"}
+
+	process, err := os.StartProcess("etcd", append(args, "-i"), procAttr)
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+
+	time.Sleep(time.Second)
+
+	etcd.SyncCluster()
+	// Test Set
+	result, err := etcd.Set("foo", "bar", 100)
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		t.Fatalf("Set 1 failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+
+	time.Sleep(time.Second)
+
+	process.Kill()
+
+	process, err = os.StartProcess("etcd", args, procAttr)
+	defer process.Kill()
+	if err != nil {
+		t.Fatal("start process failed:" + err.Error())
+		return
+	}
+
+	time.Sleep(time.Second)
+
+	results, err := etcd.Get("foo")
+	if err != nil {
+		t.Fatal("get fail: " + err.Error())
+		return
+	}
+
+	result = results[0]
+
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL > 99 {
+		if err != nil {
+			t.Fatal(err)
+		}
+		t.Fatalf("Recovery Get failed with %s %s %v", result.Key, result.Value, result.TTL)
+	}
+}
+
 // Create a three nodes and try to set value
 // Create a three nodes and try to set value
 func TestSimpleMultiNode(t *testing.T) {
 func TestSimpleMultiNode(t *testing.T) {
 	procAttr := new(os.ProcAttr)
 	procAttr := new(os.ProcAttr)
@@ -113,7 +169,7 @@ func TestMultiNodeRecovery(t *testing.T) {
 
 
 	stop := make(chan bool)
 	stop := make(chan bool)
 	// Test Set
 	// Test Set
-	go set(t, stop)
+	go set(stop)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		num := rand.Int() % clusterSize
 		num := rand.Int() % clusterSize
@@ -131,76 +187,7 @@ func TestMultiNodeRecovery(t *testing.T) {
 		}
 		}
 		time.Sleep(time.Second)
 		time.Sleep(time.Second)
 	}
 	}
-
+	fmt.Println("stop")
 	stop <- true
 	stop <- true
 	<-stop
 	<-stop
 }
 }
-
-// Sending set commands
-func set(t *testing.T, stop chan bool) {
-
-	stopSet := false
-	i := 0
-
-	for {
-		key := fmt.Sprintf("%s_%v", "foo", i)
-
-		result, err := etcd.Set(key, "bar", 0)
-
-		if err != nil || result.Key != "/"+key || result.Value != "bar" {
-			if err != nil {
-				t.Fatal(err)
-			}
-			t.Fatalf("Set failed with %s %s %v", result.Key, result.Value)
-		}
-
-		select {
-		case <-stop:
-			stopSet = true
-
-		default:
-		}
-
-		if stopSet {
-			break
-		}
-
-		i++
-	}
-
-	stop <- true
-}
-
-// Create a cluster of etcd nodes
-func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
-	argGroup := make([][]string, size)
-	for i := 0; i < size; i++ {
-		if i == 0 {
-			argGroup[i] = []string{"etcd", "-d=/tmp/node1"}
-		} else {
-			strI := strconv.Itoa(i + 1)
-			argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
-		}
-	}
-
-	etcds := make([]*os.Process, size)
-
-	for i, _ := range etcds {
-		var err error
-		etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr)
-		if err != nil {
-			return nil, nil, err
-		}
-	}
-
-	return argGroup, etcds, nil
-}
-
-// Destroy all the nodes in the cluster
-func destroyCluster(etcds []*os.Process) error {
-	for _, etcd := range etcds {
-		etcd.Kill()
-		etcd.Release()
-	}
-	return nil
-}

+ 1 - 0
store/keywords.go

@@ -4,4 +4,5 @@ package store
 var keywords = map[string]bool{
 var keywords = map[string]bool{
 	"/acoounts":       true,
 	"/acoounts":       true,
 	"/ephemeralNodes": true,
 	"/ephemeralNodes": true,
+	"/machines":	   true,
 }
 }

+ 164 - 0
test.go

@@ -0,0 +1,164 @@
+package main
+
+import (
+	"fmt"
+	"github.com/coreos/go-etcd/etcd"
+	"io/ioutil"
+	"net"
+	"net/http"
+	"os"
+	"strconv"
+	"time"
+)
+
+var client = http.Client{Transport: &http.Transport{
+	Dial: dialTimeoutFast,
+},
+}
+
+// Sending set commands
+func set(stop chan bool) {
+
+	stopSet := false
+	i := 0
+
+	for {
+		key := fmt.Sprintf("%s_%v", "foo", i)
+
+		result, err := etcd.Set(key, "bar", 0)
+
+		if err != nil || result.Key != "/"+key || result.Value != "bar" {
+			select {
+			case <-stop:
+				stopSet = true
+
+			default:
+				fmt.Println("Set failed!")
+				return
+			}
+		}
+
+		select {
+		case <-stop:
+			stopSet = true
+
+		default:
+		}
+
+		if stopSet {
+			break
+		}
+
+		i++
+	}
+	fmt.Println("set stop")
+	stop <- true
+}
+
+// Create a cluster of etcd nodes
+func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process, error) {
+	argGroup := make([][]string, size)
+	for i := 0; i < size; i++ {
+		if i == 0 {
+			argGroup[i] = []string{"etcd", "-d=/tmp/node1"}
+		} else {
+			strI := strconv.Itoa(i + 1)
+			argGroup[i] = []string{"etcd", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
+		}
+	}
+
+	etcds := make([]*os.Process, size)
+
+	for i, _ := range etcds {
+		var err error
+		etcds[i], err = os.StartProcess("etcd", append(argGroup[i], "-i"), procAttr)
+		if err != nil {
+			return nil, nil, err
+		}
+	}
+
+	return argGroup, etcds, nil
+}
+
+// Destroy all the nodes in the cluster
+func destroyCluster(etcds []*os.Process) error {
+	for i, etcd := range etcds {
+		err := etcd.Kill()
+		fmt.Println("kill ", i)
+		if err != nil {
+			panic(err.Error())
+		}
+		etcd.Release()
+	}
+	return nil
+}
+
+//
+func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
+	leaderMap := make(map[int]string)
+	baseAddrFormat := "http://0.0.0.0:400%d/leader"
+	for {
+		knownLeader := "unknown"
+		dead := 0
+		var i int
+		for i = 0; i < size; i++ {
+			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
+			if err == nil {
+				leaderMap[i] = leader
+
+				if knownLeader == "unknown" {
+					knownLeader = leader
+				} else {
+					if leader != knownLeader {
+						break
+					}
+				}
+			} else {
+				dead++
+				if dead > allowDeadNum {
+					break
+				}
+			}
+		}
+		if i == size {
+			select {
+			case <-leaderChan:
+				leaderChan <- knownLeader
+			default:
+				leaderChan <- knownLeader
+			}
+
+		}
+		time.Sleep(time.Millisecond * 10)
+	}
+}
+
+func getLeader(addr string) (string, error) {
+
+	resp, err := client.Get(addr)
+
+	if err != nil {
+		return "", err
+	}
+
+	if resp.StatusCode != http.StatusOK {
+		resp.Body.Close()
+		return "", fmt.Errorf("no leader")
+	}
+
+	b, err := ioutil.ReadAll(resp.Body)
+
+	resp.Body.Close()
+
+	if err != nil {
+		return "", err
+	}
+
+	return string(b), nil
+
+}
+
+// Dial with timeout
+func dialTimeoutFast(network, addr string) (net.Conn, error) {
+	return net.DialTimeout(network, addr, time.Millisecond*10)
+}