Browse Source

feat(trasnport): add an independent node name

Don't let the raft algorithm know anything about the transport. Give it
a nodename instead. This will allow us to support more complex
networking setups in the future.
Brandon Philips 12 years ago
parent
commit
a19048841f
9 changed files with 141 additions and 70 deletions
  1. 4 4
      client_handlers.go
  2. 5 4
      command.go
  3. 56 34
      etcd.go
  4. 1 1
      etcd_long_test.go
  5. 6 6
      machines.go
  6. 14 2
      raft_handlers.go
  7. 24 6
      test.go
  8. 29 12
      transporter.go
  9. 2 1
      web/web.go

+ 4 - 4
client_handlers.go

@@ -45,7 +45,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 		return
 	}
 
-	debugf("[recv] POST http://%v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] POST %v/v1/keys/%s", raftServer.Name(), key)
 
 	value := req.FormValue("value")
 
@@ -96,7 +96,7 @@ func SetHttpHandler(w *http.ResponseWriter, req *http.Request) {
 func DeleteHttpHandler(w *http.ResponseWriter, req *http.Request) {
 	key := req.URL.Path[len("/v1/keys/"):]
 
-	debugf("[recv] DELETE http://%v/v1/keys/%s", raftServer.Name(), key)
+	debugf("[recv] DELETE %v/v1/keys/%s", raftServer.Name(), key)
 
 	command := &DeleteCommand{
 		Key: key,
@@ -172,9 +172,9 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 
 		if client {
 			clientAddr, _ := getClientAddr(raftServer.Leader())
-			url = scheme + clientAddr + path
+			url = clientAddr + path
 		} else {
-			url = scheme + raftServer.Leader() + path
+			url = raftServer.Leader() + path
 		}
 
 		debugf("Redirect to %s", url)

+ 5 - 4
command.go

@@ -111,9 +111,8 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 // JoinCommand
 type JoinCommand struct {
 	Name       string `json:"name"`
-	Hostname   string `json:"hostName"`
-	RaftPort   int    `json:"raftPort"`
-	ClientPort int    `json:"clientPort"`
+	RaftURL    string `json:"raftURL"`
+	ClientURL  string `json:"clientURL"`
 }
 
 // The name of the join command in the log
@@ -137,12 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 		return []byte("join fail"), fmt.Errorf(errors[103])
 	}
 
+	raftTransporter.AddPeer(c)
+
 	// add peer in raft
 	err := raftServer.AddPeer(c.Name)
 
 	// add machine in etcd storage
 	key := path.Join("_etcd/machines", c.Name)
-	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
+	value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 
 	return []byte("join success"), err

+ 56 - 34
etcd.go

@@ -56,13 +56,14 @@ func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 
+
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
 	flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
 
-	flag.StringVar(&argInfo.Hostname, "h", "0.0.0.0", "the hostname of the local machine")
-	flag.IntVar(&argInfo.ClientPort, "c", 4001, "the port to communicate with clients")
-	flag.IntVar(&argInfo.RaftPort, "s", 7001, "the port to communicate with servers")
-	flag.IntVar(&argInfo.WebPort, "w", -1, "the port of web interface (-1 means do not start web interface)")
+	flag.StringVar(&argInfo.Name, "n", "", "the node name (required)")
+	flag.StringVar(&argInfo.ClientURL, "c", "127.0.0.1:4001", "the port to communicate with clients")
+	flag.StringVar(&argInfo.RaftURL, "s", "127.0.0.1:7001", "the port to communicate with servers")
+	flag.StringVar(&argInfo.WebURL, "w", "", "the port of web interface")
 
 	flag.StringVar(&argInfo.ServerCAFile, "serverCAFile", "", "the path of the CAFile")
 	flag.StringVar(&argInfo.ServerCertFile, "serverCert", "", "the cert file of the server")
@@ -111,10 +112,11 @@ const (
 //------------------------------------------------------------------------------
 
 type Info struct {
-	Hostname   string `json:"hostname"`
-	RaftPort   int    `json:"raftPort"`
-	ClientPort int    `json:"clientPort"`
-	WebPort    int    `json:"webPort"`
+	Name       string `json:"name"`
+
+	RaftURL    string `json:"raftURL"`
+	ClientURL  string `json:"clientURL"`
+	WebURL     string `json:"webURL"`
 
 	ServerCertFile string `json:"serverCertFile"`
 	ServerKeyFile  string `json:"serverKeyFile"`
@@ -142,6 +144,21 @@ var info *Info
 //
 //------------------------------------------------------------------------------
 
+// Check a URL and clean it up if the user forgot the schema
+func checkURL(u string, defaultSchema string) string {
+	p, err := url.Parse(u)
+
+	if err != nil {
+		panic(err)
+	}
+
+	if len(p.Host) == 0 && len(defaultSchema) != 0 {
+		return checkURL(fmt.Sprintf("%s://%s", defaultSchema, u), "")
+	}
+
+	return p.String()
+}
+
 //--------------------------------------
 // Main
 //--------------------------------------
@@ -184,6 +201,16 @@ func main() {
 		cluster = strings.Split(string(b), ",")
 	}
 
+	// Otherwise ask user for info and write it to file.
+	argInfo.Name = strings.TrimSpace(argInfo.Name)
+
+	if argInfo.Name == "" {
+		fatal("Please give the name of the server")
+	}
+
+	argInfo.RaftURL = checkURL(argInfo.RaftURL, "http")
+	argInfo.ClientURL = checkURL(argInfo.ClientURL, "http")
+
 	// Setup commands.
 	registerCommands()
 
@@ -209,11 +236,11 @@ func main() {
 
 	startRaft(raftTlsConfs)
 
-	if argInfo.WebPort != -1 {
+	if argInfo.WebURL != "" {
 		// start web
 		etcdStore.SetMessager(storeMsg)
 		go webHelper()
-		go web.Start(raftServer, argInfo.WebPort)
+		go web.Start(raftServer, argInfo.WebURL)
 	}
 
 	startEtcdTransport(*info, etcdTlsConfs[0])
@@ -224,7 +251,7 @@ func main() {
 func startRaft(tlsConfs []*tls.Config) {
 	var err error
 
-	raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort)
+	raftName := info.Name
 
 	// Create transporter for raft
 	raftTransporter = newTransporter(tlsConfs[1])
@@ -262,10 +289,9 @@ func startRaft(tlsConfs []*tls.Config) {
 			// leader need to join self as a peer
 			for {
 				command := &JoinCommand{
-					Name:       raftServer.Name(),
-					Hostname:   argInfo.Hostname,
-					RaftPort:   argInfo.RaftPort,
-					ClientPort: argInfo.ClientPort,
+					Name:      raftServer.Name(),
+					RaftURL:   argInfo.RaftURL,
+					ClientURL: argInfo.ClientURL,
 				}
 				_, err := raftServer.Do(command)
 				if err == nil {
@@ -333,6 +359,8 @@ func startRaft(tlsConfs []*tls.Config) {
 func newTransporter(tlsConf *tls.Config) transporter {
 	t := transporter{}
 
+	t.names = make(map[string]*JoinCommand)
+
 	if tlsConf == nil {
 		t.scheme = "http://"
 
@@ -366,6 +394,7 @@ func dialTimeout(network, addr string) (net.Conn, error) {
 func startRaftTransport(info Info, tlsConf *tls.Config) {
 
 	// internal commands
+	http.HandleFunc("/name", NameHttpHandler)
 	http.HandleFunc("/join", JoinHttpHandler)
 	http.HandleFunc("/vote", VoteHttpHandler)
 	http.HandleFunc("/log", GetLogHttpHandler)
@@ -374,16 +403,16 @@ func startRaftTransport(info Info, tlsConf *tls.Config) {
 	http.HandleFunc("/snapshotRecovery", SnapshotRecoveryHttpHandler)
 	http.HandleFunc("/client", ClientHttpHandler)
 
-	if tlsConf == nil {
-		fmt.Printf("raft server [%s] listen on http port %v\n", info.Hostname, info.RaftPort)
-		fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.RaftPort), nil))
+	u, _ := url.Parse(info.RaftURL)
+	fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
 
+	if tlsConf == nil {
+		http.ListenAndServe(u.Host, nil)
 	} else {
 		server := &http.Server{
 			TLSConfig: tlsConf,
-			Addr:      fmt.Sprintf(":%d", info.RaftPort),
+			Addr:      u.Host,
 		}
-		fmt.Printf("raft server [%s] listen on https port %v\n", info.Hostname, info.RaftPort)
 		fatal(server.ListenAndServeTLS(info.ServerCertFile, argInfo.ServerKeyFile))
 	}
 
@@ -400,15 +429,16 @@ func startEtcdTransport(info Info, tlsConf *tls.Config) {
 	http.HandleFunc("/stats", StatsHttpHandler)
 	http.HandleFunc("/test/", TestHttpHandler)
 
+	u, _ := url.Parse(info.ClientURL)
+	fmt.Printf("raft server [%s] listening on %s\n", info.Name, u)
+
 	if tlsConf == nil {
-		fmt.Printf("etcd [%s] listen on http port %v\n", info.Hostname, info.ClientPort)
-		fatal(http.ListenAndServe(fmt.Sprintf(":%d", info.ClientPort), nil))
+		fatal(http.ListenAndServe(u.Host, nil))
 	} else {
 		server := &http.Server{
 			TLSConfig: tlsConf,
-			Addr:      fmt.Sprintf(":%d", info.ClientPort),
+			Addr:      u.Host,
 		}
-		fmt.Printf("etcd [%s] listen on https port %v\n", info.Hostname, info.ClientPort)
 		fatal(server.ListenAndServeTLS(info.ClientCertFile, info.ClientKeyFile))
 	}
 }
@@ -518,13 +548,6 @@ func getInfo(path string) *Info {
 		return info
 	}
 
-	// Otherwise ask user for info and write it to file.
-	argInfo.Hostname = strings.TrimSpace(argInfo.Hostname)
-
-	if argInfo.Hostname == "" {
-		fatal("Please give the address of the local machine")
-	}
-
 	info = &argInfo
 
 	// Write to file.
@@ -567,9 +590,8 @@ func joinCluster(s *raft.Server, serverName string) error {
 
 	command := &JoinCommand{
 		Name:       s.Name(),
-		Hostname:   info.Hostname,
-		RaftPort:   info.RaftPort,
-		ClientPort: info.ClientPort,
+		RaftURL:   info.RaftURL,
+		ClientURL: info.ClientURL,
 	}
 
 	json.NewEncoder(&b).Encode(command)

+ 1 - 1
etcd_long_test.go

@@ -36,7 +36,7 @@ func TestKillLeader(t *testing.T) {
 
 	leader := "127.0.0.1:7001"
 
-	for i := 0; i < 10; i++ {
+	for i := 0; i < clusterSize; i++ {
 		port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
 		num := port - 7001
 		fmt.Println("kill server ", num)

+ 6 - 6
machines.go

@@ -1,20 +1,20 @@
 package main
 
 import (
-	"fmt"
 	"path"
-	"strings"
+	"net/url"
 )
 
 func getClientAddr(name string) (string, bool) {
 	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
 
-	values := strings.Split(response[0].Value, ",")
+	m, err := url.ParseQuery(response[0].Value)
 
-	hostname := values[0]
-	clientPort := values[2]
+	if err != nil {
+		panic("Failed to parse machines entry")
+	}
 
-	addr := fmt.Sprintf("%s:%s", hostname, clientPort)
+	addr := m["client"][0]
 
 	return addr, true
 }

+ 14 - 2
raft_handlers.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"github.com/coreos/go-raft"
 	"net/http"
-	"strconv"
 )
 
 //-------------------------------------------------------------
@@ -91,7 +90,7 @@ func SnapshotRecoveryHttpHandler(w http.ResponseWriter, req *http.Request) {
 func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debugf("[recv] Get %s/client/ ", raftTransporter.scheme+raftServer.Name())
 	w.WriteHeader(http.StatusOK)
-	client := argInfo.Hostname + ":" + strconv.Itoa(argInfo.ClientPort)
+	client := argInfo.ClientURL
 	w.Write([]byte(client))
 }
 
@@ -108,3 +107,16 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 		return
 	}
 }
+
+// Response to the join request
+func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
+	command := &JoinCommand{}
+
+	if err := decodeJsonRequest(req, command); err == nil {
+		debugf("Receive Join Request from %s", command.Name)
+		dispatch(command, &w, req, false)
+	} else {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+}

+ 24 - 6
test.go

@@ -9,6 +9,7 @@ import (
 	"os"
 	"strconv"
 	"time"
+	"net/url"
 )
 
 var client = http.Client{
@@ -59,10 +60,10 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process,
 	argGroup := make([][]string, size)
 	for i := 0; i < size; i++ {
 		if i == 0 {
-			argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"}
+			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"}
 		} else {
 			strI := strconv.Itoa(i + 1)
-			argGroup[i] = []string{"etcd", "-h=127.0.0.1", "-c=400" + strI, "-s=700" + strI, "-d=/tmp/node" + strI, "-C=127.0.0.1:7001"}
+			argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"}
 		}
 	}
 
@@ -103,7 +104,7 @@ func destroyCluster(etcds []*os.Process) error {
 //
 func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 	leaderMap := make(map[int]string)
-	baseAddrFormat := "http://0.0.0.0:400%d/leader"
+	baseAddrFormat := "http://0.0.0.0:400%d"
 
 	for {
 		knownLeader := "unknown"
@@ -151,7 +152,7 @@ func leaderMonitor(size int, allowDeadNum int, leaderChan chan string) {
 
 func getLeader(addr string) (string, error) {
 
-	resp, err := client.Get(addr)
+	resp, err := client.Get(addr + "/leader")
 
 	if err != nil {
 		return "", err
@@ -163,14 +164,31 @@ func getLeader(addr string) (string, error) {
 	}
 
 	b, err := ioutil.ReadAll(resp.Body)
-
 	resp.Body.Close()
 
+	c := etcd.NewClient()
+	path := "/_etcd/machines/" + string(b)
+	fmt.Println(path)
+	fmt.Println(addr)
+	response, err := c.GetFrom(path, addr)
+	fmt.Println(response)
+	if err != nil {
+		return "", err
+	}
+
+	m, err := url.ParseQuery(response[0].Value)
+
+	if err != nil {
+		panic("Failed to parse machines entry")
+	}
+
+	addr = m["server"][0]
+
 	if err != nil {
 		return "", err
 	}
 
-	return string(b), nil
+	return addr, nil
 
 }
 

+ 29 - 12
transporter.go

@@ -15,6 +15,19 @@ type transporter struct {
 	client *http.Client
 	// scheme
 	scheme string
+	names map[string]*JoinCommand
+}
+
+func (t transporter) NameToRaftURL(name string) string {
+	return t.names[name].RaftURL
+}
+
+func (t transporter) NameToClientURL(name string) string {
+	return t.names[name].ClientURL
+}
+
+func (t transporter) AddPeer(jc *JoinCommand) {
+	t.names[jc.Name] = jc
 }
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
@@ -23,12 +36,13 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debugf("Send LogEntries to %s ", peer.Name())
+	u := t.NameToRaftURL(peer.Name())
+	debugf("Send LogEntries to %s ", u)
 
-	resp, err := t.Post(fmt.Sprintf("%s/log/append", peer.Name()), &b)
+	resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
 
 	if err != nil {
-		debugf("Cannot send AppendEntriesRequest to %s : %s", peer.Name(), err)
+		debugf("Cannot send AppendEntriesRequest to %s: %s", u, err)
 	}
 
 	if resp != nil {
@@ -48,12 +62,13 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debugf("Send Vote to %s", peer.Name())
+	u := t.NameToRaftURL(peer.Name())
+	debugf("Send Vote to %s", u)
 
-	resp, err := t.Post(fmt.Sprintf("%s/vote", peer.Name()), &b)
+	resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
 
 	if err != nil {
-		debugf("Cannot send VoteRequest to %s : %s", peer.Name(), err)
+		debugf("Cannot send VoteRequest to %s : %s", u, err)
 	}
 
 	if resp != nil {
@@ -73,10 +88,11 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+	u := t.NameToRaftURL(peer.Name())
+	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
 		req.LastTerm, req.LastIndex)
 
-	resp, err := t.Post(fmt.Sprintf("%s/snapshot", peer.Name()), &b)
+	resp, err := t.Post(fmt.Sprintf("%s/snapshot", u), &b)
 
 	if resp != nil {
 		defer resp.Body.Close()
@@ -95,10 +111,11 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 
-	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", peer.Name(),
+	u := t.NameToRaftURL(peer.Name())
+	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
 		req.LastTerm, req.LastIndex)
 
-	resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", peer.Name()), &b)
+	resp, err := t.Post(fmt.Sprintf("%s/snapshotRecovery", u), &b)
 
 	if resp != nil {
 		defer resp.Body.Close()
@@ -123,12 +140,12 @@ func (t transporter) GetLeaderClientAddress() string {
 
 // Send server side POST request
 func (t transporter) Post(path string, body io.Reader) (*http.Response, error) {
-	resp, err := t.client.Post(t.scheme+path, "application/json", body)
+	resp, err := t.client.Post(path, "application/json", body)
 	return resp, err
 }
 
 // Send server side GET request
 func (t transporter) Get(path string) (*http.Response, error) {
-	resp, err := t.client.Get(t.scheme + path)
+	resp, err := t.client.Get(path)
 	return resp, err
 }

+ 2 - 1
web/web.go

@@ -24,7 +24,8 @@ func mainHandler(c http.ResponseWriter, req *http.Request) {
 	mainTempl.Execute(c, p)
 }
 
-func Start(server *raft.Server, port int) {
+func Start(server *raft.Server, webURL string) {
+	port := "4002"
 	mainTempl = template.Must(template.New("index.html").Parse(index_html))
 	s = server