Browse Source

election new leader and rejoin works

Xiang Li 12 years ago
parent
commit
71c0ffec3a
2 changed files with 34 additions and 26 deletions
  1. 20 16
      handlers.go
  2. 14 10
      raftd.go

+ 20 - 16
handlers.go

@@ -22,22 +22,6 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 	json.NewEncoder(w).Encode(server.LogEntries())
 	json.NewEncoder(w).Encode(server.LogEntries())
 }
 }
 
 
-func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] POST http://%v/join", server.Name())
-	command := &JoinCommand{}
-	if err := decodeJsonRequest(req, command); err == nil {
-		if _, err= server.Do(command); err != nil {
-			warn("raftd: Unable to join: %v", err)
-			w.WriteHeader(http.StatusInternalServerError)
-		} else {
-			w.WriteHeader(http.StatusOK)
-		}
-	} else {
-		warn("[join] ERROR: %v", err)
-		w.WriteHeader(http.StatusInternalServerError)
-	}
-}
-
 func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	err := decodeJsonRequest(req, rvreq)
@@ -59,6 +43,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 		debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
 		debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
 		debug("My role is %s", server.State())
 		debug("My role is %s", server.State())
 		if resp, _ := server.AppendEntries(aereq); resp != nil {
 		if resp, _ := server.AppendEntries(aereq); resp != nil {
+			debug("write back success")
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			if !resp.Success {
 			if !resp.Success {
@@ -68,6 +53,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 		}
 		}
 	}
 	}
 	warn("[append] ERROR: %v", err)
 	warn("[append] ERROR: %v", err)
+	debug("write back")
 	w.WriteHeader(http.StatusInternalServerError)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 }
 
 
@@ -86,6 +72,24 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	w.WriteHeader(http.StatusInternalServerError)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 }
 
 
+
+func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
+	debug("[recv] POST http://%v/join", server.Name())
+	command := &JoinCommand{}
+	if err := decodeJsonRequest(req, command); err == nil {
+		if _, err= server.Do(command); err != nil {
+			warn("raftd: Unable to join: %v", err)
+			w.WriteHeader(http.StatusInternalServerError)
+		} else {
+			w.WriteHeader(http.StatusOK)
+		}
+	} else {
+		warn("[join] ERROR: %v", err)
+		w.WriteHeader(http.StatusInternalServerError)
+	}
+}
+
+
 func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
 func SetHttpHandler(w http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/set/"):]
 	key := req.URL.Path[len("/set/"):]
 
 

+ 14 - 10
raftd.go

@@ -109,23 +109,31 @@ func main() {
 		fmt.Println("3 join as ", server.State(), " term ",  server.Term())
 		fmt.Println("3 join as ", server.State(), " term ",  server.Term())
 		if leaderHost == "" {
 		if leaderHost == "" {
 			fmt.Println("init")
 			fmt.Println("init")
-			server.SetElectionTimeout(1 * time.Second)
+			server.SetElectionTimeout(10 * time.Second)
 			server.SetHeartbeatTimeout(1 * time.Second)
 			server.SetHeartbeatTimeout(1 * time.Second)
 			server.StartLeader()
 			server.StartLeader()
+			// join self 
+
+			command := &JoinCommand{}
+			command.Name = server.Name()
+
+			server.Do(command)
 		} else {
 		} else {
-			server.SetElectionTimeout(1 * time.Second)
+			server.SetElectionTimeout(10 * time.Second)
 			server.SetHeartbeatTimeout(1 * time.Second)
 			server.SetHeartbeatTimeout(1 * time.Second)
 			server.StartFollower()
 			server.StartFollower()
 			fmt.Println("4 join as ", server.State(), " term ",  server.Term())
 			fmt.Println("4 join as ", server.State(), " term ",  server.Term())
 			Join(server, leaderHost)
 			Join(server, leaderHost)
 			fmt.Println("success join")
 			fmt.Println("success join")
 		}
 		}
+	} else {
+		server.SetElectionTimeout(10 * time.Second)
+		server.SetHeartbeatTimeout(1 * time.Second)
+		server.StartFollower()
 	}
 	}
+
 	// open snapshot
 	// open snapshot
 	//go server.Snapshot()
 	//go server.Snapshot()
-	
-	// Create HTTP interface.
-    //r := mux.NewRouter()
 
 
     // internal commands
     // internal commands
     http.HandleFunc("/join", JoinHttpHandler)
     http.HandleFunc("/join", JoinHttpHandler)
@@ -136,14 +144,10 @@ func main() {
 
 
     // external commands
     // external commands
     http.HandleFunc("/set/", SetHttpHandler)
     http.HandleFunc("/set/", SetHttpHandler)
-    //r.HandleFunc("/get/{key}", GetHttpHandler).Methods("GET")
+    http.HandleFunc("/get/", GetHttpHandler)
     http.HandleFunc("/delete/", DeleteHttpHandler)
     http.HandleFunc("/delete/", DeleteHttpHandler)
     http.HandleFunc("/watch/", WatchHttpHandler)
     http.HandleFunc("/watch/", WatchHttpHandler)
 
 
-    //http.Handle("/", r)
-
-    http.HandleFunc("/get/", GetHttpHandler)
-
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
 	log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.Port), nil))
 }
 }