Browse Source

clean up transporter.go

Xiang Li 12 years ago
parent
commit
5749b9db50
5 changed files with 186 additions and 174 deletions
  1. 37 36
      etcd.go
  2. 31 27
      handlers.go
  3. 0 79
      trans_handler.go
  4. 118 0
      transporter.go
  5. 0 32
      util.go

+ 37 - 36
etcd.go

@@ -111,8 +111,8 @@ type Info struct {
 //
 //
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
-var server *raft.Server
-var serverTransHandler transHandler
+var raftServer *raft.Server
+var raftTransporter transporter
 var etcdStore *store.Store
 var etcdStore *store.Store
 
 
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
@@ -156,66 +156,67 @@ func main() {
 		panic("ERROR type")
 		panic("ERROR type")
 	}
 	}
 
 
-	serverTransHandler = createTranHandler(st)
+	raftTransporter = createTransporter(st)
 
 
 	// Setup new raft server.
 	// Setup new raft server.
 	etcdStore = store.CreateStore(maxSize)
 	etcdStore = store.CreateStore(maxSize)
 
 
 	// create raft server
 	// create raft server
-	server, err = raft.NewServer(name, dirPath, serverTransHandler, etcdStore, nil)
+	raftServer, err = raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
 
 
 	if err != nil {
 	if err != nil {
 		fatal("%v", err)
 		fatal("%v", err)
 	}
 	}
 
 
-	err = server.LoadSnapshot()
+	err = raftServer.LoadSnapshot()
 
 
 	if err == nil {
 	if err == nil {
-		debug("%s finished load snapshot", server.Name())
+		debug("%s finished load snapshot", raftServer.Name())
 	} else {
 	} else {
 		fmt.Println(err)
 		fmt.Println(err)
-		debug("%s bad snapshot", server.Name())
+		debug("%s bad snapshot", raftServer.Name())
 	}
 	}
-	server.Initialize()
-	debug("%s finished init", server.Name())
-	server.SetElectionTimeout(ELECTIONTIMTOUT)
-	server.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
-	debug("%s finished set timeout", server.Name())
+	
+	raftServer.Initialize()
+	debug("%s finished init", raftServer.Name())
+	raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
+	raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
+	debug("%s finished set timeout", raftServer.Name())
 
 
-	if server.IsLogEmpty() {
+	if raftServer.IsLogEmpty() {
 
 
 		// start as a leader in a new cluster
 		// start as a leader in a new cluster
 		if cluster == "" {
 		if cluster == "" {
-			server.StartLeader()
+			raftServer.StartLeader()
 
 
 			time.Sleep(time.Millisecond * 20)
 			time.Sleep(time.Millisecond * 20)
 
 
-			// join self as a peer
+			// leader need to join self as a peer
 			for {
 			for {
 				command := &JoinCommand{}
 				command := &JoinCommand{}
-				command.Name = server.Name()
-				_, err := server.Do(command)
+				command.Name = raftServer.Name()
+				_, err := raftServer.Do(command)
 				if err == nil {
 				if err == nil {
 					break
 					break
 				}
 				}
 			}
 			}
-			debug("%s start as a leader", server.Name())
+			debug("%s start as a leader", raftServer.Name())
 
 
-			// start as a fellower in a existing cluster
+			// start as a follower in a existing cluster
 		} else {
 		} else {
-			server.StartFollower()
+			raftServer.StartFollower()
 
 
-			err := Join(server, cluster)
+			err := Join(raftServer, cluster)
 			if err != nil {
 			if err != nil {
 				panic(err)
 				panic(err)
 			}
 			}
-			fmt.Println("success join")
+			debug("%s success join to the cluster", raftServer.Name())
 		}
 		}
 
 
-		// rejoin the previous cluster
 	} else {
 	} else {
-		server.StartFollower()
-		debug("%s start as a follower", server.Name())
+		// rejoin the previous cluster
+		raftServer.StartFollower()
+		debug("%s restart as a follower", raftServer.Name())
 	}
 	}
 
 
 	// open the snapshot
 	// open the snapshot
@@ -225,7 +226,7 @@ func main() {
 		// start web
 		// start web
 		etcdStore.SetMessager(&storeMsg)
 		etcdStore.SetMessager(&storeMsg)
 		go webHelper()
 		go webHelper()
-		go web.Start(server, webPort)
+		go web.Start(raftServer, webPort)
 	}
 	}
 
 
 	go startServTransport(info.ServerPort, st)
 	go startServTransport(info.ServerPort, st)
@@ -237,12 +238,11 @@ func usage() {
 	fatal("usage: raftd [PATH]")
 	fatal("usage: raftd [PATH]")
 }
 }
 
 
-func createTranHandler(st int) transHandler {
-	t := transHandler{}
+func createTransporter(st int) transporter {
+	t := transporter{}
 
 
 	switch st {
 	switch st {
 	case HTTP:
 	case HTTP:
-		t := transHandler{}
 		t.client = nil
 		t.client = nil
 		return t
 		return t
 
 
@@ -268,7 +268,7 @@ func createTranHandler(st int) transHandler {
 	}
 	}
 
 
 	// for complier
 	// for complier
-	return transHandler{}
+	return transporter{}
 }
 }
 
 
 func startServTransport(port int, st int) {
 func startServTransport(port int, st int) {
@@ -284,11 +284,12 @@ func startServTransport(port int, st int) {
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
-		debug("raft server [%s] listen on http", server.Name())
+		debug("raft server [%s] listen on http port %v", address, port)
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:
-		http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil)
+		debug("raft server [%s] listen on https port %v", address, port)
+		log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
 
 
 	case HTTPSANDVERIFY:
 	case HTTPSANDVERIFY:
 		pemByte, _ := ioutil.ReadFile(serverCAFile)
 		pemByte, _ := ioutil.ReadFile(serverCAFile)
@@ -332,7 +333,7 @@ func startClientTransport(port int, st int) {
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
-		debug("etcd [%s] listen on http", server.Name())
+		debug("etcd [%s] listen on http port %v", address, clientPort)
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:
@@ -472,9 +473,9 @@ func Join(s *raft.Server, serverName string) error {
 	json.NewEncoder(&b).Encode(command)
 	json.NewEncoder(&b).Encode(command)
 
 
 	// t must be ok
 	// t must be ok
-	t, _ := server.Transporter().(transHandler)
+	t, _ := raftServer.Transporter().(transporter)
 	debug("Send Join Request to %s", serverName)
 	debug("Send Join Request to %s", serverName)
-	resp, err := Post(&t, fmt.Sprintf("%s/join", serverName), &b)
+	resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
 
 
 	for {
 	for {
 		if resp != nil {
 		if resp != nil {
@@ -490,7 +491,7 @@ func Join(s *raft.Server, serverName string) error {
 				debug("Leader is %s", address)
 				debug("Leader is %s", address)
 				debug("Send Join Request to %s", address)
 				debug("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(command)
 				json.NewEncoder(&b).Encode(command)
-				resp, err = Post(&t, fmt.Sprintf("%s/join", address), &b)
+				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
 			}
 			}
 		}
 		}
 	}
 	}

+ 31 - 27
handlers.go

@@ -14,18 +14,18 @@ import (
 
 
 // Get all the current logs
 // Get all the current logs
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] GET http://%v/log", server.Name())
+	debug("[recv] GET http://%v/log", raftServer.Name())
 	w.Header().Set("Content-Type", "application/json")
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(server.LogEntries())
+	json.NewEncoder(w).Encode(raftServer.LogEntries())
 }
 }
 
 
 func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
 	if err == nil {
-		debug("[recv] POST http://%v/vote [%s]", server.Name(), rvreq.CandidateName)
-		if resp := server.RequestVote(rvreq); resp != nil {
+		debug("[recv] POST http://%v/vote [%s]", raftServer.Name(), rvreq.CandidateName)
+		if resp := raftServer.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			return
 			return
@@ -40,8 +40,8 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	
 	
 	if err == nil {
 	if err == nil {
-		debug("[recv] POST http://%s/log/append [%d]", server.Name(), len(aereq.Entries))
-		if resp := server.AppendEntries(aereq); resp != nil {
+		debug("[recv] POST http://%s/log/append [%d]", raftServer.Name(), len(aereq.Entries))
+		if resp := raftServer.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			if !resp.Success {
 			if !resp.Success {
@@ -50,7 +50,7 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 			return
 			return
 		}
 		}
 	}
 	}
-	warn("[append] ERROR: %v", err)
+	warn("[Append Entry] ERROR: %v", err)
 	w.WriteHeader(http.StatusInternalServerError)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 }
 
 
@@ -58,24 +58,26 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
 	if err == nil {
-		debug("[recv] POST http://%s/snapshot/ ", server.Name())
-		if resp, _ := server.SnapshotRecovery(aereq); resp != nil {
+		debug("[recv] POST http://%s/snapshot/ ", raftServer.Name())
+		if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			json.NewEncoder(w).Encode(resp)
 			return
 			return
 		}
 		}
 	}
 	}
-	warn("[snapshot] ERROR: %v", err)
+	warn("[Snapshot] ERROR: %v", err)
 	w.WriteHeader(http.StatusInternalServerError)
 	w.WriteHeader(http.StatusInternalServerError)
 }
 }
 
 
+// Get the port that listening for client connecting of the server
 func clientHttpHandler(w http.ResponseWriter, req *http.Request) {
 func clientHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] Get http://%v/client/ ", server.Name())
+	debug("[recv] Get http://%v/client/ ", raftServer.Name())
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	client := address + ":" + strconv.Itoa(clientPort)
 	client := address + ":" + strconv.Itoa(clientPort)
 	w.Write([]byte(client))
 	w.Write([]byte(client))
 }
 }
 
 
+// 
 func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 
 	command := &JoinCommand{}
 	command := &JoinCommand{}
@@ -93,6 +95,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 // external HTTP Handlers via client port
 // external HTTP Handlers via client port
 //--------------------------------------
 //--------------------------------------
 
 
+// Dispatch GET/POST/DELETE request to corresponding handlers
 func Multiplexer(w http.ResponseWriter, req *http.Request) {
 func Multiplexer(w http.ResponseWriter, req *http.Request) {
 
 
 	if req.Method == "GET" {
 	if req.Method == "GET" {
@@ -107,10 +110,11 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) {
 	}
 	}
 }
 }
 
 
+// Set Command Handler
 func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
-	debug("[recv] POST http://%v/v1/keys/%s", server.Name(), key)
+	debug("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
 
 
 	command := &SetCommand{}
 	command := &SetCommand{}
 	command.Key = key
 	command.Key = key
@@ -138,7 +142,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/testAndSet/"):]
 	key := req.URL.Path[len("/v1/testAndSet/"):]
 
 
-	debug("[recv] POST http://%v/v1/testAndSet/%s", server.Name(), key)
+	debug("[recv] POST http://%v/v1/testAndSet/%s", raftServer.Name(), key)
 
 
 	command := &TestAndSetCommand{}
 	command := &TestAndSetCommand{}
 	command.Key = key
 	command.Key = key
@@ -167,7 +171,7 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
-	debug("[recv] DELETE http://%v/v1/keys/%s", server.Name(), key)
+	debug("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
 
 
 	command := &DeleteCommand{}
 	command := &DeleteCommand{}
 	command.Key = key
 	command.Key = key
@@ -176,8 +180,8 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 func excute(c Command, w *http.ResponseWriter, req *http.Request) {
-	if server.State() == "leader" {
-		if body, err := server.Do(c); err != nil {
+	if raftServer.State() == "leader" {
+		if body, err := raftServer.Do(c); err != nil {
 			warn("Commit failed %v", err)
 			warn("Commit failed %v", err)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			return
 			return
@@ -198,13 +202,13 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 		}
 		}
 	} else {
 	} else {
 		// current no leader
 		// current no leader
-		if server.Leader() == "" {
+		if raftServer.Leader() == "" {
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			return
 			return
 		}
 		}
 
 
 		// tell the client where is the leader
 		// tell the client where is the leader
-		debug("Redirect to the leader %s", server.Leader())
+		debug("Redirect to the leader %s", raftServer.Leader())
 
 
 		path := req.URL.Path
 		path := req.URL.Path
 
 
@@ -214,7 +218,7 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 			scheme = "http://"
 			scheme = "http://"
 		}
 		}
 
 
-		url := scheme + leaderClient() + path
+		url := scheme + raftTransporter.GetLeaderClientAddress() + path
 
 
 		debug("redirect to %s", url)
 		debug("redirect to %s", url)
 
 
@@ -229,18 +233,18 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 
 
 func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
 func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(server.Leader()))
+	w.Write([]byte(raftServer.Leader()))
 }
 }
 
 
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
-	debug("[recv] GET http://%v/v1/keys/%s", server.Name(), key)
+	debug("[recv] GET http://%v/v1/keys/%s", raftServer.Name(), key)
 
 
 	command := &GetCommand{}
 	command := &GetCommand{}
 	command.Key = key
 	command.Key = key
 
 
-	if body, err := command.Apply(server); err != nil {
+	if body, err := command.Apply(raftServer); err != nil {
 		warn("raftd: Unable to write file: %v", err)
 		warn("raftd: Unable to write file: %v", err)
 		(*w).WriteHeader(http.StatusInternalServerError)
 		(*w).WriteHeader(http.StatusInternalServerError)
 		return
 		return
@@ -261,12 +265,12 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
 func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
 	prefix := req.URL.Path[len("/v1/list/"):]
 	prefix := req.URL.Path[len("/v1/list/"):]
 
 
-	debug("[recv] GET http://%v/v1/list/%s", server.Name(), prefix)
+	debug("[recv] GET http://%v/v1/list/%s", raftServer.Name(), prefix)
 
 
 	command := &ListCommand{}
 	command := &ListCommand{}
 	command.Prefix = prefix
 	command.Prefix = prefix
 
 
-	if body, err := command.Apply(server); err != nil {
+	if body, err := command.Apply(raftServer); err != nil {
 		warn("Unable to write file: %v", err)
 		warn("Unable to write file: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 		return
@@ -291,11 +295,11 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	command.Key = key
 	command.Key = key
 
 
 	if req.Method == "GET" {
 	if req.Method == "GET" {
-		debug("[recv] GET http://%v/watch/%s", server.Name(), key)
+		debug("[recv] GET http://%v/watch/%s", raftServer.Name(), key)
 		command.SinceIndex = 0
 		command.SinceIndex = 0
 
 
 	} else if req.Method == "POST" {
 	} else if req.Method == "POST" {
-		debug("[recv] POST http://%v/watch/%s", server.Name(), key)
+		debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
 		content := req.FormValue("index")
 		content := req.FormValue("index")
 
 
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
@@ -309,7 +313,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		return
 		return
 	}
 	}
 
 
-	if body, err := command.Apply(server); err != nil {
+	if body, err := command.Apply(raftServer); err != nil {
 		warn("Unable to write file: %v", err)
 		warn("Unable to write file: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 		return

+ 0 - 79
trans_handler.go

@@ -1,79 +0,0 @@
-package main
-
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"github.com/coreos/go-raft"
-	"io"
-	"net/http"
-)
-
-type transHandler struct {
-	name   string
-	client *http.Client
-}
-
-// Sends AppendEntries RPCs to a peer when the server is the leader.
-func (t transHandler) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
-	var aersp *raft.AppendEntriesResponse
-	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
-
-	debug("Send LogEntries to %s ", peer.Name())
-
-	resp, _ := Post(&t, fmt.Sprintf("%s/log/append", peer.Name()), &b)
-
-	if resp != nil {
-		defer resp.Body.Close()
-		aersp = &raft.AppendEntriesResponse{}
-		if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-			return aersp
-		}
-
-	}
-	return aersp
-}
-
-// Sends RequestVote RPCs to a peer when the server is the candidate.
-func (t transHandler) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
-	var rvrsp *raft.RequestVoteResponse
-	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
-
-	debug("Send Vote to %s", peer.Name())
-
-	resp, _ := Post(&t, fmt.Sprintf("%s/vote", peer.Name()), &b)
-
-	if resp != nil {
-		defer resp.Body.Close()
-		rvrsp := &raft.RequestVoteResponse{}
-		if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
-			return rvrsp
-		}
-
-	}
-	return rvrsp
-}
-
-// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
-func (t transHandler) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
-	var aersp *raft.SnapshotResponse
-	var b bytes.Buffer
-	json.NewEncoder(&b).Encode(req)
-
-	debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
-		req.LastTerm, req.LastIndex)
-
-	resp, err := Post(&t, fmt.Sprintf("%s/snapshot", peer.Name()), &b)
-
-	if resp != nil {
-		defer resp.Body.Close()
-		aersp = &raft.SnapshotResponse{}
-		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
-
-			return aersp
-		}
-	}
-	return aersp
-}

+ 118 - 0
transporter.go

@@ -0,0 +1,118 @@
+package main
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+	"github.com/coreos/go-raft"
+	"io"
+	"net/http"
+)
+
+// Transporter layer for communication between raft nodes
+type transporter struct {
+	name   string
+	// If https is used for server internal communcation,
+	// we will have a http client. Or it will be nil.
+	client *http.Client
+}
+
+// Sends AppendEntries RPCs to a peer when the server is the leader.
+func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
+	var aersp *raft.AppendEntriesResponse
+	var b bytes.Buffer
+	json.NewEncoder(&b).Encode(req)
+
+	debug("Send LogEntries to %s ", peer.Name())
+
+	resp, _ := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b)
+
+	if resp != nil {
+		defer resp.Body.Close()
+		aersp = &raft.AppendEntriesResponse{}
+		if err := json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
+			return aersp
+		}
+
+	}
+	return aersp
+}
+
+// Sends RequestVote RPCs to a peer when the server is the candidate.
+func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *raft.RequestVoteRequest) *raft.RequestVoteResponse {
+	var rvrsp *raft.RequestVoteResponse
+	var b bytes.Buffer
+	json.NewEncoder(&b).Encode(req)
+
+	debug("Send Vote to %s", peer.Name())
+
+	resp, _ := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b)
+
+	if resp != nil {
+		defer resp.Body.Close()
+		rvrsp := &raft.RequestVoteResponse{}
+		if err := json.NewDecoder(resp.Body).Decode(&rvrsp); err == nil || err == io.EOF {
+			return rvrsp
+		}
+
+	}
+	return rvrsp
+}
+
+// Sends SnapshotRequest RPCs to a peer when the server is the candidate.
+func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, req *raft.SnapshotRequest) *raft.SnapshotResponse {
+	var aersp *raft.SnapshotResponse
+	var b bytes.Buffer
+	json.NewEncoder(&b).Encode(req)
+
+	debug("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+		req.LastTerm, req.LastIndex)
+
+	resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b)
+
+	if resp != nil {
+		defer resp.Body.Close()
+		aersp = &raft.SnapshotResponse{}
+		if err = json.NewDecoder(resp.Body).Decode(&aersp); err == nil || err == io.EOF {
+
+			return aersp
+		}
+	}
+	return aersp
+}
+
+// Get the the client address of the leader in the cluster
+func (t transporter) GetLeaderClientAddress() string {
+	resp, _ := t.Get(raftServer.Leader()+"/client")
+	if resp != nil {
+		body, _ := ioutil.ReadAll(resp.Body)
+		resp.Body.Close()
+		return string(body)
+	}
+	return ""
+}
+
+// Send server side POST request
+func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
+
+	if t.client != nil {
+		resp, err := t.client.Post("https://"+path, "application/json", body)
+		return resp, err
+	} else {
+		resp, err := http.Post("http://"+path, "application/json", body)
+		return resp, err
+	}
+}
+
+
+// Send server side GET request
+func (t transporter) Get(path string) (*http.Response, error) {
+	if t.client != nil {
+		resp, err := t.client.Get("https://" + path)
+		return resp, err
+	} else {
+		resp, err := http.Get("http://" + path)
+		return resp, err
+	}
+}

+ 0 - 32
util.go

@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/coreos/etcd/web"
 	"github.com/coreos/etcd/web"
 	"io"
 	"io"
-	"io/ioutil"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
@@ -48,37 +47,6 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
 	}
 	}
 }
 }
 
 
-func Post(t *transHandler, path string, body io.Reader) (*http.Response, error) {
-
-	if t.client != nil {
-		resp, err := t.client.Post("https://"+path, "application/json", body)
-		return resp, err
-	} else {
-		resp, err := http.Post("http://"+path, "application/json", body)
-		return resp, err
-	}
-}
-
-func Get(t *transHandler, path string) (*http.Response, error) {
-	if t.client != nil {
-		resp, err := t.client.Get("https://" + path)
-		return resp, err
-	} else {
-		resp, err := http.Get("http://" + path)
-		return resp, err
-	}
-}
-
-func leaderClient() string {
-	resp, _ := Get(&serverTransHandler, server.Leader()+"/client")
-	if resp != nil {
-		body, _ := ioutil.ReadAll(resp.Body)
-		resp.Body.Close()
-		return string(body)
-	}
-	return ""
-}
-
 //--------------------------------------
 //--------------------------------------
 // Log
 // Log
 //--------------------------------------
 //--------------------------------------