Pārlūkot izejas kodu

fix start as a follower(problem with election timeout) and join command

Xiang Li 12 gadi atpakaļ
vecāks
revīzija
b9d789fb84
3 mainītis faili ar 32 papildinājumiem un 16 dzēšanām
  1. 3 3
      command.go
  2. 12 3
      handlers.go
  3. 17 10
      raftd.go

+ 3 - 3
command.go

@@ -121,15 +121,15 @@ func (c *DeleteCommand) GetKey() string{
 }
 
 // joinCommand
-type joinCommand struct {
+type JoinCommand struct {
 	Name string `json:"name"`
 }
 
-func (c *joinCommand) CommandName() string {
+func (c *JoinCommand) CommandName() string {
 	return "join"
 }
 
-func (c *joinCommand) Apply(server *raft.Server) ([]byte, error) {
+func (c *JoinCommand) Apply(server *raft.Server) ([]byte, error) {
 	err := server.AddPeer(c.Name)
 	return nil, err
 }

+ 12 - 3
handlers.go

@@ -25,7 +25,7 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debug("[recv] POST http://%v/join", server.Name())
-	command := &joinCommand{}
+	command := &JoinCommand{}
 	if err := decodeJsonRequest(req, command); err == nil {
 		if _, err= server.Do(command); err != nil {
 			warn("raftd: Unable to join: %v", err)
@@ -58,6 +58,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 		debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
+		debug("My role is %s", server.State())
 		if resp, _ := server.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
@@ -134,10 +135,14 @@ func DeleteHttpHandler(w http.ResponseWriter, req *http.Request) {
 func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
 	var body []byte
 	var err error
+
+
+	fmt.Println("dispatch")
 	// unlikely to fail twice
 	for {
 		// i am the leader, i will take care of the command
 		if server.State() == "leader" {
+			fmt.Println("i am leader ", server.Name())
 			if body, err = server.Do(command); err != nil {
 				warn("raftd: Unable to write file: %v", err)
 				w.WriteHeader(http.StatusInternalServerError)
@@ -157,6 +162,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
 				continue
 			} 
 
+			fmt.Println("forward to ", leaderName)
+
 			path := command.GeneratePath()
 
 			if command.Type() == "POST" {
@@ -167,7 +174,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
 				reps, _ := http.Post(fmt.Sprintf("http://%v/%s", 
 					leaderName, command.GeneratePath()), "application/json", reader)
 
-				reps.Body.Read(body)
+				body, _ := ioutil.ReadAll(reps.Body)
+				fmt.Println(body)
 				// good to go
 				w.WriteHeader(http.StatusOK)
 
@@ -179,7 +187,8 @@ func Dispatch(server *raft.Server, command Command, w http.ResponseWriter) {
 				reps, _ := http.Get(fmt.Sprintf("http://%v/%s", 
 					leaderName, command.GeneratePath()))
 				// good to go
-				reps.Body.Read(body)
+				body, _ := ioutil.ReadAll(reps.Body)
+				fmt.Println(body)
 
 				w.WriteHeader(http.StatusOK)
 				

+ 17 - 10
raftd.go

@@ -68,7 +68,7 @@ func main() {
 	}
 
 	// Setup commands.
-	raft.RegisterCommand(&joinCommand{})
+	raft.RegisterCommand(&JoinCommand{})
 	raft.RegisterCommand(&SetCommand{})
 	raft.RegisterCommand(&GetCommand{})
 	raft.RegisterCommand(&DeleteCommand{})
@@ -94,23 +94,30 @@ func main() {
 	// Setup new raft server.
 	server, err = raft.NewServer(name, path, t, nil)
 	//server.DoHandler = DoHandler;
-	server.SetElectionTimeout(2 * time.Second)
-	server.SetHeartbeatTimeout(1 * time.Second)
 	if err != nil {
 		fatal("%v", err)
 	}
-	server.Start()
-
+	server.Initialize()
+	fmt.Println("1 join as ", server.State(), " term ",  server.Term())
 	// Join to another server if we don't have a log.
 	if server.IsLogEmpty() {
 		var leaderHost string
+		fmt.Println("2 join as ", server.State(), " term ",  server.Term())
 		fmt.Println("This server has no log. Please enter a server in the cluster to join\nto or hit enter to initialize a cluster.");
 		fmt.Printf("Join to (host:port)> ");
 		fmt.Scanf("%s", &leaderHost)
+		fmt.Println("3 join as ", server.State(), " term ",  server.Term())
 		if leaderHost == "" {
-			server.Initialize()
+			fmt.Println("init")
+			server.SetElectionTimeout(1 * time.Second)
+			server.SetHeartbeatTimeout(1 * time.Second)
+			server.StartLeader()
 		} else {
-			join(server)
+			server.SetElectionTimeout(1 * time.Second)
+			server.SetHeartbeatTimeout(1 * time.Second)
+			server.StartFollower()
+			fmt.Println("4 join as ", server.State(), " term ",  server.Term())
+			Join(server, leaderHost)
 			fmt.Println("success join")
 		}
 	}
@@ -191,14 +198,14 @@ func getInfo(path string) *Info {
 //--------------------------------------
 
 // Send join requests to the leader.
-func join(s *raft.Server) error {
+func Join(s *raft.Server, serverName string) error {
 	var b bytes.Buffer
-	command := &joinCommand{}
+	command := &JoinCommand{}
 	command.Name = s.Name()
 
 	json.NewEncoder(&b).Encode(command)
 	debug("[send] POST http://%v/join", "localhost:4001")
-	resp, err := http.Post(fmt.Sprintf("http://%s/join", "localhost:4001"), "application/json", &b)
+	resp, err := http.Post(fmt.Sprintf("http://%s/join", serverName), "application/json", &b)
 	if resp != nil {
 		resp.Body.Close()
 		if resp.StatusCode == http.StatusOK {