Browse Source

make raft and etcd server

Xiang Li 12 years ago
parent
commit
ca4b5815f7
8 changed files with 130 additions and 88 deletions
  1. 10 29
      etcd.go
  2. 11 11
      etcd_handlers.go
  3. 45 0
      etcd_server.go
  4. 3 3
      machines.go
  5. 13 13
      raft_handlers.go
  6. 45 29
      raft_server.go
  7. 1 1
      snapshot.go
  8. 2 2
      util.go

+ 10 - 29
etcd.go

@@ -6,8 +6,6 @@ import (
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/go-raft"
 	"io/ioutil"
-	"net/http"
-	"net/url"
 	"os"
 	"strings"
 	"time"
@@ -119,6 +117,9 @@ type TLSConfig struct {
 	Client tls.Config
 }
 
+type EtcdServer struct {
+}
+
 //------------------------------------------------------------------------------
 //
 // Variables
@@ -126,7 +127,6 @@ type TLSConfig struct {
 //------------------------------------------------------------------------------
 
 var etcdStore *store.Store
-var info *Info
 
 //------------------------------------------------------------------------------
 //
@@ -186,37 +186,18 @@ func main() {
 		fatalf("Unable to create path: %s", err)
 	}
 
-	info = getInfo(dirPath)
+	info := getInfo(dirPath)
 
 	// Create etcd key-value store
 	etcdStore = store.CreateStore(maxSize)
 	snapConf = newSnapshotConf()
 
-	startWebInterface()
-
-	startRaft(raftTLSConfig)
-
-	startEtcd(etcdTLSConfig)
-
-}
-
-// Start to listen and response etcd client command
-func startEtcd(tlsConf TLSConfig) {
-	u, err := url.Parse(info.EtcdURL)
-	if err != nil {
-		fatalf("invalid url '%s': %s", info.EtcdURL, err)
-	}
-	infof("etcd server [%s:%s]", info.Name, u)
+	// Create etcd and raft server
+	e = newEtcdServer(info.Name, info.EtcdURL, &etcdTLSConfig, &info.EtcdTLS)
+	r = newRaftServer(info.Name, info.RaftURL, &raftTLSConfig, &info.RaftTLS)
 
-	server := http.Server{
-		Handler:   NewEtcdMuxer(),
-		TLSConfig: &tlsConf.Server,
-		Addr:      u.Host,
-	}
+	startWebInterface()
+	r.start()
+	e.start()
 
-	if tlsConf.Scheme == "http" {
-		fatal(server.ListenAndServe())
-	} else {
-		fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
-	}
 }

+ 11 - 11
etcd_handlers.go

@@ -59,7 +59,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	debugf("[recv] POST %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+	debugf("[recv] POST %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 	value := req.FormValue("value")
 
@@ -110,7 +110,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] DELETE %v/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+	debugf("[recv] DELETE %v/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 	command := &DeleteCommand{
 		Key: key,
@@ -122,8 +122,8 @@ func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 // Dispatch the command to leader
 func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 
-	if raftServer.State() == raft.Leader {
-		if body, err := raftServer.Do(c); err != nil {
+	if r.server.State() == raft.Leader {
+		if body, err := r.server.Do(c); err != nil {
 
 			if _, ok := err.(store.NotFoundError); ok {
 				(*w).WriteHeader(http.StatusNotFound)
@@ -167,7 +167,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 			return
 		}
 	} else {
-		leader := raftServer.Leader()
+		leader := r.server.Leader()
 		// current no leader
 		if leader == "" {
 			(*w).WriteHeader(http.StatusInternalServerError)
@@ -211,7 +211,7 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 
 // Handler to return the current leader's raft address
 func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
-	leader := raftServer.Leader()
+	leader := r.server.Leader()
 
 	if leader != "" {
 		w.WriteHeader(http.StatusOK)
@@ -250,13 +250,13 @@ func StatsHttpHandler(w http.ResponseWriter, req *http.Request) {
 func GetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] GET %s/v1/keys/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+	debugf("[recv] GET %s/v1/keys/%s [%s]", e.url, key, req.RemoteAddr)
 
 	command := &GetCommand{
 		Key: key,
 	}
 
-	if body, err := command.Apply(raftServer); err != nil {
+	if body, err := command.Apply(r.server); err != nil {
 
 		if _, ok := err.(store.NotFoundError); ok {
 			(*w).WriteHeader(http.StatusNotFound)
@@ -289,13 +289,13 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 
 	if req.Method == "GET" {
-		debugf("[recv] GET %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+		debugf("[recv] GET %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
 		command.SinceIndex = 0
 
 	} else if req.Method == "POST" {
 		// watch from a specific index
 
-		debugf("[recv] POST %s/watch/%s [%s]", info.EtcdURL, key, req.RemoteAddr)
+		debugf("[recv] POST %s/watch/%s [%s]", e.url, key, req.RemoteAddr)
 		content := req.FormValue("index")
 
 		sinceIndex, err := strconv.ParseUint(string(content), 10, 64)
@@ -310,7 +310,7 @@ func WatchHttpHandler(w http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	if body, err := command.Apply(raftServer); err != nil {
+	if body, err := command.Apply(r.server); err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		w.Write(newJsonError(500, key))
 	} else {

+ 45 - 0
etcd_server.go

@@ -0,0 +1,45 @@
+package main
+
+import (
+	"net/http"
+	"net/url"
+)
+
+type etcdServer struct {
+	name    string
+	url     string
+	tlsConf *TLSConfig
+	tlsInfo *TLSInfo
+}
+
+var e *etcdServer
+
+func newEtcdServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *etcdServer {
+	return &etcdServer{
+		name:    name,
+		url:     url,
+		tlsConf: tlsConf,
+		tlsInfo: tlsInfo,
+	}
+}
+
+// Start to listen and response etcd client command
+func (e *etcdServer) start() {
+	u, err := url.Parse(e.url)
+	if err != nil {
+		fatalf("invalid url '%s': %s", e.url, err)
+	}
+	infof("etcd server [%s:%s]", e.name, u)
+
+	server := http.Server{
+		Handler:   NewEtcdMuxer(),
+		TLSConfig: &e.tlsConf.Server,
+		Addr:      u.Host,
+	}
+
+	if e.tlsConf.Scheme == "http" {
+		fatal(server.ListenAndServe())
+	} else {
+		fatal(server.ListenAndServeTLS(e.tlsInfo.CertFile, e.tlsInfo.KeyFile))
+	}
+}

+ 3 - 3
machines.go

@@ -10,12 +10,12 @@ func machineNum() int {
 // getMachines gets the current machines in the cluster
 func getMachines() []string {
 
-	peers := raftServer.Peers()
+	peers := r.server.Peers()
 
 	machines := make([]string, len(peers)+1)
 
-	leader, ok := nameToEtcdURL(raftServer.Leader())
-	self := info.EtcdURL
+	leader, ok := nameToEtcdURL(r.server.Leader())
+	self := e.url
 	i := 1
 
 	if ok {

+ 13 - 13
raft_handlers.go

@@ -12,10 +12,10 @@ import (
 
 // Get all the current logs
 func GetLogHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] GET %s/log", info.RaftURL)
+	debugf("[recv] GET %s/log", r.url)
 	w.Header().Set("Content-Type", "application/json")
 	w.WriteHeader(http.StatusOK)
-	json.NewEncoder(w).Encode(raftServer.LogEntries())
+	json.NewEncoder(w).Encode(r.server.LogEntries())
 }
 
 // Response to vote request
@@ -23,8 +23,8 @@ func VoteHttpHandler(w http.ResponseWriter, req *http.Request) {
 	rvreq := &raft.RequestVoteRequest{}
 	err := decodeJsonRequest(req, rvreq)
 	if err == nil {
-		debugf("[recv] POST %s/vote [%s]", info.RaftURL, rvreq.CandidateName)
-		if resp := raftServer.RequestVote(rvreq); resp != nil {
+		debugf("[recv] POST %s/vote [%s]", r.url, rvreq.CandidateName)
+		if resp := r.server.RequestVote(rvreq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
@@ -40,8 +40,8 @@ func AppendEntriesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	err := decodeJsonRequest(req, aereq)
 
 	if err == nil {
-		debugf("[recv] POST %s/log/append [%d]", info.RaftURL, len(aereq.Entries))
-		if resp := raftServer.AppendEntries(aereq); resp != nil {
+		debugf("[recv] POST %s/log/append [%d]", r.url, len(aereq.Entries))
+		if resp := r.server.AppendEntries(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			if !resp.Success {
@@ -59,8 +59,8 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debugf("[recv] POST %s/snapshot/ ", info.RaftURL)
-		if resp := raftServer.RequestSnapshot(aereq); resp != nil {
+		debugf("[recv] POST %s/snapshot/ ", r.url)
+		if resp := r.server.RequestSnapshot(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
@@ -75,8 +75,8 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 	aereq := &raft.SnapshotRecoveryRequest{}
 	err := decodeJsonRequest(req, aereq)
 	if err == nil {
-		debugf("[recv] POST %s/snapshotRecovery/ ", info.RaftURL)
-		if resp := raftServer.SnapshotRecoveryRequest(aereq); resp != nil {
+		debugf("[recv] POST %s/snapshotRecovery/ ", r.url)
+		if resp := r.server.SnapshotRecoveryRequest(aereq); resp != nil {
 			w.WriteHeader(http.StatusOK)
 			json.NewEncoder(w).Encode(resp)
 			return
@@ -88,7 +88,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Get the port that listening for etcd connecting of the server
 func EtcdURLHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] Get %s/etcdURL/ ", info.RaftURL)
+	debugf("[recv] Get %s/etcdURL/ ", r.url)
 	w.WriteHeader(http.StatusOK)
 	w.Write([]byte(argInfo.EtcdURL))
 }
@@ -109,7 +109,7 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 // Response to the name request
 func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
-	debugf("[recv] Get %s/name/ ", info.RaftURL)
+	debugf("[recv] Get %s/name/ ", r.url)
 	w.WriteHeader(http.StatusOK)
-	w.Write([]byte(raftServer.Name()))
+	w.Write([]byte(r.server.Name()))
 }

+ 45 - 29
raft_server.go

@@ -12,44 +12,60 @@ import (
 	"github.com/coreos/go-raft"
 )
 
-var raftServer *raft.Server
+type raftServer struct {
+	name    string
+	url     string
+	tlsConf *TLSConfig
+	tlsInfo *TLSInfo
+	server  *raft.Server
+}
 
-// Start the raft server
-func startRaft(tlsConfig TLSConfig) {
+var r *raftServer
 
-	raftName := info.Name
+func newRaftServer(name string, url string, tlsConf *TLSConfig, tlsInfo *TLSInfo) *raftServer {
+	return &raftServer{
+		name:    name,
+		url:     url,
+		tlsConf: tlsConf,
+		tlsInfo: tlsInfo,
+	}
+}
+
+// Start the raft server
+func (r *raftServer) start() {
 
 	// Setup commands.
 	registerCommands()
 
 	// Create transporter for raft
-	raftTransporter := newTransporter(tlsConfig.Scheme, tlsConfig.Client)
+	raftTransporter := newTransporter(r.tlsConf.Scheme, r.tlsConf.Client)
 
 	// Create raft server
-	var err error
-	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
+	server, err := raft.NewServer(r.name, dirPath, raftTransporter, etcdStore, nil)
 
 	if err != nil {
 		fatal(err)
 	}
 
+	r.server = server
+
 	// LoadSnapshot
 	if snapshot {
-		err = raftServer.LoadSnapshot()
+		err = server.LoadSnapshot()
 
 		if err == nil {
-			debugf("%s finished load snapshot", raftServer.Name())
+			debugf("%s finished load snapshot", r.name)
 		} else {
 			debug(err)
 		}
 	}
 
-	raftServer.SetElectionTimeout(ElectionTimeout)
-	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+	server.SetElectionTimeout(ElectionTimeout)
+	server.SetHeartbeatTimeout(HeartbeatTimeout)
 
-	raftServer.Start()
+	server.Start()
 
-	if raftServer.IsLogEmpty() {
+	if server.IsLogEmpty() {
 
 		// start as a leader in a new cluster
 		if len(cluster) == 0 {
@@ -59,16 +75,16 @@ func startRaft(tlsConfig TLSConfig) {
 			// leader need to join self as a peer
 			for {
 				command := &JoinCommand{
-					Name:    raftServer.Name(),
-					RaftURL: argInfo.RaftURL,
-					EtcdURL: argInfo.EtcdURL,
+					Name:    r.name,
+					RaftURL: r.url,
+					EtcdURL: e.url,
 				}
-				_, err := raftServer.Do(command)
+				_, err := server.Do(command)
 				if err == nil {
 					break
 				}
 			}
-			debugf("%s start as a leader", raftServer.Name())
+			debugf("%s start as a leader", r.name)
 
 			// start as a follower in a existing cluster
 		} else {
@@ -82,7 +98,7 @@ func startRaft(tlsConfig TLSConfig) {
 					if len(machine) == 0 {
 						continue
 					}
-					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
+					err = joinCluster(server, machine, r.tlsConf.Scheme)
 					if err != nil {
 						if err.Error() == errors[103] {
 							fatal(err)
@@ -104,12 +120,12 @@ func startRaft(tlsConfig TLSConfig) {
 			if err != nil {
 				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
 			}
-			debugf("%s success join to the cluster", raftServer.Name())
+			debugf("%s success join to the cluster", r.name)
 		}
 
 	} else {
 		// rejoin the previous cluster
-		debugf("%s restart as a follower", raftServer.Name())
+		debugf("%s restart as a follower", r.name)
 	}
 
 	// open the snapshot
@@ -118,14 +134,14 @@ func startRaft(tlsConfig TLSConfig) {
 	}
 
 	// start to response to raft requests
-	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
+	go r.startTransport(r.tlsConf.Scheme, r.tlsConf.Server)
 
 }
 
 // Start to listen and response raft command
-func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
-	u, _ := url.Parse(info.RaftURL)
-	infof("raft server [%s:%s]", info.Name, u)
+func (r *raftServer) startTransport(scheme string, tlsConf tls.Config) {
+	u, _ := url.Parse(r.url)
+	infof("raft server [%s:%s]", r.name, u)
 
 	raftMux := http.NewServeMux()
 
@@ -148,7 +164,7 @@ func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
 	if scheme == "http" {
 		fatal(server.ListenAndServe())
 	} else {
-		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
+		fatal(server.ListenAndServeTLS(r.tlsInfo.CertFile, r.tlsInfo.KeyFile))
 	}
 
 }
@@ -159,14 +175,14 @@ func joinCluster(s *raft.Server, raftURL string, scheme string) error {
 
 	command := &JoinCommand{
 		Name:    s.Name(),
-		RaftURL: info.RaftURL,
-		EtcdURL: info.EtcdURL,
+		RaftURL: r.url,
+		EtcdURL: e.url,
 	}
 
 	json.NewEncoder(&b).Encode(command)
 
 	// t must be ok
-	t, ok := raftServer.Transporter().(transporter)
+	t, ok := r.server.Transporter().(transporter)
 
 	if !ok {
 		panic("wrong type")

+ 1 - 1
snapshot.go

@@ -29,7 +29,7 @@ func monitorSnapshot() {
 		currentWrites := etcdStore.TotalWrites() - snapConf.lastWrites
 
 		if currentWrites > snapConf.writesThr {
-			raftServer.TakeSnapshot()
+			r.server.TakeSnapshot()
 			snapConf.lastWrites = etcdStore.TotalWrites()
 		}
 	}

+ 2 - 2
util.go

@@ -55,7 +55,7 @@ func startWebInterface() {
 	if argInfo.WebURL != "" {
 		// start web
 		go webHelper()
-		go web.Start(raftServer, argInfo.WebURL)
+		go web.Start(r.server, argInfo.WebURL)
 	}
 }
 
@@ -198,7 +198,7 @@ func send(c chan bool) {
 		command.Key = "foo"
 		command.Value = "bar"
 		command.ExpireTime = time.Unix(0, 0)
-		raftServer.Do(command)
+		r.server.Do(command)
 	}
 	c <- true
 }