Browse Source

Merge pull request #41 from xiangli-cmu/master

update
polvi 12 years ago
parent
commit
869d1a3c69
11 changed files with 233 additions and 17 deletions
  1. 12 0
      README.md
  2. 21 7
      client_handlers.go
  3. 14 1
      command.go
  4. 6 0
      error.go
  5. 37 0
      etcd.go
  6. 60 1
      etcd_long_test.go
  7. 5 0
      machines.go
  8. 37 0
      store/keyword_test.go
  9. 27 2
      store/keywords.go
  10. 1 1
      store/store_test.go
  11. 13 5
      test.go

+ 12 - 0
README.md

@@ -355,6 +355,18 @@ We should see there are three nodes in the cluster
 0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003
 0.0.0.0:4001,0.0.0.0:4002,0.0.0.0:4003
 ```
 ```
 
 
+The machine list is also available via this API:
+
+```sh 
+curl -L http://127.0.0.1:4001/v1/keys/_etcd/machines
+```
+
+```json
+[{"action":"GET","key":"/machines/node1","value":"0.0.0.0,7001,4001","index":4},{"action":"GET","key":"/machines/node3","value":"0.0.0.0,7002,4002","index":4},{"action":"GET","key":"/machines/node4","value":"0.0.0.0,7003,4003","index":4}]
+```
+
+The key of each machine is derived from the ```commit index``` at which it was added. The value of each machine is its ```hostname```, ```raft port``` and ```client port```, separated by commas.
+
 Also try to get the current leader in the cluster
 Also try to get the current leader in the cluster
 
 
 ```
 ```

+ 21 - 7
client_handlers.go

@@ -35,6 +35,14 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) {
 func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
+	if store.CheckKeyword(key) {
+
+		(*w).WriteHeader(http.StatusBadRequest)
+
+		(*w).Write(newJsonError(400, "Set"))
+		return
+	}
+
 	debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
 	debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
 
 
 	value := req.FormValue("value")
 	value := req.FormValue("value")
@@ -57,6 +65,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		(*w).WriteHeader(http.StatusBadRequest)
 		(*w).WriteHeader(http.StatusBadRequest)
 
 
 		(*w).Write(newJsonError(202, "Set"))
 		(*w).Write(newJsonError(202, "Set"))
+		return
 	}
 	}
 
 
 	if len(prevValue) != 0 {
 	if len(prevValue) != 0 {
@@ -94,7 +103,8 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 	if raftServer.State() == "leader" {
 	if raftServer.State() == "leader" {
 		if body, err := raftServer.Do(c); err != nil {
 		if body, err := raftServer.Do(c); err != nil {
 			if _, ok := err.(store.NotFoundError); ok {
 			if _, ok := err.(store.NotFoundError); ok {
-				http.NotFound((*w), req)
+				(*w).WriteHeader(http.StatusNotFound)
+				(*w).Write(newJsonError(100, err.Error()))
 				return
 				return
 			}
 			}
 
 
@@ -109,13 +119,19 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 				(*w).Write(newJsonError(102, err.Error()))
 				(*w).Write(newJsonError(102, err.Error()))
 				return
 				return
 			}
 			}
+			if err.Error() == errors[103] {
+				(*w).WriteHeader(http.StatusBadRequest)
+				(*w).Write(newJsonError(103, ""))
+				return
+			}
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).Write(newJsonError(300, err.Error()))
 			(*w).Write(newJsonError(300, err.Error()))
 			return
 			return
 		} else {
 		} else {
 
 
 			if body == nil {
 			if body == nil {
-				http.NotFound((*w), req)
+				(*w).WriteHeader(http.StatusNotFound)
+				(*w).Write(newJsonError(100, err.Error()))
 			} else {
 			} else {
 				body, ok := body.([]byte)
 				body, ok := body.([]byte)
 				// this should not happen
 				// this should not happen
@@ -221,13 +237,14 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	if body, err := command.Apply(raftServer); err != nil {
 	if body, err := command.Apply(raftServer); err != nil {
 
 
 		if _, ok := err.(store.NotFoundError); ok {
 		if _, ok := err.(store.NotFoundError); ok {
-			http.NotFound((*w), req)
+			(*w).WriteHeader(http.StatusNotFound)
+			(*w).Write(newJsonError(100, err.Error()))
 			return
 			return
 		}
 		}
 
 
 		(*w).WriteHeader(http.StatusInternalServerError)
 		(*w).WriteHeader(http.StatusInternalServerError)
 		(*w).Write(newJsonError(300, ""))
 		(*w).Write(newJsonError(300, ""))
-		return
+
 	} else {
 	} else {
 		body, ok := body.([]byte)
 		body, ok := body.([]byte)
 		if !ok {
 		if !ok {
@@ -237,7 +254,6 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		(*w).WriteHeader(http.StatusOK)
 		(*w).WriteHeader(http.StatusOK)
 		(*w).Write(body)
 		(*w).Write(body)
 
 
-		return
 	}
 	}
 
 
 }
 }
@@ -274,7 +290,6 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	if body, err := command.Apply(raftServer); err != nil {
 	if body, err := command.Apply(raftServer); err != nil {
 		warn("Unable to do watch command: %v", err)
 		warn("Unable to do watch command: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
 		w.WriteHeader(http.StatusInternalServerError)
-		return
 	} else {
 	} else {
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
 
 
@@ -284,7 +299,6 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		}
 		}
 
 
 		w.Write(body)
 		w.Write(body)
-		return
 	}
 	}
 
 
 }
 }

+ 14 - 1
command.go

@@ -119,12 +119,25 @@ func (c *JoinCommand) CommandName() string {
 
 
 // Join a server to the cluster
 // Join a server to the cluster
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
+
+	// check machine number in the cluster
+	num := machineNum()
+	if num == maxClusterSize {
+		return []byte("join fail"), fmt.Errorf(errors[103])
+	}
+
+	// add peer in raft
 	err := raftServer.AddPeer(c.Name)
 	err := raftServer.AddPeer(c.Name)
+
+	// add machine in etcd
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
+
+	// add machine in etcd storage
 	nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex())
 	nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex())
-	key := path.Join("machines", nodeName)
+	key := path.Join("_etcd/machines", nodeName)
 	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
 	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
+
 	return []byte("join success"), err
 	return []byte("join success"), err
 }
 }
 
 

+ 6 - 0
error.go

@@ -13,6 +13,8 @@ func init() {
 	errors[100] = "Key Not Found"
 	errors[100] = "Key Not Found"
 	errors[101] = "The given PrevValue is not equal to the value of the key"
 	errors[101] = "The given PrevValue is not equal to the value of the key"
 	errors[102] = "Not A File"
 	errors[102] = "Not A File"
+	errors[103] = "Reached the max number of machines in the cluster"
+
 	// Post form related errors
 	// Post form related errors
 	errors[200] = "Value is Required in POST form"
 	errors[200] = "Value is Required in POST form"
 	errors[201] = "PrevValue is Required in POST form"
 	errors[201] = "PrevValue is Required in POST form"
@@ -21,6 +23,10 @@ func init() {
 	// raft related errors
 	// raft related errors
 	errors[300] = "Raft Internal Error"
 	errors[300] = "Raft Internal Error"
 	errors[301] = "During Leader Election"
 	errors[301] = "During Leader Election"
+
+	// keyword
+	errors[400] = "The prefix of the given key is a keyword in etcd"
+
 }
 }
 
 
 type jsonError struct {
 type jsonError struct {

+ 37 - 0
etcd.go

@@ -16,6 +16,8 @@ import (
 	"net"
 	"net"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
+	"os/signal"
+	"runtime/pprof"
 	"strings"
 	"strings"
 	"time"
 	"time"
 )
 )
@@ -57,6 +59,10 @@ var snapshot bool
 
 
 var retryTimes int
 var retryTimes int
 
 
+var maxClusterSize int
+
+var cpuprofile string
+
 func init() {
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
@@ -86,6 +92,10 @@ func init() {
 	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 
 
 	flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
 	flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
+
+	flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
+
+	flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
 }
 }
 
 
 // CONSTANTS
 // CONSTANTS
@@ -156,6 +166,26 @@ var info *Info
 func main() {
 func main() {
 	flag.Parse()
 	flag.Parse()
 
 
+	if cpuprofile != "" {
+		f, err := os.Create(cpuprofile)
+		if err != nil {
+			log.Fatal(err)
+		}
+		pprof.StartCPUProfile(f)
+		defer pprof.StopCPUProfile()
+
+		c := make(chan os.Signal, 1)
+		signal.Notify(c, os.Interrupt)
+		go func() {
+			for sig := range c {
+				fmt.Printf("captured %v, stopping profiler and exiting..", sig)
+				pprof.StopCPUProfile()
+				os.Exit(1)
+			}
+		}()
+
+	}
+
 	if veryVerbose {
 	if veryVerbose {
 		verbose = true
 		verbose = true
 		raft.SetLogLevel(raft.Debug)
 		raft.SetLogLevel(raft.Debug)
@@ -276,6 +306,10 @@ func startRaft(securityType int) {
 					}
 					}
 					err = joinCluster(raftServer, machine)
 					err = joinCluster(raftServer, machine)
 					if err != nil {
 					if err != nil {
+						if err.Error() == errors[103] {
+							fmt.Println(err)
+							os.Exit(1)
+						}
 						debug("cannot join to cluster via machine %s %s", machine, err)
 						debug("cannot join to cluster via machine %s %s", machine, err)
 					} else {
 					} else {
 						success = true
 						success = true
@@ -602,6 +636,9 @@ func joinCluster(s *raft.Server, serverName string) error {
 				debug("Send Join Request to %s", address)
 				debug("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(command)
 				json.NewEncoder(&b).Encode(command)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
+			} else if resp.StatusCode == http.StatusBadRequest {
+				debug("Reach max number machines in the cluster")
+				return fmt.Errorf(errors[103])
 			} else {
 			} else {
 				return fmt.Errorf("Unable to join")
 				return fmt.Errorf("Unable to join")
 			}
 			}

+ 60 - 1
etcd_long_test.go

@@ -2,6 +2,7 @@ package main
 
 
 import (
 import (
 	"fmt"
 	"fmt"
+	"math/rand"
 	"os"
 	"os"
 	"strconv"
 	"strconv"
 	"strings"
 	"strings"
@@ -34,7 +35,7 @@ func TestKillLeader(t *testing.T) {
 
 
 	leader := "0.0.0.0:7001"
 	leader := "0.0.0.0:7001"
 
 
-	for i := 0; i < 200; i++ {
+	for i := 0; i < 10; i++ {
 		port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
 		port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
 		num := port - 7001
 		num := port - 7001
 		fmt.Println("kill server ", num)
 		fmt.Println("kill server ", num)
@@ -58,5 +59,63 @@ func TestKillLeader(t *testing.T) {
 		fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMEOUT)
 		fmt.Println("Leader election time average is", avgTime, "with election timeout", ELECTIONTIMEOUT)
 		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
 		etcds[num], err = os.StartProcess("etcd", argGroup[num], procAttr)
 	}
 	}
+}
+
+// TestKillRandom repeatedly kills a random subset of the machines in the
+// cluster and restarts them after leaderMonitor signals on leaderChan that
+// the remaining machines agree on the same leader.
+func TestKillRandom(t *testing.T) {
+	procAttr := new(os.ProcAttr)
+	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
+
+	clusterSize := 9
+	// number of machines killed in each round; it is also passed to
+	// leaderMonitor as the number of dead machines it tolerates
+	killNum := 4
+
+	argGroup, etcds, err := createCluster(clusterSize, procAttr)
+
+	if err != nil {
+		t.Fatal("cannot create cluster")
+	}
+
+	defer destroyCluster(etcds)
+
+	leaderChan := make(chan string, 1)
+
+	time.Sleep(3 * time.Second)
+
+	go leaderMonitor(clusterSize, killNum, leaderChan)
+
+	for i := 0; i < 20; i++ {
+		fmt.Printf("TestKillRandom Round[%d/20]\n", i)
+
+		// pick killNum distinct victims, drawing from the whole cluster
+		// instead of a hard-coded 9 so clusterSize can be changed safely
+		toKill := make(map[int]bool)
+		for len(toKill) < killNum {
+			toKill[int(rand.Int31n(int32(clusterSize)))] = true
+		}
+
+		for num := range toKill {
+			etcds[num].Kill()
+			etcds[num].Release()
+		}
+
+		<-leaderChan
+
+		for num := range toKill {
+			// keep the old handle on failure so the deferred
+			// destroyCluster never sees a nil process
+			proc, startErr := os.StartProcess("etcd", argGroup[num], procAttr)
+			if startErr != nil {
+				t.Fatalf("cannot restart machine %d: %v", num, startErr)
+			}
+			etcds[num] = proc
+		}
+	}
+
+	<-leaderChan
 
 
 }
 }

+ 5 - 0
machines.go

@@ -28,3 +28,8 @@ func getClientAddr(name string) (string, bool) {
 
 
 	return addr, true
 	return addr, true
 }
 }
+
+// machineNum reports how many entries machinesMap currently holds,
+// i.e. the number of machines recorded for the cluster.
+func machineNum() int {
+	count := len(machinesMap)
+	return count
+}

+ 37 - 0
store/keyword_test.go

@@ -0,0 +1,37 @@
+package store
+
+import (
+	"testing"
+)
+
+// TestKeywords checks that CheckKeyword flags keys whose first path segment
+// is the reserved "_etcd" keyword and accepts keys that merely contain it
+// deeper in the path.
+func TestKeywords(t *testing.T) {
+	cases := []struct {
+		key  string
+		want bool
+	}{
+		{"_etcd", true},
+		{"/_etcd", true},
+		{"/_etcd/", true},
+		{"/_etcd/node1", true},
+		{"/nokeyword/_etcd/node1", false},
+	}
+
+	for _, c := range cases {
+		if got := CheckKeyword(c.key); got != c.want {
+			t.Fatalf("CheckKeyword(%q) = %v, want %v", c.key, got, c.want)
+		}
+	}
+}

+ 27 - 2
store/keywords.go

@@ -1,8 +1,33 @@
 package store
 package store
 
 
+import (
+	"path"
+	"strings"
+)
+
 // keywords for internal usage
 // keywords for internal usage
+// The map key is the keyword string; the value marks that only the prefix of a key is checked against it
 var keywords = map[string]bool{
 var keywords = map[string]bool{
-	"/acoounts":       true,
+	"/_etcd":       true,
 	"/ephemeralNodes": true,
 	"/ephemeralNodes": true,
-	"/machines":	   true,
+}
+
+// CheckKeyword will check if the key contains the keyword.
+// For now, we only check for prefix.
+func CheckKeyword(key string) bool {
+	key = path.Clean("/" + key)
+
+	// find the second "/"
+	i := strings.Index(key[1:], "/")
+
+	var prefix string
+
+	if i == -1 {
+		prefix = key
+	} else {
+		prefix = key[:i+1]
+	}
+	_, ok := keywords[prefix]
+
+	return ok
 }
 }

+ 1 - 1
store/store_test.go

@@ -36,7 +36,7 @@ func TestSaveAndRecovery(t *testing.T) {
 
 
 	s := CreateStore(100)
 	s := CreateStore(100)
 	s.Set("foo", "bar", time.Unix(0, 0), 1)
 	s.Set("foo", "bar", time.Unix(0, 0), 1)
-	s.Set("foo2", "bar2", time.Now().Add(time.Second * 5), 2)
+	s.Set("foo2", "bar2", time.Now().Add(time.Second*5), 2)
 	state, err := s.Save()
 	state, err := s.Save()
 
 
 	if err != nil {
 	if err != nil {

+ 13 - 5
test.go

@@ -11,9 +11,10 @@ import (
 	"time"
 	"time"
 )
 )
 
 
-var client = http.Client{Transport: &http.Transport{
-	Dial: dialTimeoutFast,
-},
+var client = http.Client{
+	Transport: &http.Transport{
+		Dial: dialTimeoutFast,
+	},
 }
 }
 
 
 // Sending set commands
 // Sending set commands
@@ -33,8 +34,6 @@ func set(stop chan bool) {
 				stopSet = true
 				stopSet = true
 
 
 			default:
 			default:
-				fmt.Println("Set failed!")
-				return
 			}
 			}
 		}
 		}
 
 
@@ -97,12 +96,15 @@ func destroyCluster(etcds []*os.Process) error {
 func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 	leaderMap := make(map[int]string)
 	leaderMap := make(map[int]string)
 	baseAddrFormat := "http://0.0.0.0:400%d/leader"
 	baseAddrFormat := "http://0.0.0.0:400%d/leader"
+
 	for {
 	for {
 		knownLeader := "unknown"
 		knownLeader := "unknown"
 		dead := 0
 		dead := 0
 		var i int
 		var i int
+
 		for i = 0; i < size; i++ {
 		for i = 0; i < size; i++ {
 			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
 			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
+
 			if err == nil {
 			if err == nil {
 				leaderMap[i] = leader
 				leaderMap[i] = leader
 
 
@@ -112,14 +114,18 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 					if leader != knownLeader {
 					if leader != knownLeader {
 						break
 						break
 					}
 					}
+
 				}
 				}
+
 			} else {
 			} else {
 				dead++
 				dead++
 				if dead > allowDeadNum {
 				if dead > allowDeadNum {
 					break
 					break
 				}
 				}
 			}
 			}
+
 		}
 		}
+
 		if i == size {
 		if i == size {
 			select {
 			select {
 			case <-leaderChan:
 			case <-leaderChan:
@@ -129,8 +135,10 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 			}
 			}
 
 
 		}
 		}
+
 		time.Sleep(time.Millisecond * 10)
 		time.Sleep(time.Millisecond * 10)
 	}
 	}
+
 }
 }
 
 
 func getLeader(addr string) (string, error) {
 func getLeader(addr string) (string, error) {