Browse Source

Merge pull request #27 from xiangli-cmu/snapshot

Add Snapshot handler
Xiang Li 12 years ago
parent
commit
0e7c3602d3
5 changed files with 62 additions and 16 deletions
  1. 2 1
      client_handlers.go
  2. 0 1
      command.go
  3. 17 8
      etcd.go
  4. 22 6
      raft_handlers.go
  5. 21 0
      transporter.go

+ 2 - 1
client_handlers.go

@@ -110,7 +110,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 				return
 				return
 			}
 			}
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).WriteHeader(http.StatusInternalServerError)
-			(*w).Write(newJsonError(300, "No Leader"))
+			(*w).Write(newJsonError(300, err.Error()))
 			return
 			return
 		} else {
 		} else {
 
 
@@ -118,6 +118,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 				http.NotFound((*w), req)
 				http.NotFound((*w), req)
 			} else {
 			} else {
 				body, ok := body.([]byte)
 				body, ok := body.([]byte)
+				// this should not happen
 				if !ok {
 				if !ok {
 					panic("wrong type")
 					panic("wrong type")
 				}
 				}

+ 0 - 1
command.go

@@ -120,6 +120,5 @@ func (c *JoinCommand) CommandName() string {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	err := raftServer.AddPeer(c.Name)
 	err := raftServer.AddPeer(c.Name)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
-
 	return []byte("join success"), err
 	return []byte("join success"), err
 }
 }

+ 17 - 8
etcd.go

@@ -52,6 +52,8 @@ var ignore bool
 
 
 var maxSize int
 var maxSize int
 
 
+var snapshot bool
+
 func init() {
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 
 
@@ -75,6 +77,8 @@ func init() {
 
 
 	flag.BoolVar(&ignore, "i", false, "ignore the old configuration, create a new node")
 	flag.BoolVar(&ignore, "i", false, "ignore the old configuration, create a new node")
 
 
+	flag.BoolVar(&snapshot, "snapshot", false, "open or close snapshot")
+
 	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 }
 }
 
 
@@ -207,13 +211,15 @@ func startRaft(securityType int) {
 	}
 	}
 
 
 	// LoadSnapshot
 	// LoadSnapshot
-	// err = raftServer.LoadSnapshot()
+	if snapshot {
+		err = raftServer.LoadSnapshot()
 
 
-	// if err == nil {
-	// 	debug("%s finished load snapshot", raftServer.Name())
-	// } else {
-	// 	debug(err)
-	// }
+		if err == nil {
+			debug("%s finished load snapshot", raftServer.Name())
+		} else {
+			debug(err.Error())
+		}
+	}
 
 
 	raftServer.Initialize()
 	raftServer.Initialize()
 	raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
 	raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
@@ -249,7 +255,7 @@ func startRaft(securityType int) {
 
 
 				err = joinCluster(raftServer, machine)
 				err = joinCluster(raftServer, machine)
 				if err != nil {
 				if err != nil {
-					debug("cannot join to cluster via machine %s", machine)
+					debug("cannot join to cluster via machine %s %s", machine, err)
 				} else {
 				} else {
 					break
 					break
 				}
 				}
@@ -267,7 +273,9 @@ func startRaft(securityType int) {
 	}
 	}
 
 
 	// open the snapshot
 	// open the snapshot
-	// go server.Snapshot()
+	if snapshot {
+		go raftServer.Snapshot()
+	}
 
 
 	// start to response to raft requests
 	// start to response to raft requests
 	go startRaftTransport(info.RaftPort, securityType)
 	go startRaftTransport(info.RaftPort, securityType)
@@ -332,6 +340,7 @@ func startRaftTransport(port int, st int) {
 	http.HandleFunc("/log", GetLogHttpHandler)
 	http.HandleFunc("/log", GetLogHttpHandler)
 	http.HandleFunc("/log/append", AppendEntriesHttpHandler)
 	http.HandleFunc("/log/append", AppendEntriesHttpHandler)
 	http.HandleFunc("/snapshot", SnapshotHttpHandler)
 	http.HandleFunc("/snapshot", SnapshotHttpHandler)
+	http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
 	http.HandleFunc("/client", ClientHttpHandler)
 	http.HandleFunc("/client", ClientHttpHandler)
 
 
 	switch st {
 	switch st {

+ 22 - 6
raft_handlers.go

@@ -13,7 +13,7 @@ import (
 
 
 // Get all the current logs
 // Get all the current logs
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] GET http://%v/log", raftServer.Name())
+	debug("[recv] GET %s/log", raftTransporter.scheme+raftServer.Name())
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(raftServer.LogEntries())
 	json.NewEncoder(w).Encode(raftServer.LogEntries())
@@ -24,7 +24,7 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
 	if err == nil {
-		debug("[recv] POST http://%v/vote [%s]", raftServer.Name(), rvreq.CandidateName)
+		debug("[recv] POST %s/vote [%s]", raftTransporter.scheme+raftServer.Name(), rvreq.CandidateName)
 		if resp := raftServer.RequestVote(rvreq); resp != nil {
 		if resp := raftServer.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
@@ -41,7 +41,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 
 
 	if err == nil {
 	if err == nil {
-		debug("[recv] POST http://%s/log/append [%d]", raftServer.Name(), len(aereq.Entries))
+		debug("[recv] POST %s/log/append [%d]", raftTransporter.scheme+raftServer.Name(), len(aereq.Entries))
 		if resp := raftServer.AppendEntries(aereq); resp != nil {
 		if resp := raftServer.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
@@ -60,8 +60,24 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 	if err == nil {
-		debug("[recv] POST http://%s/snapshot/ ", raftServer.Name())
-		if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil {
+		debug("[recv] POST %s/snapshot/ ", raftTransporter.scheme+raftServer.Name())
+		if resp := raftServer.RequestSnapshot(aereq); resp != nil {
+			w.WriteHeader(http.StatusOK)
+			json.NewEncoder(w).Encode(resp)
+			return
+		}
+	}
+	warn("[Snapshot] ERROR: %v", err)
+	w.WriteHeader(http.StatusInternalServerError)
+}
+
+// Response to recover from snapshot request
+func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
+	aereq := &raft.SnapshotRecoveryRequest{}
+	err := decodeJsonRequest(req, aereq)
+	if err == nil {
+		debug("[recv] POST %s/snapshotRecovery/ ", raftTransporter.scheme+raftServer.Name())
+		if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			return
 			return
@@ -73,7 +89,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 
 // Get the port that listening for client connecting of the server
 // Get the port that listening for client connecting of the server
 func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
 func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] Get http://%v/client/ ", raftServer.Name())
+	debug("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	client := hostname + ":" + strconv.Itoa(clientPort)
 	client := hostname + ":" + strconv.Itoa(clientPort)
 	w.Write([]byte(client))
 	w.Write([]byte(client))

+ 21 - 0
transporter.go

@@ -89,6 +89,27 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 	return aersp
 	return aersp
 }
 }
 
 
+// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
+func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
+	var aersp *raft.SnapshotRecoveryResponse
+	var b bytes.Buffer
+	json.NewEncoder(&b).Encode(req)
+
+	debug("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+		req.LastTerm, req.LastIndex)
+
+	resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b)
+
+	if resp != nil {
+		defer resp.Body.Close()
+		aersp = &raft.SnapshotRecoveryResponse{}
+		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
+			return aersp
+		}
+	}
+	return aersp
+}
+
 // Get the client address of the leader in the cluster
 // Get the client address of the leader in the cluster
 func (t transporter) GetLeaderClientAddress() string {
 func (t transporter) GetLeaderClientAddress() string {
 	resp, _ := t.Get(raftServer.Leader() + "/client")
 	resp, _ := t.Get(raftServer.Leader() + "/client")