Browse Source

change snapshot api due to the change in go-raft

Xiang Li 12 years ago
parent
commit
58501151da
5 changed files with 48 additions and 5 deletions
  1. 3 1
      client_handlers.go
  2. 0 1
      command.go
  3. 6 2
      etcd.go
  4. 17 1
      raft_handlers.go
  5. 22 0
      transporter.go

+ 3 - 1
client_handlers.go

@@ -1,6 +1,7 @@
 package main
 
 import (
+	"fmt"
 	"github.com/coreos/etcd/store"
 	"net/http"
 	"strconv"
@@ -110,11 +111,12 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 				return
 			}
 			(*w).WriteHeader(http.StatusInternalServerError)
-			(*w).Write(newJsonError(300, "No Leader"))
+			(*w).Write(newJsonError(300, err.Error()))
 			return
 		} else {
 
 			if body == nil {
+				fmt.Println("empty but not err!")
 				http.NotFound((*w), req)
 			} else {
 				body, ok := body.([]byte)

+ 0 - 1
command.go

@@ -120,6 +120,5 @@ func (c *JoinCommand) CommandName() string {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 	err := raftServer.AddPeer(c.Name)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
-
 	return []byte("join success"), err
 }

+ 6 - 2
etcd.go

@@ -249,7 +249,7 @@ func startRaft(securityType int) {
 
 				err = joinCluster(raftServer, machine)
 				if err != nil {
-					debug("cannot join to cluster via machine %s", machine)
+					debug("cannot join to cluster via machine %s %s", machine, err)
 				} else {
 					break
 				}
@@ -267,7 +267,7 @@ func startRaft(securityType int) {
 	}
 
 	// open the snapshot
-	// go server.Snapshot()
+	go raftServer.Snapshot()
 
 	// start to response to raft requests
 	go startRaftTransport(info.RaftPort, securityType)
@@ -332,6 +332,7 @@ func startRaftTransport(port int, st int) {
 	http.HandleFunc("/log", GetLogHttpHandler)
 	http.HandleFunc("/log/append", AppendEntriesHttpHandler)
 	http.HandleFunc("/snapshot", SnapshotHttpHandler)
+	http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
 	http.HandleFunc("/client", ClientHttpHandler)
 
 	switch st {
@@ -566,6 +567,9 @@ func joinCluster(s *raft.Server, serverName string) error {
 				json.NewEncoder(&b).Encode(command)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
 			} else {
+				b, _ := ioutil.ReadAll(resp.Body)
+				fmt.Println(string(b))
+				resp.Body.Close()
 				return fmt.Errorf("Unable to join")
 			}
 		}

+ 17 - 1
raft_handlers.go

@@ -61,7 +61,23 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 		debug("[recv] POST http://%s/snapshot/ ", raftServer.Name())
-		if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil {
+		if resp := raftServer.SnapshotRequest(aereq); resp != nil {
+			w.WriteHeader(http.StatusOK)
+			json.NewEncoder(w).Encode(resp)
+			return
+		}
+	}
+	warn("[Snapshot] ERROR: %v", err)
+	w.WriteHeader(http.StatusInternalServerError)
+}
+
+// Response to recover from snapshot request
+func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
+	aereq := &raft.SnapshotRecoveryRequest{}
+	err := decodeJsonRequest(req, aereq)
+	if err == nil {
+		debug("[recv] POST http://%s/snapshotRecovery/ ", raftServer.Name())
+		if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return

+ 22 - 0
transporter.go

@@ -89,6 +89,28 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 	return aersp
 }
 
+// Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
+func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
+	var aersp *raft.SnapshotRecoveryResponse
+	var b bytes.Buffer
+	json.NewEncoder(&b).Encode(req)
+
+	debug("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+		req.LastTerm, req.LastIndex)
+
+	resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b)
+
+	if resp != nil {
+		defer resp.Body.Close()
+		aersp = &raft.SnapshotRecoveryResponse{}
+		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
+
+			return aersp
+		}
+	}
+	return aersp
+}
+
 // Get the client address of the leader in the cluster
 func (t transporter) GetLeaderClientAddress() string {
 	resp, _ := t.Get(raftServer.Leader() + "/client")