|
|
@@ -17,7 +17,6 @@ import (
|
|
|
"net/url"
|
|
|
"os"
|
|
|
"os/signal"
|
|
|
- "path"
|
|
|
"runtime/pprof"
|
|
|
"strings"
|
|
|
"time"
|
|
|
@@ -60,17 +59,17 @@ func init() {
|
|
|
flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
|
|
|
|
|
|
flag.StringVar(&argInfo.Name, "n", "", "the node name (required)")
|
|
|
- flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the port to communicate with clients")
|
|
|
- flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers")
|
|
|
- flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface")
|
|
|
+ flag.StringVar(&argInfo.EtcdURL, "c", "127.0.0.1:4001", "the hostname:port for etcd client communication")
|
|
|
+ flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the hostname:port for raft server communication")
|
|
|
+ flag.StringVar(&argInfo.WebURL, "w", "", "the hostname:port of web interface")
|
|
|
|
|
|
- flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile")
|
|
|
- flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server")
|
|
|
- flag.StringVar(&argInfo.ServerKeyFile, "serverKey", "", "the key file of the server")
|
|
|
+ flag.StringVar(&argInfo.RaftTLS.CAFile, "serverCAFile", "", "the path of the CAFile")
|
|
|
+ flag.StringVar(&argInfo.RaftTLS.CertFile, "serverCert", "", "the cert file of the server")
|
|
|
+ flag.StringVar(&argInfo.RaftTLS.KeyFile, "serverKey", "", "the key file of the server")
|
|
|
|
|
|
- flag.StringVar(&argInfo.ClientCAFile, "clientCAFile", "", "the path of the client CAFile")
|
|
|
- flag.StringVar(&argInfo.ClientCertFile, "clientCert", "", "the cert file of the client")
|
|
|
- flag.StringVar(&argInfo.ClientKeyFile, "clientKey", "", "the key file of the client")
|
|
|
+ flag.StringVar(&argInfo.EtcdTLS.CAFile, "clientCAFile", "", "the path of the client CAFile")
|
|
|
+ flag.StringVar(&argInfo.EtcdTLS.CertFile, "clientCert", "", "the cert file of the client")
|
|
|
+ flag.StringVar(&argInfo.EtcdTLS.KeyFile, "clientKey", "", "the key file of the client")
|
|
|
|
|
|
flag.StringVar(&dirPath, "d", ".", "the directory to store log and snapshot")
|
|
|
|
|
|
@@ -87,12 +86,6 @@ func init() {
|
|
|
flag.StringVar(&cpuprofile, "cpuprofile", "", "write cpu profile to file")
|
|
|
}
|
|
|
|
|
|
-// CONSTANTS
|
|
|
-const (
|
|
|
- RaftServer = iota
|
|
|
- EtcdServer
|
|
|
-)
|
|
|
-
|
|
|
const (
|
|
|
ELECTIONTIMEOUT = 200 * time.Millisecond
|
|
|
HEARTBEATTIMEOUT = 50 * time.Millisecond
|
|
|
@@ -110,6 +103,12 @@ const (
|
|
|
//
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
+type TLSInfo struct {
|
|
|
+ CertFile string `json:"CertFile"`
|
|
|
+ KeyFile string `json:"KeyFile"`
|
|
|
+ CAFile string `json:"CAFile"`
|
|
|
+}
|
|
|
+
|
|
|
type Info struct {
|
|
|
Name string `json:"name"`
|
|
|
|
|
|
@@ -117,13 +116,8 @@ type Info struct {
|
|
|
EtcdURL string `json:"etcdURL"`
|
|
|
WebURL string `json:"webURL"`
|
|
|
|
|
|
- ServerCertFile string `json:"serverCertFile"`
|
|
|
- ServerKeyFile string `json:"serverKeyFile"`
|
|
|
- ServerCAFile string `json:"serverCAFile"`
|
|
|
-
|
|
|
- ClientCertFile string `json:"clientCertFile"`
|
|
|
- ClientKeyFile string `json:"clientKeyFile"`
|
|
|
- ClientCAFile string `json:"clientCAFile"`
|
|
|
+ RaftTLS TLSInfo `json:"raftTLS"`
|
|
|
+ EtcdTLS TLSInfo `json:"etcdTLS"`
|
|
|
}
|
|
|
|
|
|
//------------------------------------------------------------------------------
|
|
|
@@ -143,18 +137,27 @@ var info *Info
|
|
|
//
|
|
|
//------------------------------------------------------------------------------
|
|
|
|
|
|
-// Check a URL and clean it up if the user forgot the schema
|
|
|
-func checkURL(u string, defaultSchema string) string {
|
|
|
- p, err := url.Parse(u)
|
|
|
+// sanitizeURL will cleanup a host string in the format hostname:port and
|
|
|
+// attach a schema.
|
|
|
+func sanitizeURL(host string, defaultScheme string) string {
|
|
|
+ // Blank URLs are fine input, just return it
|
|
|
+ if len(host) == 0 {
|
|
|
+ return host
|
|
|
+ }
|
|
|
|
|
|
+ p, err := url.Parse(host)
|
|
|
if err != nil {
|
|
|
- panic(err)
|
|
|
+ fatal(err)
|
|
|
}
|
|
|
|
|
|
- if len(p.Host) == 0 && len(defaultSchema) != 0 {
|
|
|
- return checkURL(fmt.Sprintf("%s://%s", defaultSchema, u), "")
|
|
|
+ // Make sure the host is in Host:Port format
|
|
|
+ _, _, err = net.SplitHostPort(host)
|
|
|
+ if err != nil {
|
|
|
+ fatal(err)
|
|
|
}
|
|
|
|
|
|
+ p = &url.URL{Host: host, Scheme: defaultScheme}
|
|
|
+
|
|
|
return p.String()
|
|
|
}
|
|
|
|
|
|
@@ -200,15 +203,24 @@ func main() {
|
|
|
cluster = strings.Split(string(b), ",")
|
|
|
}
|
|
|
|
|
|
- // Otherwise ask user for info and write it to file.
|
|
|
- argInfo.Name = strings.TrimSpace(argInfo.Name)
|
|
|
+ raftTLSConfig, ok := tlsConfigFromInfo(argInfo.RaftTLS)
|
|
|
+ if !ok {
|
|
|
+ fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
|
|
+ }
|
|
|
+
|
|
|
+ etcdTLSConfig, ok := tlsConfigFromInfo(argInfo.EtcdTLS)
|
|
|
+ if !ok {
|
|
|
+ fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
|
|
+ }
|
|
|
|
|
|
+ argInfo.Name = strings.TrimSpace(argInfo.Name)
|
|
|
if argInfo.Name == "" {
|
|
|
- fatal("Please give the name of the server")
|
|
|
+ fatal("ERROR: server name required. e.g. '-n=server_name'")
|
|
|
}
|
|
|
|
|
|
- argInfo.RaftURL = checkURL(argInfo.RaftURL, "http")
|
|
|
- argInfo.EtcdURL = checkURL(argInfo.EtcdURL, "http")
|
|
|
+ argInfo.RaftURL = sanitizeURL(argInfo.RaftURL, raftTLSConfig.Scheme)
|
|
|
+ argInfo.EtcdURL = sanitizeURL(argInfo.EtcdURL, etcdTLSConfig.Scheme)
|
|
|
+ argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
|
|
|
|
|
|
// Setup commands.
|
|
|
registerCommands()
|
|
|
@@ -220,40 +232,30 @@ func main() {
|
|
|
|
|
|
info = getInfo(dirPath)
|
|
|
|
|
|
- raftTlsConfs, ok := tlsConf(RaftServer)
|
|
|
- if !ok {
|
|
|
- fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
|
|
- }
|
|
|
-
|
|
|
- etcdTlsConfs, ok := tlsConf(EtcdServer)
|
|
|
- if !ok {
|
|
|
- fatal("Please specify cert and key file or cert and key file and CAFile or none of the three")
|
|
|
- }
|
|
|
-
|
|
|
// Create etcd key-value store
|
|
|
etcdStore = store.CreateStore(maxSize)
|
|
|
|
|
|
- startRaft(raftTlsConfs)
|
|
|
+ startRaft(raftTLSConfig)
|
|
|
|
|
|
if argInfo.WebURL != "" {
|
|
|
// start web
|
|
|
- etcdStore.SetMessager(storeMsg)
|
|
|
+ argInfo.WebURL = sanitizeURL(argInfo.WebURL, "http")
|
|
|
go webHelper()
|
|
|
go web.Start(raftServer, argInfo.WebURL)
|
|
|
}
|
|
|
|
|
|
- startEtcdTransport(*info, etcdTlsConfs[0])
|
|
|
+ startEtcdTransport(*info, etcdTLSConfig.Scheme, etcdTLSConfig.Server)
|
|
|
|
|
|
}
|
|
|
|
|
|
// Start the raft server
|
|
|
-func startRaft(tlsConfs []*tls.Config) {
|
|
|
+func startRaft(tlsConfig TLSConfig) {
|
|
|
var err error
|
|
|
|
|
|
raftName := info.Name
|
|
|
|
|
|
// Create transporter for raft
|
|
|
- raftTransporter = newTransporter(tlsConfs[1])
|
|
|
+ raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
|
|
|
|
|
|
// Create raft server
|
|
|
raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
|
|
|
@@ -348,37 +350,29 @@ func startRaft(tlsConfs []*tls.Config) {
|
|
|
}
|
|
|
|
|
|
// start to response to raft requests
|
|
|
- go startRaftTransport(*info, tlsConfs[0])
|
|
|
+ go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
|
|
|
|
|
|
}
|
|
|
|
|
|
// Create transporter using by raft server
|
|
|
// Create http or https transporter based on
|
|
|
// whether the user give the server cert and key
|
|
|
-func newTransporter(tlsConf *tls.Config) transporter {
|
|
|
+func newTransporter(scheme string, tlsConf tls.Config) transporter {
|
|
|
t := transporter{}
|
|
|
|
|
|
- if tlsConf == nil {
|
|
|
- t.scheme = "http://"
|
|
|
-
|
|
|
- t.client = &http.Client{
|
|
|
- Transport: &http.Transport{
|
|
|
- Dial: dialTimeout,
|
|
|
- },
|
|
|
- }
|
|
|
-
|
|
|
- } else {
|
|
|
- t.scheme = "https://"
|
|
|
+ t.scheme = scheme
|
|
|
|
|
|
- tr := &http.Transport{
|
|
|
- TLSClientConfig: tlsConf,
|
|
|
- Dial: dialTimeout,
|
|
|
- DisableCompression: true,
|
|
|
- }
|
|
|
+ tr := &http.Transport{
|
|
|
+ Dial: dialTimeout,
|
|
|
+ }
|
|
|
|
|
|
- t.client = &http.Client{Transport: tr}
|
|
|
+ if scheme == "https" {
|
|
|
+ tr.TLSClientConfig = &tlsConf
|
|
|
+ tr.DisableCompression = true
|
|
|
}
|
|
|
|
|
|
+ t.client = &http.Client{Transport: tr}
|
|
|
+
|
|
|
return t
|
|
|
}
|
|
|
|
|
|
@@ -388,113 +382,109 @@ func dialTimeout(network, addr string) (net.Conn, error) {
|
|
|
}
|
|
|
|
|
|
// Start to listen and response raft command
|
|
|
-func startRaftTransport(info Info, tlsConf *tls.Config) {
|
|
|
-
|
|
|
- // internal commands
|
|
|
- http.HandleFunc("/name", NameHttpHandler)
|
|
|
- http.HandleFunc("/join", JoinHttpHandler)
|
|
|
- http.HandleFunc("/vote", VoteHttpHandler)
|
|
|
- http.HandleFunc("/log", GetLogHttpHandler)
|
|
|
- http.HandleFunc("/log/append", AppendEntriesHttpHandler)
|
|
|
- http.HandleFunc("/snapshot", SnapshotHttpHandler)
|
|
|
- http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
|
|
|
- http.HandleFunc("/etcdURL", EtcdURLHttpHandler)
|
|
|
-
|
|
|
+func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
|
|
|
u, _ := url.Parse(info.RaftURL)
|
|
|
fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
|
|
|
|
|
|
- if tlsConf == nil {
|
|
|
- http.ListenAndServe(u.Host, nil)
|
|
|
+ raftMux := http.NewServeMux()
|
|
|
+
|
|
|
+ server := &http.Server{
|
|
|
+ Handler: raftMux,
|
|
|
+ TLSConfig: &tlsConf,
|
|
|
+ Addr: u.Host,
|
|
|
+ }
|
|
|
+
|
|
|
+ // internal commands
|
|
|
+ raftMux.HandleFunc("/name", NameHttpHandler)
|
|
|
+ raftMux.HandleFunc("/join", JoinHttpHandler)
|
|
|
+ raftMux.HandleFunc("/vote", VoteHttpHandler)
|
|
|
+ raftMux.HandleFunc("/log", GetLogHttpHandler)
|
|
|
+ raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
|
|
|
+ raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
|
|
|
+ raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
|
|
|
+ raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
|
|
|
+
|
|
|
+ if scheme == "http" {
|
|
|
+ fatal(server.ListenAndServe())
|
|
|
} else {
|
|
|
- server := &http.Server{
|
|
|
- TLSConfig: tlsConf,
|
|
|
- Addr: u.Host,
|
|
|
- }
|
|
|
- fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile))
|
|
|
+ fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
|
|
|
}
|
|
|
|
|
|
}
|
|
|
|
|
|
// Start to listen and response client command
|
|
|
-func startEtcdTransport(info Info, tlsConf *tls.Config) {
|
|
|
- // external commands
|
|
|
- http.HandleFunc("/"+version+"/keys/", Multiplexer)
|
|
|
- http.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
|
|
|
- http.HandleFunc("/leader", LeaderHttpHandler)
|
|
|
- http.HandleFunc("/machines", MachinesHttpHandler)
|
|
|
- http.HandleFunc("/", VersionHttpHandler)
|
|
|
- http.HandleFunc("/stats", StatsHttpHandler)
|
|
|
- http.HandleFunc("/test/", TestHttpHandler)
|
|
|
-
|
|
|
+func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
|
|
|
u, _ := url.Parse(info.EtcdURL)
|
|
|
fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
|
|
|
|
|
|
- if tlsConf == nil {
|
|
|
- fatal(http.ListenAndServe(u.Host, nil))
|
|
|
+ etcdMux := http.NewServeMux()
|
|
|
+
|
|
|
+ server := &http.Server{
|
|
|
+ Handler: etcdMux,
|
|
|
+ TLSConfig: &tlsConf,
|
|
|
+ Addr: u.Host,
|
|
|
+ }
|
|
|
+
|
|
|
+ // external commands
|
|
|
+ etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
|
|
|
+ etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
|
|
|
+ etcdMux.HandleFunc("/leader", LeaderHttpHandler)
|
|
|
+ etcdMux.HandleFunc("/machines", MachinesHttpHandler)
|
|
|
+ etcdMux.HandleFunc("/", VersionHttpHandler)
|
|
|
+ etcdMux.HandleFunc("/stats", StatsHttpHandler)
|
|
|
+ etcdMux.HandleFunc("/test/", TestHttpHandler)
|
|
|
+
|
|
|
+ if scheme == "http" {
|
|
|
+ fatal(server.ListenAndServe())
|
|
|
} else {
|
|
|
- server := &http.Server{
|
|
|
- TLSConfig: tlsConf,
|
|
|
- Addr: u.Host,
|
|
|
- }
|
|
|
- fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile))
|
|
|
+ fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
|
|
|
}
|
|
|
}
|
|
|
|
|
|
//--------------------------------------
|
|
|
// Config
|
|
|
//--------------------------------------
|
|
|
-func tlsConf(source int) ([]*tls.Config, bool) {
|
|
|
+
|
|
|
+type TLSConfig struct {
|
|
|
+ Scheme string
|
|
|
+ Server tls.Config
|
|
|
+ Client tls.Config
|
|
|
+}
|
|
|
+
|
|
|
+func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
|
|
|
var keyFile, certFile, CAFile string
|
|
|
var tlsCert tls.Certificate
|
|
|
- var isAuth bool
|
|
|
var err error
|
|
|
|
|
|
- switch source {
|
|
|
-
|
|
|
- case RaftServer:
|
|
|
- keyFile = info.ServerKeyFile
|
|
|
- certFile = info.ServerCertFile
|
|
|
- CAFile = info.ServerCAFile
|
|
|
+ t.Scheme = "http"
|
|
|
|
|
|
- if keyFile != "" && certFile != "" {
|
|
|
- tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
|
|
|
- if err == nil {
|
|
|
- fatal(err)
|
|
|
- }
|
|
|
- isAuth = true
|
|
|
- }
|
|
|
-
|
|
|
- case EtcdServer:
|
|
|
- keyFile = info.ClientKeyFile
|
|
|
- certFile = info.ClientCertFile
|
|
|
- CAFile = info.ClientCAFile
|
|
|
- }
|
|
|
+ keyFile = info.KeyFile
|
|
|
+ certFile = info.CertFile
|
|
|
+ CAFile = info.CAFile
|
|
|
|
|
|
// If the user do not specify key file, cert file and
|
|
|
// CA file, the type will be HTTP
|
|
|
if keyFile == "" && certFile == "" && CAFile == "" {
|
|
|
- return []*tls.Config{nil, nil}, true
|
|
|
+ return t, true
|
|
|
}
|
|
|
|
|
|
- if keyFile != "" && certFile != "" {
|
|
|
- serverConf := &tls.Config{}
|
|
|
- serverConf.ClientAuth, serverConf.ClientCAs = newCertPool(CAFile)
|
|
|
-
|
|
|
- if isAuth {
|
|
|
- raftTransConf := &tls.Config{
|
|
|
- Certificates: []tls.Certificate{tlsCert},
|
|
|
- InsecureSkipVerify: true,
|
|
|
- }
|
|
|
- return []*tls.Config{serverConf, raftTransConf}, true
|
|
|
- }
|
|
|
-
|
|
|
- return []*tls.Config{serverConf, nil}, true
|
|
|
+ // both the key and cert must be present
|
|
|
+ if keyFile == "" || certFile == "" {
|
|
|
+ return t, false
|
|
|
+ }
|
|
|
|
|
|
+ tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
|
|
|
+ if err != nil {
|
|
|
+ fatal(err)
|
|
|
}
|
|
|
|
|
|
- // bad specification
|
|
|
- return nil, false
|
|
|
+ t.Scheme = "https"
|
|
|
+ t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
|
|
|
+
|
|
|
+ t.Client.Certificates = []tls.Certificate{tlsCert}
|
|
|
+ t.Client.InsecureSkipVerify = true
|
|
|
|
|
|
+ return t, true
|
|
|
}
|
|
|
|
|
|
func parseInfo(path string) *Info {
|
|
|
@@ -582,7 +572,7 @@ func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
|
|
|
}
|
|
|
|
|
|
// Send join requests to the leader.
|
|
|
-func joinCluster(s *raft.Server, serverName string) error {
|
|
|
+func joinCluster(s *raft.Server, raftURL string) error {
|
|
|
var b bytes.Buffer
|
|
|
|
|
|
command := &JoinCommand{
|
|
|
@@ -600,9 +590,11 @@ func joinCluster(s *raft.Server, serverName string) error {
|
|
|
panic("wrong type")
|
|
|
}
|
|
|
|
|
|
- debugf("Send Join Request to %s", serverName)
|
|
|
+ joinURL := url.URL{Host: raftURL, Scheme: raftTransporter.scheme, Path: "/join"}
|
|
|
|
|
|
- resp, err := t.Post(fmt.Sprintf("%s/join", serverName), &b)
|
|
|
+ debugf("Send Join Request to %s", raftURL)
|
|
|
+
|
|
|
+ resp, err := t.Post(joinURL.String(), &b)
|
|
|
|
|
|
for {
|
|
|
if err != nil {
|
|
|
@@ -617,15 +609,10 @@ func joinCluster(s *raft.Server, serverName string) error {
|
|
|
|
|
|
address := resp.Header.Get("Location")
|
|
|
debugf("Send Join Request to %s", address)
|
|
|
- u, err := url.Parse(address)
|
|
|
-
|
|
|
- if err != nil {
|
|
|
- return fmt.Errorf("Unable to join: %s", err.Error())
|
|
|
- }
|
|
|
|
|
|
json.NewEncoder(&b).Encode(command)
|
|
|
|
|
|
- resp, err = t.Post(path.Join(u.Host, u.Path), &b)
|
|
|
+ resp, err = t.Post(address, &b)
|
|
|
|
|
|
} else if resp.StatusCode == http.StatusBadRequest {
|
|
|
debug("Reach max number machines in the cluster")
|