Browse Source

feat use protobuf

Xiang Li 12 years ago
parent
commit
63768e3742
2 changed files with 145 additions and 94 deletions
  1. 92 49
      raft_handlers.go
  2. 53 45
      transporter.go

+ 92 - 49
raft_handlers.go

@@ -38,72 +38,115 @@ func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 // Response to vote request
 func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
-	err := decodeJsonRequest(req, rvreq)
-	if err == nil {
-		debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
-		if resp := r.RequestVote(rvreq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
-	}
-	warnf("[vote] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
+
+	if _, err := rvreq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		warnf("[recv] BADREQUEST %s/vote [%v]", r.url, err)
+		return
+	}
+
+	debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
+
+	resp := r.RequestVote(rvreq)
+
+	if resp == nil {
+		warn("[vote] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		warn("[vote] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
 }
 
 // Response to append entries request
 func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.AppendEntriesRequest{}
-	err := decodeJsonRequest(req, aereq)
 
-	if err == nil {
-		debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
+	if _, err := aereq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		warnf("[recv] BADREQUEST %s/log/append [%v]", r.url, err)
+		return
+	}
+
+	debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
 
-		r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
+	r.serverStats.RecvAppendReq(aereq.LeaderName, int(req.ContentLength))
 
-		if resp := r.AppendEntries(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			if !resp.Success {
-				debugf("[Append Entry] Step back")
-			}
-			return
-		}
+	resp := r.AppendEntries(aereq)
+
+	if resp == nil {
+		warn("[ae] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if !resp.Success {
+		debugf("[Append Entry] Step back")
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		warn("[ae] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
 	}
-	warnf("[Append Entry] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
 }
 
 // Response to recover from snapshot request
 func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
-	aereq := &raft.SnapshotRequest{}
-	err := decodeJsonRequest(req, aereq)
-	if err == nil {
-		debugf("[recv] POST %s/snapshot/ ", r.url)
-		if resp := r.RequestSnapshot(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
-	}
-	warnf("[Snapshot] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
+	ssreq := &raft.SnapshotRequest{}
+
+	if _, err := ssreq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		warnf("[recv] BADREQUEST %s/snapshot [%v]", r.url, err)
+		return
+	}
+
+	debugf("[recv] POST %s/snapshot", r.url)
+
+	resp := r.RequestSnapshot(ssreq)
+
+	if resp == nil {
+		warn("[ss] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		warn("[ss] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
 }
 
 // Response to recover from snapshot request
 func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
-	aereq := &raft.SnapshotRecoveryRequest{}
-	err := decodeJsonRequest(req, aereq)
-	if err == nil {
-		debugf("[recv] POST %s/snapshotRecovery/ ", r.url)
-		if resp := r.SnapshotRecoveryRequest(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
-	}
-	warnf("[Snapshot] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
+	ssrreq := &raft.SnapshotRecoveryRequest{}
+
+	if _, err := ssrreq.Decode(req.Body); err != nil {
+		http.Error(w, "", http.StatusBadRequest)
+		warnf("[recv] BADREQUEST %s/snapshotRecovery [%v]", r.url, err)
+		return
+	}
+
+	debugf("[recv] POST %s/snapshotRecovery", r.url)
+
+	resp := r.SnapshotRecoveryRequest(ssrreq)
+
+	if resp == nil {
+		warn("[ssr] Error: nil response")
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
+
+	if _, err := resp.Encode(w); err != nil {
+		warn("[ssr] Error: %v", err)
+		http.Error(w, "", http.StatusInternalServerError)
+		return
+	}
 }
 
 // Get the port that listening for etcd connecting of the server

+ 53 - 45
transporter.go

@@ -19,7 +19,6 @@ package main
 import (
 	"bytes"
 	"crypto/tls"
-	"encoding/json"
 	"fmt"
 	"io"
 	"net"
@@ -76,10 +75,12 @@ func dialWithTimeout(network, addr string) (net.Conn, error) {
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
-	var aersp *raft.AppendEntriesResponse
 	var b bytes.Buffer
 
-	json.NewEncoder(&b).Encode(req)
+	if _, err := req.Encode(&b); err != nil {
+		warn("transporter.ae.encoding.error:", err)
+		return nil
+	}
 
 	size := b.Len()
 
@@ -108,6 +109,7 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 		if ok {
 			thisFollowerStats.Fail()
 		}
+		return nil
 	} else {
 		if ok {
 			thisFollowerStats.Succ(end.Sub(start))
@@ -119,24 +121,28 @@ func (t *transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.P
 
 		t.CancelWhenTimeout(httpRequest)
 
-		aersp = &raft.AppendEntriesResponse{}
-		if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-			return aersp
+		aeresp := &raft.AppendEntriesResponse{}
+		if _, err = aeresp.Decode(resp.Body); err != nil && err != io.EOF {
+			warn("transporter.ae.decoding.error:", err)
+			return nil
 		}
-
+		return aeresp
 	}
 
-	return aersp
+	return nil
 }
 
 // Sends RequestVote RPCs to a peer when the server is the candidate.
 func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
-	var rvrsp *raft.RequestVoteResponse
 	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
+
+	if _, err := req.Encode(&b); err != nil {
+		warn("transporter.vr.encoding.error:", err)
+		return nil
+	}
 
 	u, _ := nameToRaftURL(peer.Name)
-	debugf("Send Vote to %s", u)
+	debugf("Send Vote from %s to %s", server.Name(), u)
 
 	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
 
@@ -150,28 +156,31 @@ func (t *transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req
 		t.CancelWhenTimeout(httpRequest)
 
 		rvrsp := &raft.RequestVoteResponse{}
-		if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
-			return rvrsp
+		if _, err = rvrsp.Decode(resp.Body); err != nil && err != io.EOF {
+			warn("transporter.vr.decoding.error:", err)
+			return nil
 		}
-
+		return rvrsp
 	}
-	return rvrsp
+	return nil
 }
 
 // Sends SnapshotRequest RPCs to a peer when the server is the candidate.
 func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
-	var aersp *raft.SnapshotResponse
 	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
+
+	if _, err := req.Encode(&b); err != nil {
+		warn("transporter.ss.encoding.error:", err)
+		return nil
+	}
 
 	u, _ := nameToRaftURL(peer.Name)
-	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
-		req.LastTerm, req.LastIndex)
+	debugf("Send Snapshot Request from %s to %s", server.Name(), u)
 
 	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
 
 	if err != nil {
-		debugf("Cannot send SendSnapshotRequest to %s : %s", u, err)
+		debugf("Cannot send Snapshot Request to %s : %s", u, err)
 	}
 
 	if resp != nil {
@@ -179,69 +188,68 @@ func (t *transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer,
 
 		t.CancelWhenTimeout(httpRequest)
 
-		aersp = &raft.SnapshotResponse{}
-		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-
-			return aersp
+		ssrsp := &raft.SnapshotResponse{}
+		if _, err = ssrsp.Decode(resp.Body); err != nil && err != io.EOF {
+			warn("transporter.ss.decoding.error:", err)
+			return nil
 		}
+		return ssrsp
 	}
-
-	return aersp
+	return nil
 }
 
 // Sends SnapshotRecoveryRequest RPCs to a peer when the server is the candidate.
 func (t *transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRecoveryRequest) *raft.SnapshotRecoveryResponse {
-	var aersp *raft.SnapshotRecoveryResponse
 	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
+
+	if _, err := req.Encode(&b); err != nil {
+		warn("transporter.ss.encoding.error:", err)
+		return nil
+	}
 
 	u, _ := nameToRaftURL(peer.Name)
-	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
-		req.LastTerm, req.LastIndex)
+	debugf("Send Snapshot Recovery from %s to %s", server.Name(), u)
 
-	resp, _, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
+	resp, httpRequest, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
 
 	if err != nil {
-		debugf("Cannot send SendSnapshotRecoveryRequest to %s : %s", u, err)
+		debugf("Cannot send Snapshot Recovery to %s : %s", u, err)
 	}
 
 	if resp != nil {
 		defer resp.Body.Close()
-		aersp = &raft.SnapshotRecoveryResponse{}
 
-		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-			return aersp
+		t.CancelWhenTimeout(httpRequest)
+
+		ssrrsp := &raft.SnapshotRecoveryResponse{}
+		if _, err = ssrrsp.Decode(resp.Body); err != nil && err != io.EOF {
+			warn("transporter.ssr.decoding.error:", err)
+			return nil
 		}
+		return ssrrsp
 	}
+	return nil
 
-	return aersp
 }
 
 // Send server side POST request
 func (t *transporter) Post(urlStr string, body io.Reader) (*http.Response, *http.Request, error) {
-
 	req, _ := http.NewRequest("POST", urlStr, body)
-
 	resp, err := t.client.Do(req)
-
 	return resp, req, err
-
 }
 
 // Send server side GET request
 func (t *transporter) Get(urlStr string) (*http.Response, *http.Request, error) {
-
 	req, _ := http.NewRequest("GET", urlStr, nil)
-
 	resp, err := t.client.Do(req)
-
 	return resp, req, err
 }
 
-// Cancel the on fly HTTP transaction when timeout happens
+// Cancel the on fly HTTP transaction when timeout happens.
 func (t *transporter) CancelWhenTimeout(req *http.Request) {
 	go func() {
-		time.Sleep(ElectionTimeout)
+		time.Sleep(tranTimeout)
 		t.transport.CancelRequest(req)
 	}()
 }