Browse Source

Merge pull request #109 from xiangli-cmu/fmilo

split raft server logic into separate module
Xiang Li 12 years ago
parent
commit
fe2d1c1b0e
6 changed files with 448 additions and 411 deletions
  1. 139 0
      config.go
  2. 8 408
      etcd.go
  3. 17 3
      etcd_handlers.go
  4. 221 0
      raft_server.go
  5. 27 0
      transporter.go
  6. 36 0
      util.go

+ 139 - 0
config.go

@@ -0,0 +1,139 @@
+package main
+
+import (
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/json"
+	"encoding/pem"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+)
+
+//--------------------------------------
+// Config
+//--------------------------------------
+
+func parseInfo(path string) *Info {
+	file, err := os.Open(path)
+
+	if err != nil {
+		return nil
+	}
+	defer file.Close()
+
+	info := &Info{}
+
+	content, err := ioutil.ReadAll(file)
+	if err != nil {
+		fatalf("Unable to read info: %v", err)
+		return nil
+	}
+
+	if err = json.Unmarshal(content, &info); err != nil {
+		fatalf("Unable to parse info: %v", err)
+		return nil
+	}
+
+	return info
+}
+
+// Get the server info from previous conf file
+// or from the user
+func getInfo(path string) *Info {
+
+	// Read in the server info if available.
+	infoPath := filepath.Join(path, "info")
+
+	// Delete the old configuration if exist
+	if force {
+		logPath := filepath.Join(path, "log")
+		confPath := filepath.Join(path, "conf")
+		snapshotPath := filepath.Join(path, "snapshot")
+		os.Remove(infoPath)
+		os.Remove(logPath)
+		os.Remove(confPath)
+		os.RemoveAll(snapshotPath)
+	}
+
+	info := parseInfo(infoPath)
+	if info != nil {
+		infof("Found node configuration in '%s'. Ignoring flags", infoPath)
+		return info
+	}
+
+	info = &argInfo
+
+	// Write to file.
+	content, _ := json.MarshalIndent(info, "", " ")
+	content = []byte(string(content) + "\n")
+	if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
+		fatalf("Unable to write info to file: %v", err)
+	}
+
+	infof("Wrote node configuration to '%s'", infoPath)
+
+	return info
+}
+
+func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
+	var keyFile, certFile, CAFile string
+	var tlsCert tls.Certificate
+	var err error
+
+	t.Scheme = "http"
+
+	keyFile = info.KeyFile
+	certFile = info.CertFile
+	CAFile = info.CAFile
+
+	// If the user do not specify key file, cert file and
+	// CA file, the type will be HTTP
+	if keyFile == "" && certFile == "" && CAFile == "" {
+		return t, true
+	}
+
+	// both the key and cert must be present
+	if keyFile == "" || certFile == "" {
+		return t, false
+	}
+
+	tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
+	if err != nil {
+		fatal(err)
+	}
+
+	t.Scheme = "https"
+	t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
+
+	// The client should trust the RootCA that the Server uses since
+	// everyone is a peer in the network.
+	t.Client.Certificates = []tls.Certificate{tlsCert}
+	t.Client.RootCAs = t.Server.ClientCAs
+
+	return t, true
+}
+
+// newCertPool creates x509 certPool and corresponding Auth Type.
+// If the given CAfile is valid, add the cert into the pool and verify the clients'
+// certs against the cert in the pool.
+// If the given CAfile is empty, do not verify the clients' cert.
+// If the given CAfile is not valid, fatal.
+func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
+	if CAFile == "" {
+		return tls.NoClientCert, nil
+	}
+	pemByte, err := ioutil.ReadFile(CAFile)
+	check(err)
+
+	block, pemByte := pem.Decode(pemByte)
+
+	cert, err := x509.ParseCertificate(block.Bytes)
+	check(err)
+
+	certPool := x509.NewCertPool()
+
+	certPool.AddCert(cert)
+
+	return tls.RequireAndVerifyClientCert, certPool
+}

+ 8 - 408
etcd.go

@@ -1,19 +1,11 @@
 package main
 
 import (
-	"path/filepath"
-	"bytes"
 	"crypto/tls"
-	"crypto/x509"
-	"encoding/json"
-	"encoding/pem"
 	"flag"
-	"fmt"
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/web"
-	"github.com/coreos/go-raft"
 	"io/ioutil"
-	"net"
 	"net/http"
 	"net/url"
 	"os"
@@ -133,8 +125,6 @@ type TLSConfig struct {
 //
 //------------------------------------------------------------------------------
 
-var raftServer *raft.Server
-var raftTransporter transporter
 var etcdStore *store.Store
 var info *Info
 
@@ -144,30 +134,6 @@ var info *Info
 //
 //------------------------------------------------------------------------------
 
-// sanitizeURL will cleanup a host string in the format hostname:port and
-// attach a schema.
-func sanitizeURL(host string, defaultScheme string) string {
-	// Blank URLs are fine input, just return it
-	if len(host) == 0 {
-		return host
-	}
-
-	p, err := url.Parse(host)
-	if err != nil {
-		fatal(err)
-	}
-
-	// Make sure the host is in Host:Port format
-	_, _, err = net.SplitHostPort(host)
-	if err != nil {
-		fatal(err)
-	}
-
-	p = &url.URL{Host: host, Scheme: defaultScheme}
-
-	return p.String()
-}
-
 //--------------------------------------
 // Main
 //--------------------------------------
@@ -187,7 +153,7 @@ func main() {
 		signal.Notify(c, os.Interrupt)
 		go func() {
 			for sig := range c {
-				fmt.Printf("captured %v, stopping profiler and exiting..", sig)
+				infof("captured %v, stopping profiler and exiting..", sig)
 				pprof.StopCPUProfile()
 				os.Exit(1)
 			}
@@ -197,7 +163,6 @@ func main() {
 
 	if veryVerbose {
 		verbose = true
-		raft.SetLogLevel(raft.Debug)
 	}
 
 	if machines != "" {
@@ -256,388 +221,23 @@ func main() {
 
 }
 
-// Start the raft server
-func startRaft(tlsConfig TLSConfig) {
-	var err error
-
-	raftName := info.Name
-
-	// Create transporter for raft
-	raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
-
-	// Create raft server
-	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
-
-	if err != nil {
-		fatal(err)
-	}
-
-	// LoadSnapshot
-	if snapshot {
-		err = raftServer.LoadSnapshot()
-
-		if err == nil {
-			debugf("%s finished load snapshot", raftServer.Name())
-		} else {
-			debug(err)
-		}
-	}
-
-	raftServer.SetElectionTimeout(ElectionTimeout)
-	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
-
-	raftServer.Start()
-
-	if raftServer.IsLogEmpty() {
-
-		// start as a leader in a new cluster
-		if len(cluster) == 0 {
-
-			time.Sleep(time.Millisecond * 20)
-
-			// leader need to join self as a peer
-			for {
-				command := &JoinCommand{
-					Name:    raftServer.Name(),
-					RaftURL: argInfo.RaftURL,
-					EtcdURL: argInfo.EtcdURL,
-				}
-				_, err := raftServer.Do(command)
-				if err == nil {
-					break
-				}
-			}
-			debugf("%s start as a leader", raftServer.Name())
-
-			// start as a follower in a existing cluster
-		} else {
-
-			time.Sleep(time.Millisecond * 20)
-
-			for i := 0; i < retryTimes; i++ {
-
-				success := false
-				for _, machine := range cluster {
-					if len(machine) == 0 {
-						continue
-					}
-					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
-					if err != nil {
-						if err.Error() == errors[103] {
-							fmt.Println(err)
-							os.Exit(1)
-						}
-						debugf("cannot join to cluster via machine %s %s", machine, err)
-					} else {
-						success = true
-						break
-					}
-				}
-
-				if success {
-					break
-				}
-
-				warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
-				time.Sleep(time.Second * RetryInterval)
-			}
-			if err != nil {
-				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
-			}
-			debugf("%s success join to the cluster", raftServer.Name())
-		}
-
-	} else {
-		// rejoin the previous cluster
-		debugf("%s restart as a follower", raftServer.Name())
-	}
-
-	// open the snapshot
-	if snapshot {
-		go monitorSnapshot()
-	}
-
-	// start to response to raft requests
-	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
-
-}
-
-// Create transporter using by raft server
-// Create http or https transporter based on
-// whether the user give the server cert and key
-func newTransporter(scheme string, tlsConf tls.Config) transporter {
-	t := transporter{}
-
-	tr := &http.Transport{
-		Dial: dialTimeout,
-	}
-
-	if scheme == "https" {
-		tr.TLSClientConfig = &tlsConf
-		tr.DisableCompression = true
-	}
-
-	t.client = &http.Client{Transport: tr}
-
-	return t
-}
-
-// Dial with timeout
-func dialTimeout(network, addr string) (net.Conn, error) {
-	return net.DialTimeout(network, addr, HTTPTimeout)
-}
-
-// Start to listen and response raft command
-func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
-	u, _ := url.Parse(info.RaftURL)
-	fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
-
-	raftMux := http.NewServeMux()
-
-	server := &http.Server{
-		Handler:   raftMux,
-		TLSConfig: &tlsConf,
-		Addr:      u.Host,
-	}
-
-	// internal commands
-	raftMux.HandleFunc("/name", NameHttpHandler)
-	raftMux.HandleFunc("/join", JoinHttpHandler)
-	raftMux.HandleFunc("/vote", VoteHttpHandler)
-	raftMux.HandleFunc("/log", GetLogHttpHandler)
-	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
-	raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
-	raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
-	raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
-
-	if scheme == "http" {
-		fatal(server.ListenAndServe())
-	} else {
-		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
-	}
-
-}
-
 // Start to listen and response client command
 func startEtcdTransport(info Info, scheme string, tlsConf tls.Config) {
-	u, _ := url.Parse(info.EtcdURL)
-	fmt.Printf("etcd server [%s] listening on %s\n", info.Name, u)
-
-	etcdMux := http.NewServeMux()
+	u, err := url.Parse(info.EtcdURL)
+	if err != nil {
+		fatalf("invalid url '%s': %s", info.EtcdURL, err)
+	}
+	infof("etcd server [%s:%s]", info.Name, u)
 
-	server := &http.Server{
-		Handler:   etcdMux,
+	server := http.Server{
+		Handler:   NewEtcdMuxer(),
 		TLSConfig: &tlsConf,
 		Addr:      u.Host,
 	}
 
-	// external commands
-	etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
-	etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
-	etcdMux.HandleFunc("/leader", LeaderHttpHandler)
-	etcdMux.HandleFunc("/machines", MachinesHttpHandler)
-	etcdMux.HandleFunc("/", VersionHttpHandler)
-	etcdMux.HandleFunc("/stats", StatsHttpHandler)
-	etcdMux.HandleFunc("/test/", TestHttpHandler)
-
 	if scheme == "http" {
 		fatal(server.ListenAndServe())
 	} else {
 		fatal(server.ListenAndServeTLS(info.EtcdTLS.CertFile, info.EtcdTLS.KeyFile))
 	}
 }
-
-//--------------------------------------
-// Config
-//--------------------------------------
-
-func tlsConfigFromInfo(info TLSInfo) (t TLSConfig, ok bool) {
-	var keyFile, certFile, CAFile string
-	var tlsCert tls.Certificate
-	var err error
-
-	t.Scheme = "http"
-
-	keyFile = info.KeyFile
-	certFile = info.CertFile
-	CAFile = info.CAFile
-
-	// If the user do not specify key file, cert file and
-	// CA file, the type will be HTTP
-	if keyFile == "" && certFile == "" && CAFile == "" {
-		return t, true
-	}
-
-	// both the key and cert must be present
-	if keyFile == "" || certFile == "" {
-		return t, false
-	}
-
-	tlsCert, err = tls.LoadX509KeyPair(certFile, keyFile)
-	if err != nil {
-		fatal(err)
-	}
-
-	t.Scheme = "https"
-	t.Server.ClientAuth, t.Server.ClientCAs = newCertPool(CAFile)
-
-	// The client should trust the RootCA that the Server uses since
-	// everyone is a peer in the network.
-	t.Client.Certificates = []tls.Certificate{tlsCert}
-	t.Client.RootCAs = t.Server.ClientCAs
-
-	return t, true
-}
-
-func parseInfo(path string) *Info {
-	file, err := os.Open(path)
-
-	if err != nil {
-		return nil
-	}
-
-	info := &Info{}
-	defer file.Close()
-
-	content, err := ioutil.ReadAll(file)
-	if err != nil {
-		fatalf("Unable to read info: %v", err)
-		return nil
-	}
-
-	if err = json.Unmarshal(content, &info); err != nil {
-		fatalf("Unable to parse info: %v", err)
-		return nil
-	}
-
-	return info
-}
-
-// Get the server info from previous conf file
-// or from the user
-func getInfo(path string) *Info {
-
-	// Read in the server info if available.
-	infoPath := filepath.Join(path, "info")
-
-	// Delete the old configuration if exist
-	if force {
-		logPath := filepath.Join(path, "log")
-		confPath := filepath.Join(path, "conf")
-		snapshotPath := filepath.Join(path, "snapshot")
-		os.Remove(infoPath)
-		os.Remove(logPath)
-		os.Remove(confPath)
-		os.RemoveAll(snapshotPath)
-	}
-
-	info := parseInfo(infoPath)
-	if info != nil {
-		fmt.Printf("Found node configuration in '%s'. Ignoring flags.\n", infoPath)
-		return info
-	}
-
-	info = &argInfo
-
-	// Write to file.
-	content, _ := json.MarshalIndent(info, "", " ")
-	content = []byte(string(content) + "\n")
-	if err := ioutil.WriteFile(infoPath, content, 0644); err != nil {
-		fatalf("Unable to write info to file: %v", err)
-	}
-
-	fmt.Printf("Wrote node configuration to '%s'.\n", infoPath)
-
-	return info
-}
-
-// newCertPool creates x509 certPool and corresponding Auth Type.
-// If the given CAfile is valid, add the cert into the pool and verify the clients'
-// certs against the cert in the pool.
-// If the given CAfile is empty, do not verify the clients' cert.
-// If the given CAfile is not valid, fatal.
-func newCertPool(CAFile string) (tls.ClientAuthType, *x509.CertPool) {
-	if CAFile == "" {
-		return tls.NoClientCert, nil
-	}
-	pemByte, _ := ioutil.ReadFile(CAFile)
-
-	block, pemByte := pem.Decode(pemByte)
-
-	cert, err := x509.ParseCertificate(block.Bytes)
-
-	if err != nil {
-		fatal(err)
-	}
-
-	certPool := x509.NewCertPool()
-
-	certPool.AddCert(cert)
-
-	return tls.RequireAndVerifyClientCert, certPool
-}
-
-// Send join requests to the leader.
-func joinCluster(s *raft.Server, raftURL string, scheme string) error {
-	var b bytes.Buffer
-
-	command := &JoinCommand{
-		Name:    s.Name(),
-		RaftURL: info.RaftURL,
-		EtcdURL: info.EtcdURL,
-	}
-
-	json.NewEncoder(&b).Encode(command)
-
-	// t must be ok
-	t, ok := raftServer.Transporter().(transporter)
-
-	if !ok {
-		panic("wrong type")
-	}
-
-	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
-
-	debugf("Send Join Request to %s", raftURL)
-
-	resp, err := t.Post(joinURL.String(), &b)
-
-	for {
-		if err != nil {
-			return fmt.Errorf("Unable to join: %v", err)
-		}
-		if resp != nil {
-			defer resp.Body.Close()
-			if resp.StatusCode == http.StatusOK {
-				return nil
-			}
-			if resp.StatusCode == http.StatusTemporaryRedirect {
-
-				address := resp.Header.Get("Location")
-				debugf("Send Join Request to %s", address)
-
-				json.NewEncoder(&b).Encode(command)
-
-				resp, err = t.Post(address, &b)
-
-			} else if resp.StatusCode == http.StatusBadRequest {
-				debug("Reach max number machines in the cluster")
-				return fmt.Errorf(errors[103])
-			} else {
-				return fmt.Errorf("Unable to join")
-			}
-		}
-
-	}
-	return fmt.Errorf("Unable to join: %v", err)
-}
-
-// Register commands to raft server
-func registerCommands() {
-	raft.RegisterCommand(&JoinCommand{})
-	raft.RegisterCommand(&SetCommand{})
-	raft.RegisterCommand(&GetCommand{})
-	raft.RegisterCommand(&DeleteCommand{})
-	raft.RegisterCommand(&WatchCommand{})
-	raft.RegisterCommand(&TestAndSetCommand{})
-}

+ 17 - 3
etcd_handlers.go

@@ -12,6 +12,19 @@ import (
 // Handlers to handle etcd-store related request via etcd url
 //-------------------------------------------------------------------
 
+func NewEtcdMuxer() *http.ServeMux {
+	// external commands
+	etcdMux := http.NewServeMux()
+	etcdMux.HandleFunc("/"+version+"/keys/", Multiplexer)
+	etcdMux.HandleFunc("/"+version+"/watch/", WatchHttpHandler)
+	etcdMux.HandleFunc("/leader", LeaderHttpHandler)
+	etcdMux.HandleFunc("/machines", MachinesHttpHandler)
+	etcdMux.HandleFunc("/", VersionHttpHandler)
+	etcdMux.HandleFunc("/stats", StatsHttpHandler)
+	etcdMux.HandleFunc("/test/", TestHttpHandler)
+	return etcdMux
+}
+
 // Multiplex GET/POST/DELETE request to corresponding handlers
 func Multiplexer(w http.ResponseWriter, req *http.Request) {
 
@@ -152,8 +165,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 			return
 		}
 	} else {
+		leader := raftServer.Leader()
 		// current no leader
-		if raftServer.Leader() == "" {
+		if leader == "" {
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).Write(newJsonError(300, ""))
 			return
@@ -166,10 +180,10 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, etcd bool) {
 		var url string
 
 		if etcd {
-			etcdAddr, _ := nameToEtcdURL(raftServer.Leader())
+			etcdAddr, _ := nameToEtcdURL(leader)
 			url = etcdAddr + path
 		} else {
-			raftAddr, _ := nameToRaftURL(raftServer.Leader())
+			raftAddr, _ := nameToRaftURL(leader)
 			url = raftAddr + path
 		}
 

+ 221 - 0
raft_server.go

@@ -0,0 +1,221 @@
+package main
+
+import (
+	"bytes"
+	"crypto/tls"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"time"
+
+	"github.com/coreos/go-raft"
+)
+
+var raftTransporter transporter
+var raftServer *raft.Server
+
+// Start the raft server
+func startRaft(tlsConfig TLSConfig) {
+	if veryVerbose {
+		raft.SetLogLevel(raft.Debug)
+	}
+
+	var err error
+
+	raftName := info.Name
+
+	// Create transporter for raft
+	raftTransporter = newTransporter(tlsConfig.Scheme, tlsConfig.Client)
+
+	// Create raft server
+	raftServer, err = raft.NewServer(raftName, dirPath, raftTransporter, etcdStore, nil)
+
+	if err != nil {
+		fatal(err)
+	}
+
+	// LoadSnapshot
+	if snapshot {
+		err = raftServer.LoadSnapshot()
+
+		if err == nil {
+			debugf("%s finished load snapshot", raftServer.Name())
+		} else {
+			debug(err)
+		}
+	}
+
+	raftServer.SetElectionTimeout(ElectionTimeout)
+	raftServer.SetHeartbeatTimeout(HeartbeatTimeout)
+
+	raftServer.Start()
+
+	if raftServer.IsLogEmpty() {
+
+		// start as a leader in a new cluster
+		if len(cluster) == 0 {
+
+			time.Sleep(time.Millisecond * 20)
+
+			// leader need to join self as a peer
+			for {
+				command := &JoinCommand{
+					Name:    raftServer.Name(),
+					RaftURL: argInfo.RaftURL,
+					EtcdURL: argInfo.EtcdURL,
+				}
+				_, err := raftServer.Do(command)
+				if err == nil {
+					break
+				}
+			}
+			debugf("%s start as a leader", raftServer.Name())
+
+			// start as a follower in a existing cluster
+		} else {
+
+			time.Sleep(time.Millisecond * 20)
+
+			for i := 0; i < retryTimes; i++ {
+
+				success := false
+				for _, machine := range cluster {
+					if len(machine) == 0 {
+						continue
+					}
+					err = joinCluster(raftServer, machine, tlsConfig.Scheme)
+					if err != nil {
+						if err.Error() == errors[103] {
+							fatal(err)
+						}
+						debugf("cannot join to cluster via machine %s %s", machine, err)
+					} else {
+						success = true
+						break
+					}
+				}
+
+				if success {
+					break
+				}
+
+				warnf("cannot join to cluster via given machines, retry in %d seconds", RetryInterval)
+				time.Sleep(time.Second * RetryInterval)
+			}
+			if err != nil {
+				fatalf("Cannot join the cluster via given machines after %x retries", retryTimes)
+			}
+			debugf("%s success join to the cluster", raftServer.Name())
+		}
+
+	} else {
+		// rejoin the previous cluster
+		debugf("%s restart as a follower", raftServer.Name())
+	}
+
+	// open the snapshot
+	if snapshot {
+		go monitorSnapshot()
+	}
+
+	// start to response to raft requests
+	go startRaftTransport(*info, tlsConfig.Scheme, tlsConfig.Server)
+
+}
+
+// Start to listen and response raft command
+func startRaftTransport(info Info, scheme string, tlsConf tls.Config) {
+	u, _ := url.Parse(info.RaftURL)
+	infof("raft server [%s:%s]", info.Name, u)
+
+	raftMux := http.NewServeMux()
+
+	server := &http.Server{
+		Handler:   raftMux,
+		TLSConfig: &tlsConf,
+		Addr:      u.Host,
+	}
+
+	// internal commands
+	raftMux.HandleFunc("/name", NameHttpHandler)
+	raftMux.HandleFunc("/join", JoinHttpHandler)
+	raftMux.HandleFunc("/vote", VoteHttpHandler)
+	raftMux.HandleFunc("/log", GetLogHttpHandler)
+	raftMux.HandleFunc("/log/append", AppendEntriesHttpHandler)
+	raftMux.HandleFunc("/snapshot", SnapshotHttpHandler)
+	raftMux.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
+	raftMux.HandleFunc("/etcdURL", EtcdURLHttpHandler)
+
+	if scheme == "http" {
+		fatal(server.ListenAndServe())
+	} else {
+		fatal(server.ListenAndServeTLS(info.RaftTLS.CertFile, info.RaftTLS.KeyFile))
+	}
+
+}
+
+// Send join requests to the leader.
+func joinCluster(s *raft.Server, raftURL string, scheme string) error {
+	var b bytes.Buffer
+
+	command := &JoinCommand{
+		Name:    s.Name(),
+		RaftURL: info.RaftURL,
+		EtcdURL: info.EtcdURL,
+	}
+
+	json.NewEncoder(&b).Encode(command)
+
+	// t must be ok
+	t, ok := raftServer.Transporter().(transporter)
+
+	if !ok {
+		panic("wrong type")
+	}
+
+	joinURL := url.URL{Host: raftURL, Scheme: scheme, Path: "/join"}
+
+	debugf("Send Join Request to %s", raftURL)
+
+	resp, err := t.Post(joinURL.String(), &b)
+
+	for {
+		if err != nil {
+			return fmt.Errorf("Unable to join: %v", err)
+		}
+		if resp != nil {
+			defer resp.Body.Close()
+			if resp.StatusCode == http.StatusOK {
+				return nil
+			}
+			if resp.StatusCode == http.StatusTemporaryRedirect {
+
+				address := resp.Header.Get("Location")
+				debugf("Send Join Request to %s", address)
+
+				json.NewEncoder(&b).Encode(command)
+
+				resp, err = t.Post(address, &b)
+
+			} else if resp.StatusCode == http.StatusBadRequest {
+				debug("Reach max number machines in the cluster")
+				return fmt.Errorf(errors[103])
+			} else {
+				return fmt.Errorf("Unable to join")
+			}
+		}
+
+	}
+	return fmt.Errorf("Unable to join: %v", err)
+}
+
+// Register commands to raft server
+func registerCommands() {
+	raft.RegisterCommand(&JoinCommand{})
+	raft.RegisterCommand(&SetCommand{})
+	raft.RegisterCommand(&GetCommand{})
+	raft.RegisterCommand(&DeleteCommand{})
+	raft.RegisterCommand(&WatchCommand{})
+	raft.RegisterCommand(&TestAndSetCommand{})
+}

+ 27 - 0
transporter.go

@@ -2,10 +2,12 @@ package main
 
 import (
 	"bytes"
+	"crypto/tls"
 	"encoding/json"
 	"fmt"
 	"github.com/coreos/go-raft"
 	"io"
+	"net"
 	"net/http"
 )
 
@@ -14,6 +16,31 @@ type transporter struct {
 	client *http.Client
 }
 
+// Create transporter using by raft server
+// Create http or https transporter based on
+// whether the user give the server cert and key
+func newTransporter(scheme string, tlsConf tls.Config) transporter {
+	t := transporter{}
+
+	tr := &http.Transport{
+		Dial: dialTimeout,
+	}
+
+	if scheme == "https" {
+		tr.TLSClientConfig = &tlsConf
+		tr.DisableCompression = true
+	}
+
+	t.client = &http.Client{Transport: tr}
+
+	return t
+}
+
+// Dial with timeout
+func dialTimeout(network, addr string) (net.Conn, error) {
+	return net.DialTimeout(network, addr, HTTPTimeout)
+}
+
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Peer, req *raft.AppendEntriesRequest) *raft.AppendEntriesResponse {
 	var aersp *raft.AppendEntriesResponse

+ 36 - 0
util.go

@@ -6,7 +6,9 @@ import (
 	"github.com/coreos/etcd/web"
 	"io"
 	"log"
+	"net"
 	"net/http"
+	"net/url"
 	"os"
 	"strconv"
 	"time"
@@ -69,6 +71,36 @@ func encodeJsonResponse(w http.ResponseWriter, status int, data interface{}) {
 	}
 }
 
+// sanitizeURL will cleanup a host string in the format hostname:port and
+// attach a schema.
+func sanitizeURL(host string, defaultScheme string) string {
+	// Blank URLs are fine input, just return it
+	if len(host) == 0 {
+		return host
+	}
+
+	p, err := url.Parse(host)
+	if err != nil {
+		fatal(err)
+	}
+
+	// Make sure the host is in Host:Port format
+	_, _, err = net.SplitHostPort(host)
+	if err != nil {
+		fatal(err)
+	}
+
+	p = &url.URL{Host: host, Scheme: defaultScheme}
+
+	return p.String()
+}
+
+func check(err error) {
+	if err != nil {
+		fatal(err)
+	}
+}
+
 //--------------------------------------
 // Log
 //--------------------------------------
@@ -79,6 +111,10 @@ func init() {
 	logger = log.New(os.Stdout, "[etcd] ", log.Lmicroseconds)
 }
 
+func infof(msg string, v ...interface{}) {
+	logger.Printf("INFO "+msg+"\n", v...)
+}
+
 func debugf(msg string, v ...interface{}) {
 	if verbose {
 		logger.Printf("DEBUG "+msg+"\n", v...)