Browse Source

Merge pull request #9 from xiangli-cmu/master

separate raft handlers and client handlers
polvi 12 years ago
parent
commit
5fcf3cbb6e
9 changed files with 395 additions and 246 deletions
  1. 58 117
      client_handlers.go
  2. 137 99
      etcd.go
  3. 94 0
      raft_handlers.go
  4. 0 4
      util.go
  5. 3 0
      version.go
  6. 28 0
      web/file2gostring.sh
  7. 4 0
      web/index.go
  8. 70 0
      web/index.html
  9. 1 26
      web/web.go

+ 58 - 117
handlers.go → client_handlers.go

@@ -1,101 +1,16 @@
 package main
 package main
 
 
 import (
 import (
-	"encoding/json"
-	"github.com/coreos/go-raft"
 	"net/http"
 	"net/http"
 	"strconv"
 	"strconv"
 	"time"
 	"time"
 )
 )
 
 
-//--------------------------------------
-// Internal HTTP Handlers via server port
-//--------------------------------------
-
-// Get all the current logs
-func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] GET http://%v/log", raftServer.Name())
-	w.Header().Set("Content-Type", "application/json")
-	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(raftServer.LogEntries())
-}
-
-func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
-	rvreq := &raft.RequestVoteRequest{}
-	err := decodeJsonRequest(req, rvreq)
-	if err == nil {
-		debug("[recv] POST http://%v/vote [%s]", raftServer.Name(), rvreq.CandidateName)
-		if resp := raftServer.RequestVote(rvreq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
-	}
-	warn("[vote] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
-}
-
-func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
-	aereq := &raft.AppendEntriesRequest{}
-	err := decodeJsonRequest(req, aereq)
-
-	if err == nil {
-		debug("[recv] POST http://%s/log/append [%d]", raftServer.Name(), len(aereq.Entries))
-		if resp := raftServer.AppendEntries(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			if !resp.Success {
-				debug("[Append Entry] Step back")
-			}
-			return
-		}
-	}
-	warn("[Append Entry] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
-}
-
-func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
-	aereq := &raft.SnapshotRequest{}
-	err := decodeJsonRequest(req, aereq)
-	if err == nil {
-		debug("[recv] POST http://%s/snapshot/ ", raftServer.Name())
-		if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil {
-			w.WriteHeader(http.StatusOK)
-			json.NewEncoder(w).Encode(resp)
-			return
-		}
-	}
-	warn("[Snapshot] ERROR: %v", err)
-	w.WriteHeader(http.StatusInternalServerError)
-}
-
-// Get the port that listening for client connecting of the server
-func clientHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debug("[recv] Get http://%v/client/ ", raftServer.Name())
-	w.WriteHeader(http.StatusOK)
-	client := address + ":" + strconv.Itoa(clientPort)
-	w.Write([]byte(client))
-}
-
-//
-func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
-
-	command := &JoinCommand{}
-
-	if err := decodeJsonRequest(req, command); err == nil {
-		debug("Receive Join Request from %s", command.Name)
-		excute(command, &w, req)
-	} else {
-		w.WriteHeader(http.StatusInternalServerError)
-		return
-	}
-}
-
-//--------------------------------------
-// external HTTP Handlers via client port
-//--------------------------------------
+//-------------------------------------------------------------------
+// Handlers to handle etcd-store related request via raft client port
+//-------------------------------------------------------------------
 
 
-// Dispatch GET/POST/DELETE request to corresponding handlers
+// Multiplex GET/POST/DELETE request to corresponding handlers
 func Multiplexer(w http.ResponseWriter, req *http.Request) {
 func Multiplexer(w http.ResponseWriter, req *http.Request) {
 
 
 	if req.Method == "GET" {
 	if req.Method == "GET" {
@@ -110,6 +25,11 @@ func Multiplexer(w http.ResponseWriter, req *http.Request) {
 	}
 	}
 }
 }
 
 
+//--------------------------------------
+// State sensitive handlers
+// Set/Delte will dispatch to leader
+//--------------------------------------
+
 // Set Command Handler
 // Set Command Handler
 func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
@@ -122,23 +42,20 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	command.Value = req.FormValue("value")
 	command.Value = req.FormValue("value")
 	strDuration := req.FormValue("ttl")
 	strDuration := req.FormValue("ttl")
 
 
-	if strDuration != "" {
-		duration, err := strconv.Atoi(strDuration)
+	var err error
 
 
-		if err != nil {
-			warn("Bad duration: %v", err)
-			(*w).WriteHeader(http.StatusInternalServerError)
-			return
-		}
-		command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
-	} else {
-		command.ExpireTime = time.Unix(0, 0)
+	command.ExpireTime, err = durationToExpireTime(strDuration)
+
+	if err != nil {
+		warn("The given duration is not a number: %v", err)
+		(*w).WriteHeader(http.StatusInternalServerError)
 	}
 	}
 
 
-	excute(command, w, req)
+	dispatch(command, w, req)
 
 
 }
 }
 
 
+// TestAndSet handler
 func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/testAndSet/"):]
 	key := req.URL.Path[len("/v1/testAndSet/"):]
 
 
@@ -151,23 +68,20 @@ func TestAndSetHttpHandler(w http.ResponseWriter, req *http.Request) {
 	command.Value = req.FormValue("value")
 	command.Value = req.FormValue("value")
 	strDuration := req.FormValue("ttl")
 	strDuration := req.FormValue("ttl")
 
 
-	if strDuration != "" {
-		duration, err := strconv.Atoi(strDuration)
+	var err error
 
 
-		if err != nil {
-			warn("Bad duration: %v", err)
-			w.WriteHeader(http.StatusInternalServerError)
-			return
-		}
-		command.ExpireTime = time.Now().Add(time.Second * (time.Duration)(duration))
-	} else {
-		command.ExpireTime = time.Unix(0, 0)
+	command.ExpireTime, err = durationToExpireTime(strDuration)
+
+	if err != nil {
+		warn("The given duration is not a number: %v", err)
+		w.WriteHeader(http.StatusInternalServerError)
 	}
 	}
 
 
-	excute(command, &w, req)
+	dispatch(command, &w, req)
 
 
 }
 }
 
 
+// Delete Handler
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
@@ -176,10 +90,11 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	command := &DeleteCommand{}
 	command := &DeleteCommand{}
 	command.Key = key
 	command.Key = key
 
 
-	excute(command, w, req)
+	dispatch(command, w, req)
 }
 }
 
 
-func excute(c Command, w *http.ResponseWriter, req *http.Request) {
+// Dispatch the command to leader
+func dispatch(c Command, w *http.ResponseWriter, req *http.Request) {
 	if raftServer.State() == "leader" {
 	if raftServer.State() == "leader" {
 		if body, err := raftServer.Do(c); err != nil {
 		if body, err := raftServer.Do(c); err != nil {
 			warn("Commit failed %v", err)
 			warn("Commit failed %v", err)
@@ -208,7 +123,6 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 		}
 		}
 
 
 		// tell the client where is the leader
 		// tell the client where is the leader
-		debug("Redirect to the leader %s", raftServer.Leader())
 
 
 		path := req.URL.Path
 		path := req.URL.Path
 
 
@@ -220,7 +134,7 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 
 
 		url := scheme + raftTransporter.GetLeaderClientAddress() + path
 		url := scheme + raftTransporter.GetLeaderClientAddress() + path
 
 
-		debug("redirect to %s", url)
+		debug("Redirect to %s", url)
 
 
 		http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
 		http.Redirect(*w, req, url, http.StatusTemporaryRedirect)
 		return
 		return
@@ -231,11 +145,20 @@ func excute(c Command, w *http.ResponseWriter, req *http.Request) {
 	return
 	return
 }
 }
 
 
-func MasterHttpHandler(w http.ResponseWriter, req *http.Request) {
+//--------------------------------------
+// State non-sensitive handlers
+// will not dispatch to leader
+// TODO: add sensitive version for these
+// command?
+//--------------------------------------
+
+// Handler to return the current leader name
+func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(raftServer.Leader()))
 	w.Write([]byte(raftServer.Leader()))
 }
 }
 
 
+// Get Handler
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 	key := req.URL.Path[len("/v1/keys/"):]
 
 
@@ -262,6 +185,7 @@ func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 
 
 }
 }
 
 
+// List Handler
 func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
 func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
 	prefix := req.URL.Path[len("/v1/list/"):]
 	prefix := req.URL.Path[len("/v1/list/"):]
 
 
@@ -288,6 +212,7 @@ func ListHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 
 }
 }
 
 
+// Watch handler
 func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/watch/"):]
 	key := req.URL.Path[len("/v1/watch/"):]
 
 
@@ -299,6 +224,8 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		command.SinceIndex = 0
 		command.SinceIndex = 0
 
 
 	} else if req.Method == "POST" {
 	} else if req.Method == "POST" {
+		// watch from a specific index
+
 		debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
 		debug("[recv] POST http://%v/watch/%s", raftServer.Name(), key)
 		content := req.FormValue("index")
 		content := req.FormValue("index")
 
 
@@ -314,7 +241,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 	}
 
 
 	if body, err := command.Apply(raftServer); err != nil {
 	if body, err := command.Apply(raftServer); err != nil {
-		warn("Unable to write file: %v", err)
+		warn("Unable to do watch command: %v", err)
 		w.WriteHeader(http.StatusInternalServerError)
 		w.WriteHeader(http.StatusInternalServerError)
 		return
 		return
 	} else {
 	} else {
@@ -330,3 +257,17 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 	}
 
 
 }
 }
+
+// Convert string duration to time format
+func durationToExpireTime(strDuration string) (time.Time, error) {
+	if strDuration != "" {
+		duration, err := strconv.Atoi(strDuration)
+
+		if err != nil {
+			return time.Unix(0, 0), err
+		}
+		return time.Now().Add(time.Second * (time.Duration)(duration)), nil
+	} else {
+		return time.Unix(0, 0), nil
+	}
+}

+ 137 - 99
etcd.go

@@ -11,12 +11,10 @@ import (
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/web"
 	"github.com/coreos/etcd/web"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
-	//"io"
 	"io/ioutil"
 	"io/ioutil"
 	"log"
 	"log"
 	"net/http"
 	"net/http"
 	"os"
 	"os"
-	//"strconv"
 	"strings"
 	"strings"
 	"time"
 	"time"
 )
 )
@@ -103,6 +101,14 @@ type Info struct {
 	ServerPort int    `json:"serverPort"`
 	ServerPort int    `json:"serverPort"`
 	ClientPort int    `json:"clientPort"`
 	ClientPort int    `json:"clientPort"`
 	WebPort    int    `json:"webPort"`
 	WebPort    int    `json:"webPort"`
+
+	ServerCertFile string `json:"serverCertFile"`
+	ServerKeyFile  string `json:"serverKeyFile"`
+	ServerCAFile   string `json:"serverCAFile"`
+
+	ClientCertFile string `json:"clientCertFile"`
+	ClientKeyFile  string `json:"clientKeyFile"`
+	ClientCAFile   string `json:"clientCAFile"`
 }
 }
 
 
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
@@ -114,6 +120,7 @@ type Info struct {
 var raftServer *raft.Server
 var raftServer *raft.Server
 var raftTransporter transporter
 var raftTransporter transporter
 var etcdStore *store.Store
 var etcdStore *store.Store
+var info *Info
 
 
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 //
 //
@@ -126,62 +133,72 @@ var etcdStore *store.Store
 //--------------------------------------
 //--------------------------------------
 
 
 func main() {
 func main() {
-	var err error
 	flag.Parse()
 	flag.Parse()
 
 
 	// Setup commands.
 	// Setup commands.
-	raft.RegisterCommand(&JoinCommand{})
-	raft.RegisterCommand(&SetCommand{})
-	raft.RegisterCommand(&GetCommand{})
-	raft.RegisterCommand(&DeleteCommand{})
-	raft.RegisterCommand(&WatchCommand{})
-	raft.RegisterCommand(&ListCommand{})
-	raft.RegisterCommand(&TestAndSetCommand{})
+	registerCommands()
 
 
+	// Read server info from file or grab it from user.
 	if err := os.MkdirAll(dirPath, 0744); err != nil {
 	if err := os.MkdirAll(dirPath, 0744); err != nil {
 		fatal("Unable to create path: %v", err)
 		fatal("Unable to create path: %v", err)
 	}
 	}
 
 
-	// Read server info from file or grab it from user.
-	var info *Info = getInfo(dirPath)
-
-	name := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
-
-	fmt.Printf("ServerName: %s\n\n", name)
+	info = getInfo(dirPath)
 
 
 	// secrity type
 	// secrity type
 	st := securityType(SERVER)
 	st := securityType(SERVER)
 
 
-	if st == -1 {
-		panic("ERROR type")
-	}
+	clientSt := securityType(CLIENT)
 
 
-	raftTransporter = createTransporter(st)
+	if st == -1 || clientSt == -1 {
+		fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
+	}
 
 
-	// Setup new raft server.
+	// Create etcd key-value store
 	etcdStore = store.CreateStore(maxSize)
 	etcdStore = store.CreateStore(maxSize)
 
 
-	// create raft server
-	raftServer, err = raft.NewServer(name, dirPath, raftTransporter, etcdStore, nil)
+	startRaft(st)
 
 
-	if err != nil {
-		fatal("%v", err)
+	if webPort != -1 {
+		// start web
+		etcdStore.SetMessager(&storeMsg)
+		go webHelper()
+		go web.Start(raftServer, webPort)
 	}
 	}
 
 
-	err = raftServer.LoadSnapshot()
+	startClientTransport(info.ClientPort, clientSt)
 
 
-	if err == nil {
-		debug("%s finished load snapshot", raftServer.Name())
-	} else {
+}
+
+// Start the raft server
+func startRaft(securityType int) {
+	var err error
+
+	raftName := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
+
+	// Create transporter for raft
+	raftTransporter = createTransporter(securityType)
+
+	// Create raft server
+	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
+
+	if err != nil {
 		fmt.Println(err)
 		fmt.Println(err)
-		debug("%s bad snapshot", raftServer.Name())
+		os.Exit(1)
 	}
 	}
 
 
+	// LoadSnapshot
+	// err = raftServer.LoadSnapshot()
+
+	// if err == nil {
+	// 	debug("%s finished load snapshot", raftServer.Name())
+	// } else {
+	// 	debug(err)
+	// }
+
 	raftServer.Initialize()
 	raftServer.Initialize()
-	debug("%s finished init", raftServer.Name())
 	raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
 	raftServer.SetElectionTimeout(ELECTIONTIMTOUT)
 	raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
 	raftServer.SetHeartbeatTimeout(HEARTBEATTIMEOUT)
-	debug("%s finished set timeout", raftServer.Name())
 
 
 	if raftServer.IsLogEmpty() {
 	if raftServer.IsLogEmpty() {
 
 
@@ -206,9 +223,9 @@ func main() {
 		} else {
 		} else {
 			raftServer.StartFollower()
 			raftServer.StartFollower()
 
 
-			err := Join(raftServer, cluster)
+			err := joinCluster(raftServer, cluster)
 			if err != nil {
 			if err != nil {
-				panic(err)
+				fatal(fmt.Sprintln(err))
 			}
 			}
 			debug("%s success join to the cluster", raftServer.Name())
 			debug("%s success join to the cluster", raftServer.Name())
 		}
 		}
@@ -220,20 +237,16 @@ func main() {
 	}
 	}
 
 
 	// open the snapshot
 	// open the snapshot
-	//go server.Snapshot()
-
-	if webPort != -1 {
-		// start web
-		etcdStore.SetMessager(&storeMsg)
-		go webHelper()
-		go web.Start(raftServer, webPort)
-	}
+	// go server.Snapshot()
 
 
-	go startServTransport(info.ServerPort, st)
-	startClientTransport(info.ClientPort, securityType(CLIENT))
+	// start to response to raft requests
+	go startRaftTransport(info.ServerPort, securityType)
 
 
 }
 }
 
 
+// Create transporter using by raft server
+// Create http or https transporter based on
+// wether the user give the server cert and key
 func createTransporter(st int) transporter {
 func createTransporter(st int) transporter {
 	t := transporter{}
 	t := transporter{}
 
 
@@ -248,7 +261,7 @@ func createTransporter(st int) transporter {
 		tlsCert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
 		tlsCert, err := tls.LoadX509KeyPair(serverCertFile, serverKeyFile)
 
 
 		if err != nil {
 		if err != nil {
-			panic(err)
+			fatal(fmt.Sprintln(err))
 		}
 		}
 
 
 		tr := &http.Transport{
 		tr := &http.Transport{
@@ -267,7 +280,8 @@ func createTransporter(st int) transporter {
 	return transporter{}
 	return transporter{}
 }
 }
 
 
-func startServTransport(port int, st int) {
+// Start to listen and response raft command
+func startRaftTransport(port int, st int) {
 
 
 	// internal commands
 	// internal commands
 	http.HandleFunc("/join", JoinHttpHandler)
 	http.HandleFunc("/join", JoinHttpHandler)
@@ -275,41 +289,29 @@ func startServTransport(port int, st int) {
 	http.HandleFunc("/log", GetLogHttpHandler)
 	http.HandleFunc("/log", GetLogHttpHandler)
 	http.HandleFunc("/log/append", AppendEntriesHttpHandler)
 	http.HandleFunc("/log/append", AppendEntriesHttpHandler)
 	http.HandleFunc("/snapshot", SnapshotHttpHandler)
 	http.HandleFunc("/snapshot", SnapshotHttpHandler)
-	http.HandleFunc("/client", clientHttpHandler)
+	http.HandleFunc("/client", ClientHttpHandler)
 
 
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
-		debug("raft server [%s] listen on http port %v", address, port)
+		fmt.Printf("raft server [%s] listen on http port %v\n", address, port)
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:
-		debug("raft server [%s] listen on https port %v", address, port)
+		fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
 		log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
 		log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
 
 
 	case HTTPSANDVERIFY:
 	case HTTPSANDVERIFY:
-		pemByte, _ := ioutil.ReadFile(serverCAFile)
-
-		block, pemByte := pem.Decode(pemByte)
-
-		cert, err := x509.ParseCertificate(block.Bytes)
-
-		if err != nil {
-			fmt.Println(err)
-		}
-
-		certPool := x509.NewCertPool()
-
-		certPool.AddCert(cert)
 
 
 		server := &http.Server{
 		server := &http.Server{
 			TLSConfig: &tls.Config{
 			TLSConfig: &tls.Config{
 				ClientAuth: tls.RequireAndVerifyClientCert,
 				ClientAuth: tls.RequireAndVerifyClientCert,
-				ClientCAs:  certPool,
+				ClientCAs:  createCertPool(serverCAFile),
 			},
 			},
 			Addr: fmt.Sprintf(":%d", port),
 			Addr: fmt.Sprintf(":%d", port),
 		}
 		}
-		err = server.ListenAndServeTLS(serverCertFile, serverKeyFile)
+		fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
+		err := server.ListenAndServeTLS(serverCertFile, serverKeyFile)
 
 
 		if err != nil {
 		if err != nil {
 			log.Fatal(err)
 			log.Fatal(err)
@@ -318,49 +320,40 @@ func startServTransport(port int, st int) {
 
 
 }
 }
 
 
+// Start to listen and response client command
 func startClientTransport(port int, st int) {
 func startClientTransport(port int, st int) {
 	// external commands
 	// external commands
-	http.HandleFunc("/v1/keys/", Multiplexer)
-	http.HandleFunc("/v1/watch/", WatchHttpHandler)
-	http.HandleFunc("/v1/list/", ListHttpHandler)
-	http.HandleFunc("/v1/testAndSet/", TestAndSetHttpHandler)
-	http.HandleFunc("/master", MasterHttpHandler)
+	http.HandleFunc("/"+version+"/keys/", Multiplexer)
+	http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
+	http.HandleFunc("/"+version+"/list/", ListHttpHandler)
+	http.HandleFunc("/"+version+"/testAndSet/", TestAndSetHttpHandler)
+	http.HandleFunc("/leader", LeaderHttpHandler)
 
 
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
-		debug("etcd [%s] listen on http port %v", address, clientPort)
+		fmt.Printf("etcd [%s] listen on http port %v\n", address, clientPort)
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:
+		fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
 		http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
 		http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
 
 
 	case HTTPSANDVERIFY:
 	case HTTPSANDVERIFY:
-		pemByte, _ := ioutil.ReadFile(clientCAFile)
-
-		block, pemByte := pem.Decode(pemByte)
-
-		cert, err := x509.ParseCertificate(block.Bytes)
-
-		if err != nil {
-			fmt.Println(err)
-		}
-
-		certPool := x509.NewCertPool()
-
-		certPool.AddCert(cert)
 
 
 		server := &http.Server{
 		server := &http.Server{
 			TLSConfig: &tls.Config{
 			TLSConfig: &tls.Config{
 				ClientAuth: tls.RequireAndVerifyClientCert,
 				ClientAuth: tls.RequireAndVerifyClientCert,
-				ClientCAs:  certPool,
+				ClientCAs:  createCertPool(clientCAFile),
 			},
 			},
 			Addr: fmt.Sprintf(":%d", port),
 			Addr: fmt.Sprintf(":%d", port),
 		}
 		}
-		err = server.ListenAndServeTLS(clientCertFile, clientKeyFile)
+		fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
+		err := server.ListenAndServeTLS(clientCertFile, clientKeyFile)
 
 
 		if err != nil {
 		if err != nil {
 			log.Fatal(err)
 			log.Fatal(err)
+			os.Exit(1)
 		}
 		}
 	}
 	}
 }
 }
@@ -369,22 +362,26 @@ func startClientTransport(port int, st int) {
 // Config
 // Config
 //--------------------------------------
 //--------------------------------------
 
 
+// Get the security type
 func securityType(source int) int {
 func securityType(source int) int {
 
 
 	var keyFile, certFile, CAFile string
 	var keyFile, certFile, CAFile string
 
 
 	switch source {
 	switch source {
+
 	case SERVER:
 	case SERVER:
-		keyFile = serverKeyFile
-		certFile = serverCertFile
-		CAFile = serverCAFile
+		keyFile = info.ServerKeyFile
+		certFile = info.ServerCertFile
+		CAFile = info.ServerCAFile
 
 
 	case CLIENT:
 	case CLIENT:
-		keyFile = clientKeyFile
-		certFile = clientCertFile
-		CAFile = clientCAFile
+		keyFile = info.ClientKeyFile
+		certFile = info.ClientCertFile
+		CAFile = info.ClientCAFile
 	}
 	}
 
 
+	// If the user do not specify key file, cert file and
+	// CA file, the type will be HTTP
 	if keyFile == "" && certFile == "" && CAFile == "" {
 	if keyFile == "" && certFile == "" && CAFile == "" {
 
 
 		return HTTP
 		return HTTP
@@ -392,24 +389,30 @@ func securityType(source int) int {
 	}
 	}
 
 
 	if keyFile != "" && certFile != "" {
 	if keyFile != "" && certFile != "" {
-
 		if CAFile != "" {
 		if CAFile != "" {
+			// If the user specify all the three file, the type
+			// will be HTTPS with client cert auth
 			return HTTPSANDVERIFY
 			return HTTPSANDVERIFY
 		}
 		}
-
+		// If the user specify key file and cert file but not
+		// CA file, the type will be HTTPS without client cert
+		// auth
 		return HTTPS
 		return HTTPS
 	}
 	}
 
 
+	// bad specification
 	return -1
 	return -1
 }
 }
 
 
+// Get the server info from previous conf file
+// or from the user
 func getInfo(path string) *Info {
 func getInfo(path string) *Info {
 	info := &Info{}
 	info := &Info{}
 
 
 	// Read in the server info if available.
 	// Read in the server info if available.
 	infoPath := fmt.Sprintf("%s/info", path)
 	infoPath := fmt.Sprintf("%s/info", path)
 
 
-	// delete the old configuration if exist
+	// Delete the old configuration if exist
 	if ignore {
 	if ignore {
 		logPath := fmt.Sprintf("%s/log", path)
 		logPath := fmt.Sprintf("%s/log", path)
 		snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
 		snapshotPath := fmt.Sprintf("%s/snapshotPath", path)
@@ -429,8 +432,8 @@ func getInfo(path string) *Info {
 		}
 		}
 		file.Close()
 		file.Close()
 
 
-		// Otherwise ask user for info and write it to file.
 	} else {
 	} else {
+		// Otherwise ask user for info and write it to file.
 
 
 		if address == "" {
 		if address == "" {
 			fatal("Please give the address of the local machine")
 			fatal("Please give the address of the local machine")
@@ -444,6 +447,14 @@ func getInfo(path string) *Info {
 		info.ClientPort = clientPort
 		info.ClientPort = clientPort
 		info.WebPort = webPort
 		info.WebPort = webPort
 
 
+		info.ClientCAFile = clientCAFile
+		info.ClientCertFile = clientCertFile
+		info.ClientKeyFile = clientKeyFile
+
+		info.ServerCAFile = serverCAFile
+		info.ServerKeyFile = serverKeyFile
+		info.ServerCertFile = serverCertFile
+
 		// Write to file.
 		// Write to file.
 		content, _ := json.Marshal(info)
 		content, _ := json.Marshal(info)
 		content = []byte(string(content) + "\n")
 		content = []byte(string(content) + "\n")
@@ -455,12 +466,28 @@ func getInfo(path string) *Info {
 	return info
 	return info
 }
 }
 
 
-//--------------------------------------
-// Handlers
-//--------------------------------------
+// Create client auth certpool
+func createCertPool(CAFile string) *x509.CertPool {
+	pemByte, _ := ioutil.ReadFile(CAFile)
+
+	block, pemByte := pem.Decode(pemByte)
+
+	cert, err := x509.ParseCertificate(block.Bytes)
+
+	if err != nil {
+		fmt.Println(err)
+		os.Exit(1)
+	}
+
+	certPool := x509.NewCertPool()
+
+	certPool.AddCert(cert)
+
+	return certPool
+}
 
 
 // Send join requests to the leader.
 // Send join requests to the leader.
-func Join(s *raft.Server, serverName string) error {
+func joinCluster(s *raft.Server, serverName string) error {
 	var b bytes.Buffer
 	var b bytes.Buffer
 
 
 	command := &JoinCommand{}
 	command := &JoinCommand{}
@@ -493,3 +520,14 @@ func Join(s *raft.Server, serverName string) error {
 	}
 	}
 	return fmt.Errorf("Unable to join: %v", err)
 	return fmt.Errorf("Unable to join: %v", err)
 }
 }
+
+// Register commands to raft server
+func registerCommands() {
+	raft.RegisterCommand(&JoinCommand{})
+	raft.RegisterCommand(&SetCommand{})
+	raft.RegisterCommand(&GetCommand{})
+	raft.RegisterCommand(&DeleteCommand{})
+	raft.RegisterCommand(&WatchCommand{})
+	raft.RegisterCommand(&ListCommand{})
+	raft.RegisterCommand(&TestAndSetCommand{})
+}

+ 94 - 0
raft_handlers.go

@@ -0,0 +1,94 @@
+package main
+
+import (
+	"encoding/json"
+	"github.com/coreos/go-raft"
+	"net/http"
+	"strconv"
+)
+
+//-------------------------------------------------------------
+// Handlers to handle raft related request via raft server port
+//-------------------------------------------------------------
+
+// Get all the current logs
+func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
+	debug("[recv] GET http://%v/log", raftServer.Name())
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusOK)
+	json.NewEncoder(w).Encode(raftServer.LogEntries())
+}
+
+// Response to vote request
+func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
+	rvreq := &raft.RequestVoteRequest{}
+	err := decodeJsonRequest(req, rvreq)
+	if err == nil {
+		debug("[recv] POST http://%v/vote [%s]", raftServer.Name(), rvreq.CandidateName)
+		if resp := raftServer.RequestVote(rvreq); resp != nil {
+			w.WriteHeader(http.StatusOK)
+			json.NewEncoder(w).Encode(resp)
+			return
+		}
+	}
+	warn("[vote] ERROR: %v", err)
+	w.WriteHeader(http.StatusInternalServerError)
+}
+
+// Response to append entries request
+func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
+	aereq := &raft.AppendEntriesRequest{}
+	err := decodeJsonRequest(req, aereq)
+
+	if err == nil {
+		debug("[recv] POST http://%s/log/append [%d]", raftServer.Name(), len(aereq.Entries))
+		if resp := raftServer.AppendEntries(aereq); resp != nil {
+			w.WriteHeader(http.StatusOK)
+			json.NewEncoder(w).Encode(resp)
+			if !resp.Success {
+				debug("[Append Entry] Step back")
+			}
+			return
+		}
+	}
+	warn("[Append Entry] ERROR: %v", err)
+	w.WriteHeader(http.StatusInternalServerError)
+}
+
+// Response to recover from snapshot request
+func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
+	aereq := &raft.SnapshotRequest{}
+	err := decodeJsonRequest(req, aereq)
+	if err == nil {
+		debug("[recv] POST http://%s/snapshot/ ", raftServer.Name())
+		if resp, _ := raftServer.SnapshotRecovery(aereq); resp != nil {
+			w.WriteHeader(http.StatusOK)
+			json.NewEncoder(w).Encode(resp)
+			return
+		}
+	}
+	warn("[Snapshot] ERROR: %v", err)
+	w.WriteHeader(http.StatusInternalServerError)
+}
+
+// Get the port that listening for client connecting of the server
+func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
+	debug("[recv] Get http://%v/client/ ", raftServer.Name())
+	w.WriteHeader(http.StatusOK)
+	client := address + ":" + strconv.Itoa(clientPort)
+	w.Write([]byte(client))
+}
+
+// Response to the join request
+func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
+
+	command := &JoinCommand{}
+
+	if err := decodeJsonRequest(req, command); err == nil {
+		debug("Receive Join Request from %s", command.Name)
+		dispatch(command, &w, req)
+	} else {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+}

+ 0 - 4
util.go

@@ -63,10 +63,6 @@ func debug(msg string, v ...interface{}) {
 	}
 	}
 }
 }
 
 
-func info(msg string, v ...interface{}) {
-	logger.Printf("INFO  "+msg+"\n", v...)
-}
-
 func warn(msg string, v ...interface{}) {
 func warn(msg string, v ...interface{}) {
 	logger.Printf("WARN  "+msg+"\n", v...)
 	logger.Printf("WARN  "+msg+"\n", v...)
 }
 }

+ 3 - 0
version.go

@@ -0,0 +1,3 @@
+package main
+
+var version = "v1"

+ 28 - 0
web/file2gostring.sh

@@ -0,0 +1,28 @@
+#!/bin/sh
+
+# this file is copied from doozerd. 
+
+set -e
+
+munge() {
+    printf %s "$1" | tr . _ | tr -d -c '[:alnum:]_'
+}
+
+quote() {
+    sed 's/\\/\\\\/g' | sed 's/"/\\"/g' | sed 's/$/\\n/' | tr -d '\n'
+}
+
+pkg_path=$1 ; shift
+file=$1     ; shift
+
+pkg=`basename $pkg_path`
+
+printf 'package %s\n' "$pkg"
+printf '\n'
+printf '// This file was generated from %s.\n' "$file"
+printf '\n'
+printf 'var '
+munge "`basename $file`"
+printf ' string = "'
+quote
+printf '"\n'

File diff suppressed because it is too large
+ 4 - 0
web/index.go


+ 70 - 0
web/index.html

@@ -0,0 +1,70 @@
+<html>
+<head>
+<title>etcd Web Interface</title>
+<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.10.1/jquery.min.js"></script>
+<script type="text/javascript">
+    $(function() {
+
+    var conn;
+    var content = $("#content");
+
+    function update(response) {
+        // if set
+        if (response.action == "SET") {
+
+            if (response.expiration > "1970") {
+                t = response.key + "=" + response.value
+                        + "  " + response.expiration
+            } else {
+                t = response.key + "=" + response.value
+            }
+
+            id = response.key.replace(new RegExp("/", 'g'), "\\/");
+
+            if ($("#store_" + id).length == 0) {
+                if (response.expiration > "1970") {
+                    t = response.key + "=" + response.value
+                        + "  " + response.expiration
+                } else {
+                    t = response.key + "=" + response.value
+                }
+
+                var e = $('<div id="store_' + response.key + '"/>')
+                    .text(t)
+                e.appendTo(content)
+            }
+            else {
+
+                $("#store_" + id)
+                    .text(t)
+            }
+        }
+        // if delete
+        else if (response.action == "DELETE") {
+            id = response.key.replace(new RegExp("/", 'g'), "\\/");
+
+            $("#store_" + id).remove()
+        }
+    }
+
+
+    if (window["WebSocket"]) {
+        conn = new WebSocket("ws://{{.Address}}/ws");
+        conn.onclose = function(evt) {
+
+        }
+        conn.onmessage = function(evt) {
+            var response = JSON.parse(evt.data)
+            update(response)
+        }
+    } else {
+        appendLog($("<div><b>Your browser does not support WebSockets.</b></div>"))
+    }
+    });
+</script>
+</head>
+<body>
+    <div id="leader">Leader: {{.Leader}}</div>
+    <div id="content"></div>
+</body>
+</html>

+ 1 - 26
web/web.go

@@ -4,10 +4,8 @@ import (
 	"code.google.com/p/go.net/websocket"
 	"code.google.com/p/go.net/websocket"
 	"fmt"
 	"fmt"
 	"github.com/coreos/go-raft"
 	"github.com/coreos/go-raft"
-	//"github.com/xiangli-cmu/raft-etcd/store"
 	"html/template"
 	"html/template"
 	"net/http"
 	"net/http"
-	//"time"
 )
 )
 
 
 var s *raft.Server
 var s *raft.Server
@@ -18,28 +16,6 @@ type MainPage struct {
 	Address string
 	Address string
 }
 }
 
 
-func handler(w http.ResponseWriter, r *http.Request) {
-	fmt.Fprintf(w, "Leader:\n%s\n", s.Leader())
-	fmt.Fprintf(w, "Peers:\n")
-
-	for peerName, _ := range s.Peers() {
-		fmt.Fprintf(w, "%s\n", peerName)
-	}
-
-	fmt.Fprintf(w, "Data\n")
-
-	//s := store.GetStore()
-
-	// for key, node := range s.Nodes {
-	// 	if node.ExpireTime.Equal(time.Unix(0, 0)) {
-	// 		fmt.Fprintf(w, "%s %s\n", key, node.Value)
-	// 	} else {
-	// 		fmt.Fprintf(w, "%s %s %s\n", key, node.Value, node.ExpireTime)
-	// 	}
-	// }
-
-}
-
 func mainHandler(c http.ResponseWriter, req *http.Request) {
 func mainHandler(c http.ResponseWriter, req *http.Request) {
 
 
 	p := &MainPage{Leader: s.Leader(),
 	p := &MainPage{Leader: s.Leader(),
@@ -49,14 +25,13 @@ func mainHandler(c http.ResponseWriter, req *http.Request) {
 }
 }
 
 
 func Start(server *raft.Server, port int) {
 func Start(server *raft.Server, port int) {
-	mainTempl = template.Must(template.ParseFiles("home.html"))
+	mainTempl = template.Must(template.New("index.html").Parse(index_html))
 	s = server
 	s = server
 
 
 	go h.run()
 	go h.run()
 	http.HandleFunc("/", mainHandler)
 	http.HandleFunc("/", mainHandler)
 	http.Handle("/ws", websocket.Handler(wsHandler))
 	http.Handle("/ws", websocket.Handler(wsHandler))
 
 
-	//http.HandleFunc("/", handler)
 	fmt.Println("web listening at port ", port)
 	fmt.Println("web listening at port ", port)
 	http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
 	http.ListenAndServe(fmt.Sprintf(":%v", port), nil)
 }
 }

Some files were not shown because too many files changed in this diff