Browse Source

Merge pull request #290 from xiangli-cmu/feat-use-raft-protobuf

feat use-raft-protobuf
Ben Johnson 12 years ago
parent
commit
71a73d3904
3 changed files with 140 additions and 81 deletions
  1. 89 46
      server/peer_server_handlers.go
  2. 1 0
      server/server.go
  3. 50 35
      server/transporter.go

+ 89 - 46
server/peer_server_handlers.go

@@ -23,72 +23,115 @@ func (ps *PeerServer) GetLogHttpHandler(w http.ResponseWriter, req *http.Request
 // Response to vote request
 func (ps *PeerServer) VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
-	err := decodeJsonRequest(req, rvreq)
-	if err == nil {
-		log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName)
-		if resp := ps.raftServer.RequestVote(rvreq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
+
+	if _, err := rvreq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		log.Warnf("[recv] BADREQUEST %s/vote [%v]", ps.url, err)
+		return
+	}
+
+	log.Debugf("[recv] POST %s/vote [%s]", ps.url, rvreq.CandidateName)
+
+	resp := ps.raftServer.RequestVote(rvreq)
+
+	if resp == nil {
+		log.Warn("[vote] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		log.Warn("[vote] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
 	}
-	log.Warnf("[vote] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
 }
 
 // Response to append entries request
 func (ps *PeerServer) AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.AppendEntriesRequest{}
-	err := decodeJsonRequest(req, aereq)
 
-	if err == nil {
-		log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries))
+	if _, err := aereq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		log.Warnf("[recv] BADREQUEST %s/log/append [%v]", ps.url, err)
+		return
+	}
 
-		ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
+	log.Debugf("[recv] POST %s/log/append [%d]", ps.url, len(aereq.Entries))
 
-		if resp := ps.raftServer.AppendEntries(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			if !resp.Success {
-				log.Debugf("[Append Entry] Step back")
-			}
-			return
-		}
+	ps.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
+
+	resp := ps.raftServer.AppendEntries(aereq)
+
+	if resp == nil {
+		log.Warn("[ae] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if !resp.Success {
+		log.Debugf("[Append Entry] Step back")
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		log.Warn("[ae] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
 	}
-	log.Warnf("[Append Entry] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
 }
 
 // Response to recover from snapshot request
 func (ps *PeerServer) SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
-	aereq := &raft.SnapshotRequest{}
-	err := decodeJsonRequest(req, aereq)
-	if err == nil {
-		log.Debugf("[recv] POST %s/snapshot/ ", ps.url)
-		if resp := ps.raftServer.RequestSnapshot(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
+	ssreq := &raft.SnapshotRequest{}
+
+	if _, err := ssreq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		log.Warnf("[recv] BADREQUEST %s/snapshot [%v]", ps.url, err)
+		return
+	}
+
+	log.Debugf("[recv] POST %s/snapshot", ps.url)
+
+	resp := ps.raftServer.RequestSnapshot(ssreq)
+
+	if resp == nil {
+		log.Warn("[ss] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		log.Warn("[ss] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
 	}
-	log.Warnf("[Snapshot] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
 }
 
 // Response to recover from snapshot request
 func (ps *PeerServer) SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
-	aereq := &raft.SnapshotRecoveryRequest{}
-	err := decodeJsonRequest(req, aereq)
-	if err == nil {
-		log.Debugf("[recv] POST %s/snapshotRecovery/ ", ps.url)
-		if resp := ps.raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
+	ssrreq := &raft.SnapshotRecoveryRequest{}
+
+	if _, err := ssrreq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		log.Warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", ps.url, err)
+		return
+	}
+
+	log.Debugf("[recv] POST %s/snapshotRecovery", ps.url)
+
+	resp := ps.raftServer.SnapshotRecoveryRequest(ssrreq)
+
+	if resp == nil {
+		log.Warn("[ssr] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		log.Warn("[ssr] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
 	}
-	log.Warnf("[Snapshot] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
 }
 
 // Get the port that listening for etcd connecting of the server

+ 1 - 0
server/server.go

@@ -116,6 +116,7 @@ func (s *Server) installV2() {
 	s.handleFunc("/v2/stats/self", s.GetStatsHandler).Methods("GET")
 	s.handleFunc("/v2/stats/leader", s.GetLeaderStatsHandler).Methods("GET")
 	s.handleFunc("/v2/stats/store", s.GetStoreStatsHandler).Methods("GET")
+	s.handleFunc("/v2/speedTest", s.SpeedTestHandler).Methods("GET")
 }
 
 // Adds a v1 server handler to the router.

+ 50 - 35
server/transporter.go

@@ -3,7 +3,6 @@ package server
 import (
 	"bytes"
 	"crypto/tls"
-	"encoding/json"
 	"fmt"
 	"io"
 	"net"
@@ -65,10 +64,12 @@ func dialWithTimeout(network, addr string) (net.Conn, error) {
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
-	var aersp *raft.AppendEntriesResponse
 	var b bytes.Buffer
 
-	json.NewEncoder(&b).Encode(req)
+	if _, err := req.Encode(&b); err != nil {
+		log.Warn("transporter.ae.encoding.error:", err)
+		return nil
+	}
 
 	size := b.Len()
 
@@ -97,6 +98,7 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
 		if ok {
 			thisFollowerStats.Fail()
 		}
+		return nil
 	} else {
 		if ok {
 			thisFollowerStats.Succ(end.Sub(start))
@@ -108,21 +110,25 @@ func (t *transporter) SendAppendEntriesRequest(server raft.Server, peer *raft.Pe
 
 		t.CancelWhenTimeout(httpRequest)
 
-		aersp = &raft.AppendEntriesResponse{}
-		if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-			return aersp
+		aeresp := &raft.AppendEntriesResponse{}
+		if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
+			log.Warn("transporter.ae.decoding.error:", err)
+			return nil
 		}
-
+		return aeresp
 	}
 
-	return aersp
+	return nil
 }
 
 // Sends RequestVote RPCs to a peer when the server is the candidate.
 func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
-	var rvrsp *raft.RequestVoteResponse
 	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
+
+	if _, err := req.Encode(&b); err != nil {
+		log.Warn("transporter.vr.encoding.error:", err)
+		return nil
+	}
 
 	u, _ := t.peerServer.registry.PeerURL(peer.Name)
 	log.Debugf("Send Vote from %s to %s", server.Name(), u)
@@ -139,28 +145,31 @@ func (t *transporter) SendVoteRequest(server raft.Server, peer *raft.Peer, req *
 		t.CancelWhenTimeout(httpRequest)
 
 		rvrsp := &raft.RequestVoteResponse{}
-		if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
-			return rvrsp
+		if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
+			log.Warn("transporter.vr.decoding.error:", err)
+			return nil
 		}
-
+		return rvrsp
 	}
-	return rvrsp
+	return nil
 }
 
 // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
 func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
-	var aersp *raft.SnapshotResponse
 	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
+
+	if _, err := req.Encode(&b); err != nil {
+		log.Warn("transporter.ss.encoding.error:", err)
+		return nil
+	}
 
 	u, _ := t.peerServer.registry.PeerURL(peer.Name)
-	log.Debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
-		req.LastTerm, req.LastIndex)
+	log.Debugf("Send Snapshot Request from %s to %s", server.Name(), u)
 
 	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
 
 	if err != nil {
-		log.Debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
+		log.Debugf("Cannot send Snapshot Request to %s : %s", u, err)
 	}
 
 	if resp != nil {
@@ -168,42 +177,48 @@ func (t *transporter) SendSnapshotRequest(server raft.Server, peer *raft.Peer, r
 
 		t.CancelWhenTimeout(httpRequest)
 
-		aersp = &raft.SnapshotResponse{}
-		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-
-			return aersp
+		ssrsp := &raft.SnapshotResponse{}
+		if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
+			log.Warn("transporter.ss.decoding.error:", err)
+			return nil
 		}
+		return ssrsp
 	}
-
-	return aersp
+	return nil
 }
 
 // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
 func (t *transporter) SendSnapshotRecoveryRequest(server raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
-	var aersp *raft.SnapshotRecoveryResponse
 	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
+
+	if _, err := req.Encode(&b); err != nil {
+		log.Warn("transporter.ss.encoding.error:", err)
+		return nil
+	}
 
 	u, _ := t.peerServer.registry.PeerURL(peer.Name)
-	log.Debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
-		req.LastTerm, req.LastIndex)
+	log.Debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
 
-	resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
+	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
 
 	if err != nil {
-		log.Debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
+		log.Debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
 	}
 
 	if resp != nil {
 		defer resp.Body.Close()
-		aersp = &raft.SnapshotRecoveryResponse{}
 
-		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-			return aersp
+		t.CancelWhenTimeout(httpRequest)
+
+		ssrrsp := &raft.SnapshotRecoveryResponse{}
+		if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
+			log.Warn("transporter.ssr.decoding.error:", err)
+			return nil
 		}
+		return ssrrsp
 	}
+	return nil
 
-	return aersp
 }
 
 // Send server side POST request