Browse Source

separate_id

Xiang Li 12 years ago
parent
commit
d3471eec7f
10 changed files with 124 additions and 78 deletions
  1. 3 2
      client_handlers.go
  2. 5 5
      command.go
  3. 10 13
      etcd.go
  4. 3 2
      etcd_long_test.go
  5. 2 2
      etcd_test.go
  6. 4 4
      machines.go
  7. 80 0
      name_url_map.go
  8. 4 10
      raft_handlers.go
  9. 5 22
      test.go
  10. 8 18
      transporter.go

+ 3 - 2
client_handlers.go

@@ -194,13 +194,14 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 // command?
 // command?
 //--------------------------------------
 //--------------------------------------
 
 
-// Handler to return the current leader name
+// Handler to return the current leader's raft address
 func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
 func LeaderHttpHandler(w http.ResponseWriter, req *http.Request) {
 	leader := raftServer.Leader()
 	leader := raftServer.Leader()
 
 
 	if leader != "" {
 	if leader != "" {
 		w.WriteHeader(http.StatusOK)
 		w.WriteHeader(http.StatusOK)
-		w.Write([]byte(raftServer.Leader()))
+		raftURL, _ := nameToRaftURL(leader)
+		w.Write([]byte(raftURL))
 	} else {
 	} else {
 
 
 		// not likely, but it may happen
 		// not likely, but it may happen

+ 5 - 5
command.go

@@ -110,9 +110,9 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 
 
 // JoinCommand
 // JoinCommand
 type JoinCommand struct {
 type JoinCommand struct {
-	Name       string `json:"name"`
-	RaftURL    string `json:"raftURL"`
-	ClientURL  string `json:"clientURL"`
+	Name    string `json:"name"`
+	RaftURL string `json:"raftURL"`
+	EtcdURL string `json:"etcdURL"`
 }
 }
 
 
 // The name of the join command in the log
 // The name of the join command in the log
@@ -136,14 +136,14 @@ func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 		return []byte("join fail"), fmt.Errorf(errors[103])
 		return []byte("join fail"), fmt.Errorf(errors[103])
 	}
 	}
 
 
-	raftTransporter.AddPeer(c)
+	addNameToURL(c.Name, c.RaftURL, c.EtcdURL)
 
 
 	// add peer in raft
 	// add peer in raft
 	err := raftServer.AddPeer(c.Name)
 	err := raftServer.AddPeer(c.Name)
 
 
 	// add machine in etcd storage
 	// add machine in etcd storage
 	key := path.Join("_etcd/machines", c.Name)
 	key := path.Join("_etcd/machines", c.Name)
-	value := fmt.Sprintf("server=%s&client=%s", c.RaftURL, c.ClientURL)
+	value := fmt.Sprintf("raft=%s&etcd=%s", c.RaftURL, c.EtcdURL)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 
 
 	return []byte("join success"), err
 	return []byte("join success"), err

+ 10 - 13
etcd.go

@@ -56,7 +56,6 @@ func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 
 
-
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
 	flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
 	flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
 
 
@@ -112,11 +111,11 @@ const (
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
 type Info struct {
 type Info struct {
-	Name       string `json:"name"`
+	Name string `json:"name"`
 
 
-	RaftURL    string `json:"raftURL"`
-	ClientURL  string `json:"clientURL"`
-	WebURL     string `json:"webURL"`
+	RaftURL   string `json:"raftURL"`
+	ClientURL string `json:"clientURL"`
+	WebURL    string `json:"webURL"`
 
 
 	ServerCertFile string `json:"serverCertFile"`
 	ServerCertFile string `json:"serverCertFile"`
 	ServerKeyFile  string `json:"serverKeyFile"`
 	ServerKeyFile  string `json:"serverKeyFile"`
@@ -289,9 +288,9 @@ func startRaft(tlsConfs []*tls.Config) {
 			// leader need to join self as a peer
 			// leader need to join self as a peer
 			for {
 			for {
 				command := &JoinCommand{
 				command := &JoinCommand{
-					Name:      raftServer.Name(),
-					RaftURL:   argInfo.RaftURL,
-					ClientURL: argInfo.ClientURL,
+					Name:    raftServer.Name(),
+					RaftURL: argInfo.RaftURL,
+					EtcdURL: argInfo.ClientURL,
 				}
 				}
 				_, err := raftServer.Do(command)
 				_, err := raftServer.Do(command)
 				if err == nil {
 				if err == nil {
@@ -359,8 +358,6 @@ func startRaft(tlsConfs []*tls.Config) {
 func newTransporter(tlsConf *tls.Config) transporter {
 func newTransporter(tlsConf *tls.Config) transporter {
 	t := transporter{}
 	t := transporter{}
 
 
-	t.names = make(map[string]*JoinCommand)
-
 	if tlsConf == nil {
 	if tlsConf == nil {
 		t.scheme = "http://"
 		t.scheme = "http://"
 
 
@@ -589,9 +586,9 @@ func joinCluster(s *raft.Server, serverName string) error {
 	var b bytes.Buffer
 	var b bytes.Buffer
 
 
 	command := &JoinCommand{
 	command := &JoinCommand{
-		Name:       s.Name(),
-		RaftURL:   info.RaftURL,
-		ClientURL: info.ClientURL,
+		Name:    s.Name(),
+		RaftURL: info.RaftURL,
+		EtcdURL: info.ClientURL,
 	}
 	}
 
 
 	json.NewEncoder(&b).Encode(command)
 	json.NewEncoder(&b).Encode(command)

+ 3 - 2
etcd_long_test.go

@@ -34,10 +34,11 @@ func TestKillLeader(t *testing.T) {
 
 
 	var totalTime time.Duration
 	var totalTime time.Duration
 
 
-	leader := "127.0.0.1:7001"
+	leader := "http://127.0.0.1:7001"
 
 
 	for i := 0; i < clusterSize; i++ {
 	for i := 0; i < clusterSize; i++ {
-		port, _ := strconv.Atoi(strings.Split(leader, ":")[1])
+		fmt.Println("leader is ", leader)
+		port, _ := strconv.Atoi(strings.Split(leader, ":")[2])
 		num := port - 7001
 		num := port - 7001
 		fmt.Println("kill server ", num)
 		fmt.Println("kill server ", num)
 		etcds[num].Kill()
 		etcds[num].Kill()

+ 2 - 2
etcd_test.go

@@ -14,7 +14,7 @@ import (
 func TestSingleNode(t *testing.T) {
 func TestSingleNode(t *testing.T) {
 	procAttr := new(os.ProcAttr)
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-	args := []string{"etcd", "-h=127.0.0.1", "-f", "-d=/tmp/node1"}
+	args := []string{"etcd", "-n=node1", "-f", "-d=/tmp/node1"}
 
 
 	process, err := os.StartProcess("etcd", args, procAttr)
 	process, err := os.StartProcess("etcd", args, procAttr)
 	if err != nil {
 	if err != nil {
@@ -56,7 +56,7 @@ func TestSingleNode(t *testing.T) {
 func TestSingleNodeRecovery(t *testing.T) {
 func TestSingleNodeRecovery(t *testing.T) {
 	procAttr := new(os.ProcAttr)
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
-	args := []string{"etcd", "-h=127.0.0.1", "-d=/tmp/node1"}
+	args := []string{"etcd", "-n=node1", "-d=/tmp/node1"}
 
 
 	process, err := os.StartProcess("etcd", append(args, "-f"), procAttr)
 	process, err := os.StartProcess("etcd", append(args, "-f"), procAttr)
 	if err != nil {
 	if err != nil {

+ 4 - 4
machines.go

@@ -1,20 +1,20 @@
 package main
 package main
 
 
 import (
 import (
-	"path"
 	"net/url"
 	"net/url"
+	"path"
 )
 )
 
 
 func getClientAddr(name string) (string, bool) {
 func getClientAddr(name string) (string, bool) {
-	response, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
+	resps, _ := etcdStore.RawGet(path.Join("_etcd/machines", name))
 
 
-	m, err := url.ParseQuery(response[0].Value)
+	m, err := url.ParseQuery(resps[0].Value)
 
 
 	if err != nil {
 	if err != nil {
 		panic("Failed to parse machines entry")
 		panic("Failed to parse machines entry")
 	}
 	}
 
 
-	addr := m["client"][0]
+	addr := m["etcd"][0]
 
 
 	return addr, true
 	return addr, true
 }
 }

+ 80 - 0
name_url_map.go

@@ -0,0 +1,80 @@
+package main
+
+import (
+	"net/url"
+	"path"
+)
+
+// we map node name to url
+type nodeInfo struct {
+	raftURL string
+	etcdURL string
+}
+
+var namesMap = make(map[string]*nodeInfo)
+
+// nameToEtcdURL maps node name to its etcd http address
+func nameToEtcdURL(name string) (string, bool) {
+
+	if info, ok := namesMap[name]; ok {
+		// first try to read from the map
+		return info.etcdURL, true
+	} else {
+		// if fails, try to recover from etcd storage
+		key := path.Join("/_etcd/machines", name)
+
+		resps, err := etcdStore.RawGet(key)
+
+		if err != nil {
+			return "", false
+		}
+
+		m, err := url.ParseQuery(resps[0].Value)
+
+		if err != nil {
+			panic("Failed to parse machines entry")
+		}
+
+		etcdURL := m["etcd"][0]
+
+		return etcdURL, true
+
+	}
+}
+
+// nameToRaftURL maps node name to its raft http address
+func nameToRaftURL(name string) (string, bool) {
+	if info, ok := namesMap[name]; ok {
+		// first try to read from the map
+		return info.raftURL, true
+
+	} else {
+		// if fails, try to recover from etcd storage
+		key := path.Join("/_etcd/machines", name)
+
+		resps, err := etcdStore.RawGet(key)
+
+		if err != nil {
+			return "", false
+		}
+
+		m, err := url.ParseQuery(resps[0].Value)
+
+		if err != nil {
+			panic("Failed to parse machines entry")
+		}
+
+		raftURL := m["raft"][0]
+
+		return raftURL, true
+
+	}
+}
+
+// addNameToURL add a name that maps to raftURL and etcdURL
+func addNameToURL(name string, raftURL string, etcdURL string) {
+	namesMap[name] = &nodeInfo{
+		raftURL: raftURL,
+		etcdURL: etcdURL,
+	}
+}

+ 4 - 10
raft_handlers.go

@@ -108,15 +108,9 @@ func JoinHttpHandler(w http.ResponseWriter, req *http.Request) {
 	}
 	}
 }
 }
 
 
-// Response to the join request
+// Response to the name request
 func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
 func NameHttpHandler(w http.ResponseWriter, req *http.Request) {
-	command := &JoinCommand{}
-
-	if err := decodeJsonRequest(req, command); err == nil {
-		debugf("Receive Join Request from %s", command.Name)
-		dispatch(command, &w, req, false)
-	} else {
-		w.WriteHeader(http.StatusInternalServerError)
-		return
-	}
+	debugf("[recv] Get %s/name/ ", raftTransporter.scheme+raftServer.Name())
+	w.WriteHeader(http.StatusOK)
+	w.Write([]byte(raftServer.Name()))
 }
 }

+ 5 - 22
test.go

@@ -9,7 +9,7 @@ import (
 	"os"
 	"os"
 	"strconv"
 	"strconv"
 	"time"
 	"time"
-	"net/url"
+	//"net/url"
 )
 )
 
 
 var client = http.Client{
 var client = http.Client{
@@ -60,7 +60,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process,
 	argGroup := make([][]string, size)
 	argGroup := make([][]string, size)
 	for i := 0; i < size; i++ {
 	for i := 0; i < size; i++ {
 		if i == 0 {
 		if i == 0 {
-			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1", "-vv"}
+			argGroup[i] = []string{"etcd", "-d=/tmp/node1", "-n=node1"}
 		} else {
 		} else {
 			strI := strconv.Itoa(i + 1)
 			strI := strconv.Itoa(i + 1)
 			argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"}
 			argGroup[i] = []string{"etcd", "-n=node" + strI, "-c=127.0.0.1:400" + strI, "-s=127.0.0.1:700" + strI, "-d=/tmp/node" + strI, "-C=http://127.0.0.1:7001"}
@@ -75,7 +75,7 @@ func createCluster(size int, procAttr *os.ProcAttr) ([][]string, []*os.Process,
 		if err != nil {
 		if err != nil {
 			return nil, nil, err
 			return nil, nil, err
 		}
 		}
-		
+
 		// TODOBP: Change this sleep to wait until the master is up.
 		// TODOBP: Change this sleep to wait until the master is up.
 		// The problem is that if the master isn't up then the children
 		// The problem is that if the master isn't up then the children
 		// have to retry. This retry can take upwards of 15 seconds
 		// have to retry. This retry can take upwards of 15 seconds
@@ -164,31 +164,14 @@ func getLeader(addr string) (string, error) {
 	}
 	}
 
 
 	b, err := ioutil.ReadAll(resp.Body)
 	b, err := ioutil.ReadAll(resp.Body)
-	resp.Body.Close()
-
-	c := etcd.NewClient()
-	path := "/_etcd/machines/" + string(b)
-	fmt.Println(path)
-	fmt.Println(addr)
-	response, err := c.GetFrom(path, addr)
-	fmt.Println(response)
-	if err != nil {
-		return "", err
-	}
-
-	m, err := url.ParseQuery(response[0].Value)
-
-	if err != nil {
-		panic("Failed to parse machines entry")
-	}
 
 
-	addr = m["server"][0]
+	resp.Body.Close()
 
 
 	if err != nil {
 	if err != nil {
 		return "", err
 		return "", err
 	}
 	}
 
 
-	return addr, nil
+	return string(b), nil
 
 
 }
 }
 
 

+ 8 - 18
transporter.go

@@ -15,19 +15,6 @@ type transporter struct {
 	client *http.Client
 	client *http.Client
 	// scheme
 	// scheme
 	scheme string
 	scheme string
-	names map[string]*JoinCommand
-}
-
-func (t transporter) NameToRaftURL(name string) string {
-	return t.names[name].RaftURL
-}
-
-func (t transporter) NameToClientURL(name string) string {
-	return t.names[name].ClientURL
-}
-
-func (t transporter) AddPeer(jc *JoinCommand) {
-	t.names[jc.Name] = jc
 }
 }
 
 
 // Sends AppendEntries RPCs to a peer when the server is the leader.
 // Sends AppendEntries RPCs to a peer when the server is the leader.
@@ -36,7 +23,7 @@ func (t transporter) SendAppendEntriesRequest(server *raft.Server, peer *raft.Pe
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)
 
 
-	u := t.NameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name())
 	debugf("Send LogEntries to %s ", u)
 	debugf("Send LogEntries to %s ", u)
 
 
 	resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
 	resp, err := t.Post(fmt.Sprintf("%s/log/append", u), &b)
@@ -62,7 +49,7 @@ func (t transporter) SendVoteRequest(server *raft.Server, peer *raft.Peer, req *
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)
 
 
-	u := t.NameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name())
 	debugf("Send Vote to %s", u)
 	debugf("Send Vote to %s", u)
 
 
 	resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
 	resp, err := t.Post(fmt.Sprintf("%s/vote", u), &b)
@@ -88,7 +75,7 @@ func (t transporter) SendSnapshotRequest(server *raft.Server, peer *raft.Peer, r
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)
 
 
-	u := t.NameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name())
 	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
 	debugf("Send Snapshot to %s [Last Term: %d, LastIndex %d]", u,
 		req.LastTerm, req.LastIndex)
 		req.LastTerm, req.LastIndex)
 
 
@@ -111,7 +98,7 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 	var b bytes.Buffer
 	var b bytes.Buffer
 	json.NewEncoder(&b).Encode(req)
 	json.NewEncoder(&b).Encode(req)
 
 
-	u := t.NameToRaftURL(peer.Name())
+	u, _ := nameToRaftURL(peer.Name())
 	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
 	debugf("Send SnapshotRecovery to %s [Last Term: %d, LastIndex %d]", u,
 		req.LastTerm, req.LastIndex)
 		req.LastTerm, req.LastIndex)
 
 
@@ -129,7 +116,10 @@ func (t transporter) SendSnapshotRecoveryRequest(server *raft.Server, peer *raft
 
 
 // Get the client address of the leader in the cluster
 // Get the client address of the leader in the cluster
 func (t transporter) GetLeaderClientAddress() string {
 func (t transporter) GetLeaderClientAddress() string {
-	resp, _ := t.Get(raftServer.Leader() + "/client")
+
+	u, _ := nameToRaftURL(raftServer.Leader())
+
+	resp, _ := t.Get(fmt.Sprintf("%s/client", u))
 	if resp != nil {
 	if resp != nil {
 		body, _ := ioutil.ReadAll(resp.Body)
 		body, _ := ioutil.ReadAll(resp.Body)
 		resp.Body.Close()
 		resp.Body.Close()