Browse Source

split raft server logic into separate module

Fabrizio (Misto) Milo 12 years ago
parent
commit
e6d8d4046d
2 changed files with 225 additions and 208 deletions
  1. 2 208
      etcd.go
  2. 223 0
      raft_server.go

+ 2 - 208
etcd.go

@@ -1,8 +1,6 @@
 package main
 package main
 
 
 import (
 import (
-	"path/filepath"
-	"bytes"
 	"crypto/tls"
 	"crypto/tls"
 	"crypto/x509"
 	"crypto/x509"
 	"encoding/json"
 	"encoding/json"
@@ -11,13 +9,14 @@ import (
 	"fmt"
 	"fmt"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/web"
 	"github.com/coreos/etcd/web"
-	"github.com/coreos/go-raft"
+
 	"io/ioutil"
 	"io/ioutil"
 	"net"
 	"net"
 	"net/http"
 	"net/http"
 	"net/url"
 	"net/url"
 	"os"
 	"os"
 	"os/signal"
 	"os/signal"
+	"path/filepath"
 	"runtime/pprof"
 	"runtime/pprof"
 	"strings"
 	"strings"
 	"time"
 	"time"
@@ -133,8 +132,6 @@ type TLSConfig struct {
 //
 //
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
-var raftServer *raft.Server
-var raftTransporter transporter
 var etcdStore *store.Store
 var etcdStore *store.Store
 var info *Info
 var info *Info
 
 
@@ -197,7 +194,6 @@ func main() {
 
 
 	if veryVerbose {
 	if veryVerbose {
 		verbose = true
 		verbose = true
-		raft.SetLogLevel(raft.Debug)
 	}
 	}
 
 
 	if machines != "" {
 	if machines != "" {
@@ -256,112 +252,6 @@ func main() {
 
 
 }
 }
 
 
-// Start the raft server
-func startRaft(tlsConfig TLSConfig) {
-	var err error
-
-	raftName := info.Name
-
-	// Create transporter for raft
-	raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
-
-	// Create raft server
-	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
-
-	if err != nil {
-		fatal(err)
-	}
-
-	// LoadSnapshot
-	if snapshot {
-		err = raftServer.LoadSnapshot()
-
-		if err == nil {
-			debugf("%s finished load snapshot", raftServer.Name())
-		} else {
-			debug(err)
-		}
-	}
-
-	raftServer.SetElectionTimeout(ElectionTimeout)
-	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
-
-	raftServer.Start()
-
-	if raftServer.IsLogEmpty() {
-
-		// start as a leader in a new cluster
-		if len(cluster) == 0 {
-
-			time.Sleep(time.Millisecond * 20)
-
-			// leader need to join self as a peer
-			for {
-				command := &JoinCommand{
-					Name:    raftServer.Name(),
-					RaftURL: argInfo.RaftURL,
-					EtcdURL: argInfo.EtcdURL,
-				}
-				_, err := raftServer.Do(command)
-				if err == nil {
-					break
-				}
-			}
-			debugf("%s start as a leader", raftServer.Name())
-
-			// start as a follower in a existing cluster
-		} else {
-
-			time.Sleep(time.Millisecond * 20)
-
-			for i := 0; i < retryTimes; i++ {
-
-				success := false
-				for _, machine := range cluster {
-					if len(machine) == 0 {
-						continue
-					}
-					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
-					if err != nil {
-						if err.Error() == errors[103] {
-							fmt.Println(err)
-							os.Exit(1)
-						}
-						debugf("cannot join to cluster via machine %s %s", machine, err)
-					} else {
-						success = true
-						break
-					}
-				}
-
-				if success {
-					break
-				}
-
-				warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
-				time.Sleep(time.Second * RetryInterval)
-			}
-			if err != nil {
-				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
-			}
-			debugf("%s success join to the cluster", raftServer.Name())
-		}
-
-	} else {
-		// rejoin the previous cluster
-		debugf("%s restart as a follower", raftServer.Name())
-	}
-
-	// open the snapshot
-	if snapshot {
-		go monitorSnapshot()
-	}
-
-	// start to response to raft requests
-	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
-
-}
-
 // Create transporter using by raft server
 // Create transporter using by raft server
 // Create http or https transporter based on
 // Create http or https transporter based on
 // whether the user give the server cert and key
 // whether the user give the server cert and key
@@ -387,37 +277,6 @@ func dialTimeout(network, addr string) (net.Conn, error) {
 	return net.DialTimeout(network, addr, HTTPTimeout)
 	return net.DialTimeout(network, addr, HTTPTimeout)
 }
 }
 
 
-// Start to listen and response raft command
-func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
-	u, _ := url.Parse(info.RaftURL)
-	fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
-
-	raftMux := http.NewServeMux()
-
-	server := &http.Server{
-		Handler:   raftMux,
-		TLSConfig: &tlsConf,
-		Addr:      u.Host,
-	}
-
-	// internal commands
-	raftMux.HandleFunc("/name", NameHttpHandler)
-	raftMux.HandleFunc("/join", JoinHttpHandler)
-	raftMux.HandleFunc("/vote", VoteHttpHandler)
-	raftMux.HandleFunc("/log", GetLogHttpHandler)
-	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
-	raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
-	raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
-	raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
-
-	if scheme == "http" {
-		fatal(server.ListenAndServe())
-	} else {
-		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
-	}
-
-}
-
 // Start to listen and response client command
 // Start to listen and response client command
 func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
 func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
 	u, _ := url.Parse(info.EtcdURL)
 	u, _ := url.Parse(info.EtcdURL)
@@ -576,68 +435,3 @@ func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
 
 
 	return tls.RequireAndVerifyClientCert, certPool
 	return tls.RequireAndVerifyClientCert, certPool
 }
 }
-
-// Send join requests to the leader.
-func joinCluster(s *raft.Server, raftURL string, scheme string) error {
-	var b bytes.Buffer
-
-	command := &JoinCommand{
-		Name:    s.Name(),
-		RaftURL: info.RaftURL,
-		EtcdURL: info.EtcdURL,
-	}
-
-	json.NewEncoder(&b).Encode(command)
-
-	// t must be ok
-	t, ok := raftServer.Transporter().(transporter)
-
-	if !ok {
-		panic("wrong type")
-	}
-
-	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
-
-	debugf("Send Join Request to %s", raftURL)
-
-	resp, err := t.Post(joinURL.String(), &b)
-
-	for {
-		if err != nil {
-			return fmt.Errorf("Unable to join: %v", err)
-		}
-		if resp != nil {
-			defer resp.Body.Close()
-			if resp.StatusCode == http.StatusOK {
-				return nil
-			}
-			if resp.StatusCode == http.StatusTemporaryRedirect {
-
-				address := resp.Header.Get("Location")
-				debugf("Send Join Request to %s", address)
-
-				json.NewEncoder(&b).Encode(command)
-
-				resp, err = t.Post(address, &b)
-
-			} else if resp.StatusCode == http.StatusBadRequest {
-				debug("Reach max number machines in the cluster")
-				return fmt.Errorf(errors[103])
-			} else {
-				return fmt.Errorf("Unable to join")
-			}
-		}
-
-	}
-	return fmt.Errorf("Unable to join: %v", err)
-}
-
-// Register commands to raft server
-func registerCommands() {
-	raft.RegisterCommand(&JoinCommand{})
-	raft.RegisterCommand(&SetCommand{})
-	raft.RegisterCommand(&GetCommand{})
-	raft.RegisterCommand(&DeleteCommand{})
-	raft.RegisterCommand(&WatchCommand{})
-	raft.RegisterCommand(&TestAndSetCommand{})
-}

+ 223 - 0
raft_server.go

@@ -0,0 +1,223 @@
+package main
+
+import (
+	"bytes"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"os"
+	"time"
+
+	"github.com/coreos/go-raft"
+)
+
+var raftTransporter transporter
+var raftServer *raft.Server
+
+// Start the raft server
+func startRaft(tlsConfig TLSConfig) {
+	if veryVerbose {
+		raft.SetLogLevel(raft.Debug)
+	}
+
+	var err error
+
+	raftName := info.Name
+
+	// Create transporter for raft
+	raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
+
+	// Create raft server
+	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
+
+	if err != nil {
+		fatal(err)
+	}
+
+	// LoadSnapshot
+	if snapshot {
+		err = raftServer.LoadSnapshot()
+
+		if err == nil {
+			debugf("%s finished load snapshot", raftServer.Name())
+		} else {
+			debug(err)
+		}
+	}
+
+	raftServer.SetElectionTimeout(ElectionTimeout)
+	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+
+	raftServer.Start()
+
+	if raftServer.IsLogEmpty() {
+
+		// start as a leader in a new cluster
+		if len(cluster) == 0 {
+
+			time.Sleep(time.Millisecond * 20)
+
+			// leader need to join self as a peer
+			for {
+				command := &JoinCommand{
+					Name:    raftServer.Name(),
+					RaftURL: argInfo.RaftURL,
+					EtcdURL: argInfo.EtcdURL,
+				}
+				_, err := raftServer.Do(command)
+				if err == nil {
+					break
+				}
+			}
+			debugf("%s start as a leader", raftServer.Name())
+
+			// start as a follower in a existing cluster
+		} else {
+
+			time.Sleep(time.Millisecond * 20)
+
+			for i := 0; i < retryTimes; i++ {
+
+				success := false
+				for _, machine := range cluster {
+					if len(machine) == 0 {
+						continue
+					}
+					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
+					if err != nil {
+						if err.Error() == errors[103] {
+							fmt.Println(err)
+							os.Exit(1)
+						}
+						debugf("cannot join to cluster via machine %s %s", machine, err)
+					} else {
+						success = true
+						break
+					}
+				}
+
+				if success {
+					break
+				}
+
+				warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
+				time.Sleep(time.Second * RetryInterval)
+			}
+			if err != nil {
+				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
+			}
+			debugf("%s success join to the cluster", raftServer.Name())
+		}
+
+	} else {
+		// rejoin the previous cluster
+		debugf("%s restart as a follower", raftServer.Name())
+	}
+
+	// open the snapshot
+	if snapshot {
+		go monitorSnapshot()
+	}
+
+	// start to response to raft requests
+	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
+
+}
+
+// Start to listen and response raft command
+func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
+	u, _ := url.Parse(info.RaftURL)
+	fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
+
+	raftMux := http.NewServeMux()
+
+	server := &http.Server{
+		Handler:   raftMux,
+		TLSConfig: &tlsConf,
+		Addr:      u.Host,
+	}
+
+	// internal commands
+	raftMux.HandleFunc("/name", NameHttpHandler)
+	raftMux.HandleFunc("/join", JoinHttpHandler)
+	raftMux.HandleFunc("/vote", VoteHttpHandler)
+	raftMux.HandleFunc("/log", GetLogHttpHandler)
+	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
+	raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
+	raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
+	raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
+
+	if scheme == "http" {
+		fatal(server.ListenAndServe())
+	} else {
+		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
+	}
+
+}
+
+// Send join requests to the leader.
+func joinCluster(s *raft.Server, raftURL string, scheme string) error {
+	var b bytes.Buffer
+
+	command := &JoinCommand{
+		Name:    s.Name(),
+		RaftURL: info.RaftURL,
+		EtcdURL: info.EtcdURL,
+	}
+
+	json.NewEncoder(&b).Encode(command)
+
+	// t must be ok
+	t, ok := raftServer.Transporter().(transporter)
+
+	if !ok {
+		panic("wrong type")
+	}
+
+	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
+
+	debugf("Send Join Request to %s", raftURL)
+
+	resp, err := t.Post(joinURL.String(), &b)
+
+	for {
+		if err != nil {
+			return fmt.Errorf("Unable to join: %v", err)
+		}
+		if resp != nil {
+			defer resp.Body.Close()
+			if resp.StatusCode == http.StatusOK {
+				return nil
+			}
+			if resp.StatusCode == http.StatusTemporaryRedirect {
+
+				address := resp.Header.Get("Location")
+				debugf("Send Join Request to %s", address)
+
+				json.NewEncoder(&b).Encode(command)
+
+				resp, err = t.Post(address, &b)
+
+			} else if resp.StatusCode == http.StatusBadRequest {
+				debug("Reach max number machines in the cluster")
+				return fmt.Errorf(errors[103])
+			} else {
+				return fmt.Errorf("Unable to join")
+			}
+		}
+
+	}
+	return fmt.Errorf("Unable to join: %v", err)
+}
+
+// Register commands to raft server
+func registerCommands() {
+	raft.RegisterCommand(&JoinCommand{})
+	raft.RegisterCommand(&SetCommand{})
+	raft.RegisterCommand(&GetCommand{})
+	raft.RegisterCommand(&DeleteCommand{})
+	raft.RegisterCommand(&WatchCommand{})
+	raft.RegisterCommand(&TestAndSetCommand{})
+}