Browse Source

Merge pull request #124 from philips/xiangli-cleanup

Cleanups from #112
Xiang Li 12 years ago
parent
commit
ec6a7be63a
15 changed files with 466 additions and 379 deletions
  1. 1 1
      .travis.yml
  2. 8 0
      command.go
  3. 32 31
      config.go
  4. 27 70
      etcd.go
  5. 20 29
      etcd_handlers.go
  6. 0 150
      etcd_long_test.go
  7. 48 0
      etcd_server.go
  8. 160 9
      etcd_test.go
  9. 34 0
      machines.go
  10. 13 13
      raft_handlers.go
  11. 47 44
      raft_server.go
  12. 1 1
      snapshot.go
  13. 0 0
      test.sh
  14. 17 31
      test/test.go
  15. 58 0
      util.go

+ 1 - 1
.travis.yml

@@ -5,4 +5,4 @@ install:
  - echo "Skip install"
 
 script:
- - ./test
+ - ./test.sh

+ 8 - 0
command.go

@@ -121,6 +121,14 @@ type JoinCommand struct {
 	EtcdURL string `json:"etcdURL"`
 }
 
+func newJoinCommand() *JoinCommand {
+	return &JoinCommand{
+		Name:    r.name,
+		RaftURL: r.url,
+		EtcdURL: e.url,
+	}
+}
+
 // The name of the join command in the log
 func (c *JoinCommand) CommandName() string {
 	return commandName("join")

+ 32 - 31
config.go

@@ -14,39 +14,14 @@ import (
 // Config
 //--------------------------------------
 
-func parseInfo(path string) *Info {
-	file, err := os.Open(path)
-
-	if err != nil {
-		return nil
-	}
-	defer file.Close()
-
-	info := &Info{}
-
-	content, err := ioutil.ReadAll(file)
-	if err != nil {
-		fatalf("Unable to read info: %v", err)
-		return nil
-	}
-
-	if err = json.Unmarshal(content, &info); err != nil {
-		fatalf("Unable to parse info: %v", err)
-		return nil
-	}
-
-	return info
-}
-
 // Get the server info from previous conf file
 // or from the user
 func getInfo(path string) *Info {
 
-	// Read in the server info if available.
 	infoPath := filepath.Join(path, "info")
 
-	// Delete the old configuration if exist
 	if force {
+		// Delete the old configuration if exist
 		logPath := filepath.Join(path, "log")
 		confPath := filepath.Join(path, "conf")
 		snapshotPath := filepath.Join(path, "snapshot")
@@ -54,15 +29,13 @@ func getInfo(path string) *Info {
 		os.Remove(logPath)
 		os.Remove(confPath)
 		os.RemoveAll(snapshotPath)
-	}
-
-	info := parseInfo(infoPath)
-	if info != nil {
+	} else if info := readInfo(infoPath); info != nil {
 		infof("Found node configuration in '%s'. Ignoring flags", infoPath)
 		return info
 	}
 
-	info = &argInfo
+	// Read info from command line
+	info := &argInfo
 
 	// Write to file.
 	content, _ := json.MarshalIndent(info, "", " ")
@@ -76,6 +49,34 @@ func getInfo(path string) *Info {
 	return info
 }
 
+// readInfo reads from info file and decode to Info struct
+func readInfo(path string) *Info {
+	file, err := os.Open(path)
+
+	if err != nil {
+		if err == os.ErrNotExist {
+			return nil
+		}
+		fatal(err)
+	}
+	defer file.Close()
+
+	info := &Info{}
+
+	content, err := ioutil.ReadAll(file)
+	if err != nil {
+		fatalf("Unable to read info: %v", err)
+		return nil
+	}
+
+	if err = json.Unmarshal(content, &info); err != nil {
+		fatalf("Unable to parse info: %v", err)
+		return nil
+	}
+
+	return info
+}
+
 func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
 	var keyFile, certFile, CAFile string
 	var tlsCert tls.Certificate

+ 27 - 70
etcd.go

@@ -4,13 +4,9 @@ import (
 	"crypto/tls"
 	"flag"
 	"github.com/coreos/etcd/store"
-	"github.com/coreos/etcd/web"
+	"github.com/coreos/go-raft"
 	"io/ioutil"
-	"net/http"
-	"net/url"
 	"os"
-	"os/signal"
-	"runtime/pprof"
 	"strings"
 	"time"
 )
@@ -21,28 +17,30 @@ import (
 //
 //------------------------------------------------------------------------------
 
-var verbose bool
-var veryVerbose bool
+var (
+	verbose     bool
+	veryVerbose bool
 
-var machines string
-var machinesFile string
+	machines     string
+	machinesFile string
 
-var cluster []string
+	cluster []string
 
-var argInfo Info
-var dirPath string
+	argInfo Info
+	dirPath string
 
-var force bool
+	force bool
 
-var maxSize int
+	maxSize int
 
-var snapshot bool
+	snapshot bool
 
-var retryTimes int
+	retryTimes int
 
-var maxClusterSize int
+	maxClusterSize int
 
-var cpuprofile string
+	cpuprofile string
+)
 
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
@@ -126,7 +124,6 @@ type TLSConfig struct {
 //------------------------------------------------------------------------------
 
 var etcdStore *store.Store
-var info *Info
 
 //------------------------------------------------------------------------------
 //
@@ -142,27 +139,12 @@ func main() {
 	flag.Parse()
 
 	if cpuprofile != "" {
-		f, err := os.Create(cpuprofile)
-		if err != nil {
-			fatal(err)
-		}
-		pprof.StartCPUProfile(f)
-		defer pprof.StopCPUProfile()
-
-		c := make(chan os.Signal, 1)
-		signal.Notify(c, os.Interrupt)
-		go func() {
-			for sig := range c {
-				infof("captured %v, stopping profiler and exiting..", sig)
-				pprof.StopCPUProfile()
-				os.Exit(1)
-			}
-		}()
-
+		runCPUProfile()
 	}
 
 	if veryVerbose {
 		verbose = true
+		raft.SetLogLevel(raft.Debug)
 	}
 
 	if machines != "" {
@@ -175,6 +157,7 @@ func main() {
 		cluster = strings.Split(string(b), ",")
 	}
 
+	// Check TLS arguments
 	raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
 	if !ok {
 		fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
@@ -190,54 +173,28 @@ func main() {
 		fatal("ERROR: server name required. e.g. '-n=server_name'")
 	}
 
+	// Check host name arguments
 	argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
 	argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
 	argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
 
-	// Setup commands.
-	registerCommands()
-
 	// Read server info from file or grab it from user.
 	if err := os.MkdirAll(dirPath, 0744); err != nil {
 		fatalf("Unable to create path: %s", err)
 	}
 
-	info = getInfo(dirPath)
+	info := getInfo(dirPath)
 
 	// Create etcd key-value store
 	etcdStore = store.CreateStore(maxSize)
 	snapConf = newSnapshotConf()
 
-	startRaft(raftTLSConfig)
-
-	if argInfo.WebURL != "" {
-		// start web
-		argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
-		go webHelper()
-		go web.Start(raftServer, argInfo.WebURL)
-	}
-
-	startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
-
-}
+	// Create etcd and raft server
+	e = newEtcdServer(info.Name, info.EtcdURL, &etcdTLSConfig, &info.EtcdTLS)
+	r = newRaftServer(info.Name, info.RaftURL, &raftTLSConfig, &info.RaftTLS)
 
-// Start to listen and response client command
-func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
-	u, err := url.Parse(info.EtcdURL)
-	if err != nil {
-		fatalf("invalid url '%s': %s", info.EtcdURL, err)
-	}
-	infof("etcd server [%s:%s]", info.Name, u)
+	startWebInterface()
+	r.ListenAndServe()
+	e.ListenAndServe()
 
-	server := http.Server{
-		Handler:   NewEtcdMuxer(),
-		TLSConfig: &tlsConf,
-		Addr:      u.Host,
-	}
-
-	if scheme == "http" {
-		fatal(server.ListenAndServe())
-	} else {
-		fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
-	}
 }

+ 20 - 29
etcd_handlers.go

@@ -6,6 +6,7 @@ import (
 	"github.com/coreos/go-raft"
 	"net/http"
 	"strconv"
+	"strings"
 )
 
 //-------------------------------------------------------------------
@@ -19,7 +20,7 @@ func NewEtcdMuxer() *http.ServeMux {
 	etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
 	etcdMux.HandleFunc("/leader", LeaderHttpHandler)
 	etcdMux.HandleFunc("/machines", MachinesHttpHandler)
-	etcdMux.HandleFunc("/", VersionHttpHandler)
+	etcdMux.HandleFunc("/version", VersionHttpHandler)
 	etcdMux.HandleFunc("/stats", StatsHttpHandler)
 	etcdMux.HandleFunc("/test/", TestHttpHandler)
 	return etcdMux
@@ -58,7 +59,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	debugf("[recv] POST %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 	value := req.FormValue("value")
 
@@ -109,7 +110,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] DELETE %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 	command := &DeleteCommand{
 		Key: key,
@@ -120,8 +121,9 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 
 // Dispatch the command to leader
 func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
-	if raftServer.State() == raft.Leader {
-		if body, err := raftServer.Do(c); err != nil {
+
+	if r.State() == raft.Leader {
+		if body, err := r.Do(c); err != nil {
 
 			if _, ok := err.(store.NotFoundError); ok {
 				(*w).WriteHeader(http.StatusNotFound)
@@ -165,7 +167,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 			return
 		}
 	} else {
-		leader := raftServer.Leader()
+		leader := r.Leader()
 		// current no leader
 		if leader == "" {
 			(*w).WriteHeader(http.StatusInternalServerError)
@@ -181,6 +183,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 
 		if etcd {
 			etcdAddr, _ := nameToEtcdURL(leader)
+			if etcdAddr == "" {
+				panic(leader)
+			}
 			url = etcdAddr + path
 		} else {
 			raftAddr, _ := nameToRaftURL(leader)
@@ -206,7 +211,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 
 // Handler to return the current leader's raft address
 func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
-	leader := raftServer.Leader()
+	leader := r.Leader()
 
 	if leader != "" {
 		w.WriteHeader(http.StatusOK)
@@ -222,31 +227,17 @@ func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Handler to return all the known machines in the current cluster
 func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
-	peers := raftServer.Peers()
-
-	// Add itself to the machine list first
-	// Since peer map does not contain the server itself
-	machines := info.EtcdURL
-
-	// Add all peers to the list and separate by comma
-	// We do not use json here since we accept machines list
-	// in the command line separate by comma.
-
-	for peerName, _ := range peers {
-		if addr, ok := nameToEtcdURL(peerName); ok {
-			machines = machines + "," + addr
-		}
-	}
+	machines := getMachines()
 
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(machines))
-
+	w.Write([]byte(strings.Join(machines, ", ")))
 }
 
 // Handler to return the current version of etcd
 func VersionHttpHandler(w http.ResponseWriter, req *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(fmt.Sprintf("etcd %s", releaseVersion)))
+	w.Write([]byte(fmt.Sprintf("etcd API %s", version)))
 }
 
 // Handler to return the basic stats of etcd
@@ -259,13 +250,13 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] GET %s/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 	command := &GetCommand{
 		Key: key,
 	}
 
-	if body, err := command.Apply(raftServer); err != nil {
+	if body, err := command.Apply(r.Server); err != nil {
 
 		if _, ok := err.(store.NotFoundError); ok {
 			(*w).WriteHeader(http.StatusNotFound)
@@ -298,13 +289,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 
 	if req.Method == "GET" {
-		debugf("[recv] GET %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+		debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
 		command.SinceIndex = 0
 
 	} else if req.Method == "POST" {
 		// watch from a specific index
 
-		debugf("[recv] POST %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+		debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
 		content := req.FormValue("index")
 
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
@@ -319,7 +310,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	if body, err := command.Apply(raftServer); err != nil {
+	if body, err := command.Apply(r.Server); err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		w.Write(newJsonError(500, key))
 	} else {

+ 0 - 150
etcd_long_test.go

@@ -1,150 +0,0 @@
-package main
-
-import (
-	"fmt"
-	"math/rand"
-	"net/http"
-	"os"
-	"strconv"
-	"strings"
-	"testing"
-	"time"
-)
-
-// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
-// It will print out the election time and the average election time.
-func TestKillLeader(t *testing.T) {
-	procAttr := new(os.ProcAttr)
-	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-
-	clusterSize := 5
-	argGroup, etcds, err := createCluster(clusterSize, procAttr, false)
-
-	if err != nil {
-		t.Fatal("cannot create cluster")
-	}
-
-	defer destroyCluster(etcds)
-
-	leaderChan := make(chan string, 1)
-
-	time.Sleep(time.Second)
-
-	go leaderMonitor(clusterSize, 1, leaderChan)
-
-	var totalTime time.Duration
-
-	leader := "http://127.0.0.1:7001"
-
-	for i := 0; i < clusterSize; i++ {
-		fmt.Println("leader is ", leader)
-		port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
-		num := port - 7001
-		fmt.Println("kill server ", num)
-		etcds[num].Kill()
-		etcds[num].Release()
-
-		start := time.Now()
-		for {
-			newLeader := <-leaderChan
-			if newLeader != leader {
-				leader = newLeader
-				break
-			}
-		}
-		take := time.Now().Sub(start)
-
-		totalTime += take
-		avgTime := totalTime / (time.Duration)(i+1)
-
-		fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout)
-		fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout)
-		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
-	}
-}
-
-// TestKillRandom kills random machines in the cluster and
-// restart them after all other machines agree on the same leader
-func TestKillRandom(t *testing.T) {
-	procAttr := new(os.ProcAttr)
-	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-
-	clusterSize := 9
-	argGroup, etcds, err := createCluster(clusterSize, procAttr, false)
-
-	if err != nil {
-		t.Fatal("cannot create cluster")
-	}
-
-	defer destroyCluster(etcds)
-
-	leaderChan := make(chan string, 1)
-
-	time.Sleep(3 * time.Second)
-
-	go leaderMonitor(clusterSize, 4, leaderChan)
-
-	toKill := make(map[int]bool)
-
-	for i := 0; i < 20; i++ {
-		fmt.Printf("TestKillRandom Round[%d/20]\n", i)
-
-		j := 0
-		for {
-
-			r := rand.Int31n(9)
-			if _, ok := toKill[int(r)]; !ok {
-				j++
-				toKill[int(r)] = true
-			}
-
-			if j > 3 {
-				break
-			}
-
-		}
-
-		for num, _ := range toKill {
-			etcds[num].Kill()
-			etcds[num].Release()
-		}
-
-		<-leaderChan
-
-		for num, _ := range toKill {
-			etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
-		}
-
-		toKill = make(map[int]bool)
-	}
-
-	<-leaderChan
-
-}
-
-func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
-	procAttr := new(os.ProcAttr)
-	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-
-	clusterSize := 3
-	_, etcds, _ := createCluster(clusterSize, procAttr, tls)
-
-	defer destroyCluster(etcds)
-
-	time.Sleep(time.Second)
-
-	b.ResetTimer()
-	for i := 0; i < b.N; i++ {
-		resp, _ := http.Get("http://127.0.0.1:4001/test/speed")
-		resp.Body.Close()
-	}
-
-}
-
-func BenchmarkEtcdDirectCall(b *testing.B) {
-	templateBenchmarkEtcdDirectCall(b, false)
-}
-
-func BenchmarkEtcdDirectCallTls(b *testing.B) {
-	templateBenchmarkEtcdDirectCall(b, true)
-}

+ 48 - 0
etcd_server.go

@@ -0,0 +1,48 @@
+package main
+
+import (
+	"net/http"
+	"net/url"
+)
+
+type etcdServer struct {
+	http.Server
+	name    string
+	url     string
+	tlsConf *TLSConfig
+	tlsInfo *TLSInfo
+}
+
+var e *etcdServer
+
+func newEtcdServer(name string, urlStr string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer {
+	u, err := url.Parse(urlStr)
+
+	if err != nil {
+		fatalf("invalid url '%s': %s", e.url, err)
+	}
+
+	return &etcdServer{
+		Server: http.Server{
+			Handler:   NewEtcdMuxer(),
+			TLSConfig: &tlsConf.Server,
+			Addr:      u.Host,
+		},
+		name:    name,
+		url:     urlStr,
+		tlsConf: tlsConf,
+		tlsInfo: tlsInfo,
+	}
+}
+
+// Start to listen and response etcd client command
+func (e *etcdServer) ListenAndServe() {
+
+	infof("etcd server [%s:%s]", e.name, e.url)
+
+	if e.tlsConf.Scheme == "http" {
+		fatal(e.Server.ListenAndServe())
+	} else {
+		fatal(e.Server.ListenAndServeTLS(e.tlsInfo.CertFile, e.tlsInfo.KeyFile))
+	}
+}

+ 160 - 9
etcd_test.go

@@ -2,10 +2,13 @@ package main
 
 import (
 	"fmt"
+	"github.com/coreos/etcd/test"
 	"github.com/coreos/go-etcd/etcd"
 	"math/rand"
+	"net/http"
 	"os"
-	//"strconv"
+	"strconv"
+	"strings"
 	"testing"
 	"time"
 )
@@ -31,7 +34,7 @@ func TestSingleNode(t *testing.T) {
 	// Test Set
 	result, err := c.Set("foo", "bar", 100)
 
-	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -72,7 +75,7 @@ func TestSingleNodeRecovery(t *testing.T) {
 	// Test Set
 	result, err := c.Set("foo", "bar", 100)
 
-	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -116,13 +119,13 @@ func templateTestSimpleMultiNode(t *testing.T, tls bool) {
 
 	clusterSize := 3
 
-	_, etcds, err := createCluster(clusterSize, procAttr, tls)
+	_, etcds, err := test.CreateCluster(clusterSize, procAttr, tls)
 
 	if err != nil {
 		t.Fatal("cannot create cluster")
 	}
 
-	defer destroyCluster(etcds)
+	defer test.DestroyCluster(etcds)
 
 	time.Sleep(time.Second)
 
@@ -133,7 +136,7 @@ func templateTestSimpleMultiNode(t *testing.T, tls bool) {
 	// Test Set
 	result, err := c.Set("foo", "bar", 100)
 
-	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL != 99 {
+	if err != nil || result.Key != "/foo" || result.Value != "bar" || result.TTL < 95 {
 		if err != nil {
 			t.Fatal(err)
 		}
@@ -169,13 +172,13 @@ func TestMultiNodeRecovery(t *testing.T) {
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 
 	clusterSize := 5
-	argGroup, etcds, err := createCluster(clusterSize, procAttr, false)
+	argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
 
 	if err != nil {
 		t.Fatal("cannot create cluster")
 	}
 
-	defer destroyCluster(etcds)
+	defer test.DestroyCluster(etcds)
 
 	time.Sleep(2 * time.Second)
 
@@ -185,7 +188,7 @@ func TestMultiNodeRecovery(t *testing.T) {
 
 	stop := make(chan bool)
 	// Test Set
-	go set(stop)
+	go test.Set(stop)
 
 	for i := 0; i < 10; i++ {
 		num := rand.Int() % clusterSize
@@ -207,3 +210,151 @@ func TestMultiNodeRecovery(t *testing.T) {
 	stop <- true
 	<-stop
 }
+
+// This test will kill the current leader and wait for the etcd cluster to elect a new leader for 200 times.
+// It will print out the election time and the average election time.
+func TestKillLeader(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 5
+	argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer test.DestroyCluster(etcds)
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	time.Sleep(time.Second)
+
+	go test.Monitor(clusterSize, 1, leaderChan, all, stop)
+
+	var totalTime time.Duration
+
+	leader := "http://127.0.0.1:7001"
+
+	for i := 0; i < clusterSize; i++ {
+		fmt.Println("leader is ", leader)
+		port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
+		num := port - 7001
+		fmt.Println("kill server ", num)
+		etcds[num].Kill()
+		etcds[num].Release()
+
+		start := time.Now()
+		for {
+			newLeader := <-leaderChan
+			if newLeader != leader {
+				leader = newLeader
+				break
+			}
+		}
+		take := time.Now().Sub(start)
+
+		totalTime += take
+		avgTime := totalTime / (time.Duration)(i+1)
+
+		fmt.Println("Leader election time is ", take, "with election timeout", ElectionTimeout)
+		fmt.Println("Leader election time average is", avgTime, "with election timeout", ElectionTimeout)
+		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
+	}
+	stop <- true
+}
+
+// TestKillRandom kills random machines in the cluster and
+// restart them after all other machines agree on the same leader
+func TestKillRandom(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 9
+	argGroup, etcds, err := test.CreateCluster(clusterSize, procAttr, false)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer test.DestroyCluster(etcds)
+
+	stop := make(chan bool)
+	leaderChan := make(chan string, 1)
+	all := make(chan bool, 1)
+
+	time.Sleep(3 * time.Second)
+
+	go test.Monitor(clusterSize, 4, leaderChan, all, stop)
+
+	toKill := make(map[int]bool)
+
+	for i := 0; i < 20; i++ {
+		fmt.Printf("TestKillRandom Round[%d/20]\n", i)
+
+		j := 0
+		for {
+
+			r := rand.Int31n(9)
+			if _, ok := toKill[int(r)]; !ok {
+				j++
+				toKill[int(r)] = true
+			}
+
+			if j > 3 {
+				break
+			}
+
+		}
+
+		for num, _ := range toKill {
+			err := etcds[num].Kill()
+			if err != nil {
+				panic(err)
+			}
+			etcds[num].Wait()
+		}
+
+		time.Sleep(ElectionTimeout)
+
+		<-leaderChan
+
+		for num, _ := range toKill {
+			etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
+		}
+
+		toKill = make(map[int]bool)
+		<-all
+	}
+
+	stop <- true
+}
+
+func templateBenchmarkEtcdDirectCall(b *testing.B, tls bool) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 3
+	_, etcds, _ := test.CreateCluster(clusterSize, procAttr, tls)
+
+	defer test.DestroyCluster(etcds)
+
+	time.Sleep(time.Second)
+
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		resp, _ := http.Get("http://127.0.0.1:4001/test/speed")
+		resp.Body.Close()
+	}
+
+}
+
+func BenchmarkEtcdDirectCall(b *testing.B) {
+	templateBenchmarkEtcdDirectCall(b, false)
+}
+
+func BenchmarkEtcdDirectCallTls(b *testing.B) {
+	templateBenchmarkEtcdDirectCall(b, true)
+}

+ 34 - 0
machines.go

@@ -6,3 +6,37 @@ func machineNum() int {
 
 	return len(response)
 }
+
+// getMachines gets the current machines in the cluster
+func getMachines() []string {
+
+	peers := r.Peers()
+
+	machines := make([]string, len(peers)+1)
+
+	leader, ok := nameToEtcdURL(r.Leader())
+	self := e.url
+	i := 1
+
+	if ok {
+		machines[0] = leader
+		if leader != self {
+			machines[1] = self
+			i = 2
+		}
+	} else {
+		machines[0] = self
+	}
+
+	// Add all peers to the slice
+	for peerName, _ := range peers {
+		if machine, ok := nameToEtcdURL(peerName); ok {
+			// do not add leader twice
+			if machine != leader {
+				machines[i] = machine
+				i++
+			}
+		}
+	}
+	return machines
+}

+ 13 - 13
raft_handlers.go

@@ -12,10 +12,10 @@ import (
 
 // Get all the current logs
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] GET %s/log", info.RaftURL)
+	debugf("[recv] GET %s/log", r.url)
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(raftServer.LogEntries())
+	json.NewEncoder(w).Encode(r.LogEntries())
 }
 
 // Response to vote request
@@ -23,8 +23,8 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
-		debugf("[recv] POST %s/vote [%s]", info.RaftURL, rvreq.CandidateName)
-		if resp := raftServer.RequestVote(rvreq); resp != nil {
+		debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
+		if resp := r.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
@@ -40,8 +40,8 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 
 	if err == nil {
-		debugf("[recv] POST %s/log/append [%d]", info.RaftURL, len(aereq.Entries))
-		if resp := raftServer.AppendEntries(aereq); resp != nil {
+		debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
+		if resp := r.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			if !resp.Success {
@@ -59,8 +59,8 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debugf("[recv] POST %s/snapshot/ ", info.RaftURL)
-		if resp := raftServer.RequestSnapshot(aereq); resp != nil {
+		debugf("[recv] POST %s/snapshot/ ", r.url)
+		if resp := r.RequestSnapshot(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
@@ -75,8 +75,8 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRecoveryRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debugf("[recv] POST %s/snapshotRecovery/ ", info.RaftURL)
-		if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
+		debugf("[recv] POST %s/snapshotRecovery/ ", r.url)
+		if resp := r.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
@@ -88,7 +88,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Get the port that listening for etcd connecting of the server
 func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] Get %s/etcdURL/ ", info.RaftURL)
+	debugf("[recv] Get %s/etcdURL/ ", r.url)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(argInfo.EtcdURL))
 }
@@ -109,7 +109,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Response to the name request
 func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] Get %s/name/ ", info.RaftURL)
+	debugf("[recv] Get %s/name/ ", r.url)
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(raftServer.Name()))
+	w.Write([]byte(r.name))
 }

+ 47 - 44
raft_server.go

@@ -12,46 +12,58 @@ import (
 	"github.com/coreos/go-raft"
 )
 
-var raftTransporter transporter
-var raftServer *raft.Server
-
-// Start the raft server
-func startRaft(tlsConfig TLSConfig) {
-	if veryVerbose {
-		raft.SetLogLevel(raft.Debug)
-	}
+type raftServer struct {
+	*raft.Server
+	name    string
+	url     string
+	tlsConf *TLSConfig
+	tlsInfo *TLSInfo
+}
 
-	var err error
+var r *raftServer
 
-	raftName := info.Name
+func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
 
 	// Create transporter for raft
-	raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
+	raftTransporter := newTransporter(tlsConf.Scheme, tlsConf.Client)
 
 	// Create raft server
-	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
+	server, err := raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
 
-	if err != nil {
-		fatal(err)
+	check(err)
+
+	return &raftServer{
+		Server:  server,
+		name:    name,
+		url:     url,
+		tlsConf: tlsConf,
+		tlsInfo: tlsInfo,
 	}
+}
+
+// Start the raft server
+func (r *raftServer) ListenAndServe() {
+
+	// Setup commands.
+	registerCommands()
 
 	// LoadSnapshot
 	if snapshot {
-		err = raftServer.LoadSnapshot()
+		err := r.LoadSnapshot()
 
 		if err == nil {
-			debugf("%s finished load snapshot", raftServer.Name())
+			debugf("%s finished load snapshot", r.name)
 		} else {
 			debug(err)
 		}
 	}
 
-	raftServer.SetElectionTimeout(ElectionTimeout)
-	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+	r.SetElectionTimeout(ElectionTimeout)
+	r.SetHeartbeatTimeout(HeartbeatTimeout)
 
-	raftServer.Start()
+	r.Start()
 
-	if raftServer.IsLogEmpty() {
+	if r.IsLogEmpty() {
 
 		// start as a leader in a new cluster
 		if len(cluster) == 0 {
@@ -60,23 +72,20 @@ func startRaft(tlsConfig TLSConfig) {
 
 			// leader need to join self as a peer
 			for {
-				command := &JoinCommand{
-					Name:    raftServer.Name(),
-					RaftURL: argInfo.RaftURL,
-					EtcdURL: argInfo.EtcdURL,
-				}
-				_, err := raftServer.Do(command)
+				_, err := r.Do(newJoinCommand())
 				if err == nil {
 					break
 				}
 			}
-			debugf("%s start as a leader", raftServer.Name())
+			debugf("%s start as a leader", r.name)
 
 			// start as a follower in a existing cluster
 		} else {
 
 			time.Sleep(time.Millisecond * 20)
 
+			var err error
+
 			for i := 0; i < retryTimes; i++ {
 
 				success := false
@@ -84,7 +93,7 @@ func startRaft(tlsConfig TLSConfig) {
 					if len(machine) == 0 {
 						continue
 					}
-					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
+					err = joinCluster(r.Server, machine, r.tlsConf.Scheme)
 					if err != nil {
 						if err.Error() == errors[103] {
 							fatal(err)
@@ -106,12 +115,12 @@ func startRaft(tlsConfig TLSConfig) {
 			if err != nil {
 				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
 			}
-			debugf("%s success join to the cluster", raftServer.Name())
+			debugf("%s success join to the cluster", r.name)
 		}
 
 	} else {
 		// rejoin the previous cluster
-		debugf("%s restart as a follower", raftServer.Name())
+		debugf("%s restart as a follower", r.name)
 	}
 
 	// open the snapshot
@@ -120,14 +129,14 @@ func startRaft(tlsConfig TLSConfig) {
 	}
 
 	// start to response to raft requests
-	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
+	go r.startTransport(r.tlsConf.Scheme, r.tlsConf.Server)
 
 }
 
 // Start to listen and response raft command
-func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
-	u, _ := url.Parse(info.RaftURL)
-	infof("raft server [%s:%s]", info.Name, u)
+func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
+	u, _ := url.Parse(r.url)
+	infof("raft server [%s:%s]", r.name, u)
 
 	raftMux := http.NewServeMux()
 
@@ -150,7 +159,7 @@ func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
 	if scheme == "http" {
 		fatal(server.ListenAndServe())
 	} else {
-		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
+		fatal(server.ListenAndServeTLS(r.tlsInfo.CertFile, r.tlsInfo.KeyFile))
 	}
 
 }
@@ -159,16 +168,10 @@ func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
 func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 	var b bytes.Buffer
 
-	command := &JoinCommand{
-		Name:    s.Name(),
-		RaftURL: info.RaftURL,
-		EtcdURL: info.EtcdURL,
-	}
-
-	json.NewEncoder(&b).Encode(command)
+	json.NewEncoder(&b).Encode(newJoinCommand())
 
 	// t must be ok
-	t, ok := raftServer.Transporter().(transporter)
+	t, ok := r.Transporter().(transporter)
 
 	if !ok {
 		panic("wrong type")
@@ -194,7 +197,7 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 				address := resp.Header.Get("Location")
 				debugf("Send Join Request to %s", address)
 
-				json.NewEncoder(&b).Encode(command)
+				json.NewEncoder(&b).Encode(newJoinCommand())
 
 				resp, err = t.Post(address, &b)
 

+ 1 - 1
snapshot.go

@@ -29,7 +29,7 @@ func monitorSnapshot() {
 		currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites
 
 		if currentWrites > snapConf.writesThr {
-			raftServer.TakeSnapshot()
+			r.TakeSnapshot()
 			snapConf.lastWrites = etcdStore.TotalWrites()
 		}
 	}

+ 0 - 0
test → test.sh


+ 17 - 31
test.go → test/test.go

@@ -1,4 +1,4 @@
-package main
+package test
 
 import (
 	"fmt"
@@ -18,7 +18,7 @@ var client = http.Client{
 }
 
 // Sending set commands
-func set(stop chan bool) {
+func Set(stop chan bool) {
 
 	stopSet := false
 	i := 0
@@ -50,12 +50,11 @@ func set(stop chan bool) {
 
 		i++
 	}
-	fmt.Println("set stop")
 	stop <- true
 }
 
 // Create a cluster of etcd nodes
-func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
+func CreateCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os.Process, error) {
 	argGroup := make([][]string, size)
 
 	sslServer1 := []string{"-serverCAFile=./fixtures/ca/ca.crt",
@@ -97,7 +96,7 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
 		// have to retry. This retry can take upwards of 15 seconds
 		// which slows tests way down and some of them fail.
 		if i == 0 {
-			time.Sleep(time.Second)
+			time.Sleep(time.Second * 2)
 		}
 	}
 
@@ -105,10 +104,9 @@ func createCluster(size int, procAttr *os.ProcAttr, ssl bool) ([][]string, []*os
 }
 
 // Destroy all the nodes in the cluster
-func destroyCluster(etcds []*os.Process) error {
-	for i, etcd := range etcds {
+func DestroyCluster(etcds []*os.Process) error {
+	for _, etcd := range etcds {
 		err := etcd.Kill()
-		fmt.Println("kill ", i)
 		if err != nil {
 			panic(err.Error())
 		}
@@ -118,7 +116,7 @@ func destroyCluster(etcds []*os.Process) error {
 }
 
 //
-func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
+func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
 	leaderMap := make(map[int]string)
 	baseAddrFormat := "http://0.0.0.0:400%d"
 
@@ -153,6 +151,8 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 
 		if i == size {
 			select {
+			case <-stop:
+				return
 			case <-leaderChan:
 				leaderChan <- knownLeader
 			default:
@@ -160,6 +160,14 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 			}
 
 		}
+		if dead == 0 {
+			select {
+			case <-all:
+				all <- true
+			default:
+				all <- true
+			}
+		}
 
 		time.Sleep(time.Millisecond * 10)
 	}
@@ -191,28 +199,6 @@ func getLeader(addr string) (string, error) {
 
 }
 
-func directSet() {
-	c := make(chan bool, 1000)
-	for i := 0; i < 1000; i++ {
-		go send(c)
-	}
-
-	for i := 0; i < 1000; i++ {
-		<-c
-	}
-}
-
-func send(c chan bool) {
-	for i := 0; i < 10; i++ {
-		command := &SetCommand{}
-		command.Key = "foo"
-		command.Value = "bar"
-		command.ExpireTime = time.Unix(0, 0)
-		raftServer.Do(command)
-	}
-	c <- true
-}
-
 // Dial with timeout
 func dialTimeoutFast(network, addr string) (net.Conn, error) {
 	return net.DialTimeout(network, addr, time.Millisecond*10)

+ 58 - 0
util.go

@@ -10,6 +10,8 @@ import (
 	"net/http"
 	"net/url"
 	"os"
+	"os/signal"
+	"runtime/pprof"
 	"strconv"
 	"time"
 )
@@ -48,6 +50,15 @@ func webHelper() {
 	}
 }
 
+// startWebInterface starts web interface if webURL is not empty
+func startWebInterface() {
+	if argInfo.WebURL != "" {
+		// start web
+		go webHelper()
+		go web.Start(r.Server, argInfo.WebURL)
+	}
+}
+
 //--------------------------------------
 // HTTP Utilities
 //--------------------------------------
@@ -144,3 +155,50 @@ func fatal(v ...interface{}) {
 	logger.Println("FATAL " + fmt.Sprint(v...))
 	os.Exit(1)
 }
+
+//--------------------------------------
+// CPU profile
+//--------------------------------------
+func runCPUProfile() {
+
+	f, err := os.Create(cpuprofile)
+	if err != nil {
+		fatal(err)
+	}
+	pprof.StartCPUProfile(f)
+
+	c := make(chan os.Signal, 1)
+	signal.Notify(c, os.Interrupt)
+	go func() {
+		for sig := range c {
+			infof("captured %v, stopping profiler and exiting..", sig)
+			pprof.StopCPUProfile()
+			os.Exit(1)
+		}
+	}()
+}
+
+//--------------------------------------
+// Testing
+//--------------------------------------
+func directSet() {
+	c := make(chan bool, 1000)
+	for i := 0; i < 1000; i++ {
+		go send(c)
+	}
+
+	for i := 0; i < 1000; i++ {
+		<-c
+	}
+}
+
+func send(c chan bool) {
+	for i := 0; i < 10; i++ {
+		command := &SetCommand{}
+		command.Key = "foo"
+		command.Value = "bar"
+		command.ExpireTime = time.Unix(0, 0)
+		r.Do(command)
+	}
+	c <- true
+}