Browse Source

Merge pull request #12 from xiangli-cmu/master

accept machine list to join cluster
polvi 12 years ago
parent
commit
7ab3ccdf44
3 changed files with 44 additions and 17 deletions
  1. 11 5
      client_handlers.go
  2. 32 11
      etcd.go
  3. 1 1
      raft_handlers.go

+ 11 - 5
client_handlers.go

@@ -51,7 +51,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		(*w).WriteHeader(http.StatusInternalServerError)
 	}
 
-	dispatch(command, w, req)
+	dispatch(command, w, req, true)
 
 }
 
@@ -77,7 +77,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 		w.WriteHeader(http.StatusInternalServerError)
 	}
 
-	dispatch(command, &w, req)
+	dispatch(command, &w, req, true)
 
 }
 
@@ -90,11 +90,11 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	command := &DeleteCommand{}
 	command.Key = key
 
-	dispatch(command, w, req)
+	dispatch(command, w, req, true)
 }
 
 // Dispatch the command to leader
-func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
+func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool) {
 	if raftServer.State() == "leader" {
 		if body, err := raftServer.Do(c); err != nil {
 			warn("Commit failed %v", err)
@@ -132,7 +132,13 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
 			scheme = "http://"
 		}
 
-		url := scheme + raftTransporter.GetLeaderClientAddress() + path
+		var url string
+
+		if client {
+			url = scheme + raftTransporter.GetLeaderClientAddress() + path
+		} else {
+			url = scheme + raftServer.Leader() + path
+		}
 
 		debug("Redirect to %s", url)
 

+ 32 - 11
etcd.go

@@ -27,7 +27,8 @@ import (
 
 var verbose bool
 
-var cluster string
+var machines string
+var cluster []string
 
 var address string
 var clientPort int
@@ -51,7 +52,7 @@ var maxSize int
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 
-	flag.StringVar(&cluster, "C", "", "the ip address and port of a existing cluster")
+	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in cluster, sepearate by comma")
 
 	flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine")
 	flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
@@ -135,6 +136,8 @@ var info *Info
 func main() {
 	flag.Parse()
 
+	cluster = strings.Split(machines, ",")
+
 	// Setup commands.
 	registerCommands()
 
@@ -203,7 +206,7 @@ func startRaft(securityType int) {
 	if raftServer.IsLogEmpty() {
 
 		// start as a leader in a new cluster
-		if cluster == "" {
+		if len(cluster) == 0 {
 			raftServer.StartLeader()
 
 			time.Sleep(time.Millisecond * 20)
@@ -223,9 +226,17 @@ func startRaft(securityType int) {
 		} else {
 			raftServer.StartFollower()
 
-			err := joinCluster(raftServer, cluster)
+			for _, machine := range cluster {
+
+				err := joinCluster(raftServer, machine)
+				if err != nil {
+					debug("cannot join to cluster via machine %s", machine)
+				} else {
+					break
+				}
+			}
 			if err != nil {
-				fatal(fmt.Sprintln(err))
+				fatal("cannot join to cluster via all given machines!")
 			}
 			debug("%s success join to the cluster", raftServer.Name())
 		}
@@ -414,6 +425,7 @@ func getInfo(path string) *Info {
 
 	// Delete the old configuration if exist
 	if ignore {
+
 		logPath := fmt.Sprintf("%s/log", path)
 		snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
 		os.Remove(infoPath)
@@ -496,11 +508,20 @@ func joinCluster(s *raft.Server, serverName string) error {
 	json.NewEncoder(&b).Encode(command)
 
 	// t must be ok
-	t, _ := raftServer.Transporter().(transporter)
+	t, ok := raftServer.Transporter().(transporter)
+
+	if !ok {
+		panic("wrong type")
+	}
+
 	debug("Send Join Request to %s", serverName)
+
 	resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
 
+	debug("Finish Join Request to %s", serverName)
+
 	for {
+		fmt.Println(err, resp)
 		if err != nil {
 			return fmt.Errorf("Unable to join: %v", err)
 		}
@@ -509,17 +530,17 @@ func joinCluster(s *raft.Server, serverName string) error {
 			if resp.StatusCode == http.StatusOK {
 				return nil
 			}
-			if resp.StatusCode == http.StatusServiceUnavailable {
-				address, err := ioutil.ReadAll(resp.Body)
-				if err != nil {
-					warn("Cannot Read Leader info: %v", err)
-				}
+
+			if resp.StatusCode == http.StatusTemporaryRedirect {
+				fmt.Println("redirect")
+				address = resp.Header.Get("Location")
 				debug("Leader is %s", address)
 				debug("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(command)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
 			}
 		}
+
 	}
 	return fmt.Errorf("Unable to join: %v", err)
 }

+ 1 - 1
raft_handlers.go

@@ -86,7 +86,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 	if err := decodeJsonRequest(req, command); err == nil {
 		debug("Receive Join Request from %s", command.Name)
-		dispatch(command, &w, req)
+		dispatch(command, &w, req, false)
 	} else {
 		w.WriteHeader(http.StatusInternalServerError)
 		return