Browse Source

record machine info locally via join command

Xiang Li 12 years ago
parent
commit
268ba2592a
5 changed files with 70 additions and 27 deletions
  1. 6 3
      client_handlers.go
  2. 7 3
      command.go
  3. 26 20
      etcd.go
  4. 30 0
      machines.go
  5. 1 1
      raft_handlers.go

+ 6 - 3
client_handlers.go

@@ -179,7 +179,8 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 		var url string
 		var url string
 
 
 		if client {
 		if client {
-			url = scheme + raftTransporter.GetLeaderClientAddress() + path
+			clientAddr, _ := getClientAddr(raftServer.Leader())
+			url = scheme + clientAddr + path
 		} else {
 		} else {
 			url = scheme + raftServer.Leader() + path
 			url = scheme + raftServer.Leader() + path
 		}
 		}
@@ -222,14 +223,16 @@ func MachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 
 
 	// Add itself to the machine list first
 	// Add itself to the machine list first
 	// Since peer map does not contain the server itself
 	// Since peer map does not contain the server itself
-	machines := raftServer.Name()
+	machines, _ := getClientAddr(raftServer.Name())
 
 
 	// Add all peers to the list and sepearte by comma
 	// Add all peers to the list and sepearte by comma
 	// We do not use json here since we accept machines list
 	// We do not use json here since we accept machines list
 	// in the command line seperate by comma.
 	// in the command line seperate by comma.
 
 
 	for peerName, _ := range peers {
 	for peerName, _ := range peers {
-		machines = machines + "," + peerName
+		if addr, ok := getClientAddr(peerName); ok {
+			machines = machines + "," + addr
+		}
 	}
 	}
 
 
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)

+ 7 - 3
command.go

@@ -105,7 +105,10 @@ func (c *WatchCommand) Apply(server *raft.Server) (interface{}, error) {
 
 
 // JoinCommand
 // JoinCommand
 type JoinCommand struct {
 type JoinCommand struct {
-	Name string `json:"name"`
+	Name       string `json:"name"`
+	Hostname   string `json:"hostName"`
+	RaftPort   int    `json:"raftPort"`
+	ClientPort int    `json:"clientPort"`
 }
 }
 
 
 // The name of the join command in the log
 // The name of the join command in the log
@@ -114,8 +117,9 @@ func (c *JoinCommand) CommandName() string {
 }
 }
 
 
 // Join a server to the cluster
 // Join a server to the cluster
-func (c *JoinCommand) Apply(server *raft.Server) (interface{}, error) {
-	err := server.AddPeer(c.Name)
+func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
+	err := raftServer.AddPeer(c.Name)
+	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
 
 
 	return []byte("join success"), err
 	return []byte("join success"), err
 }
 }

+ 26 - 20
etcd.go

@@ -33,9 +33,9 @@ var machinesFile string
 
 
 var cluster []string
 var cluster []string
 
 
-var address string
+var hostname string
 var clientPort int
 var clientPort int
-var serverPort int
+var raftPort int
 var webPort int
 var webPort int
 
 
 var serverCertFile string
 var serverCertFile string
@@ -58,9 +58,9 @@ func init() {
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
 	flag.StringVar(&machines, "C", "", "the ip address and port of a existing machines in the cluster, sepearate by comma")
 	flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
 	flag.StringVar(&machinesFile, "CF", "", "the file contains a list of existing machines in the cluster, seperate by comma")
 
 
-	flag.StringVar(&address, "a", "0.0.0.0", "the ip address of the local machine")
+	flag.StringVar(&hostname, "h", "0.0.0.0", "the hostname of the local machine")
 	flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
 	flag.IntVar(&clientPort, "c", 4001, "the port to communicate with clients")
-	flag.IntVar(&serverPort, "s", 7001, "the port to communicate with servers")
+	flag.IntVar(&raftPort, "s", 7001, "the port to communicate with servers")
 	flag.IntVar(&webPort, "w", -1, "the port of web interface")
 	flag.IntVar(&webPort, "w", -1, "the port of web interface")
 
 
 	flag.StringVar(&serverCAFile, "serverCAFile", "", "the path of the CAFile")
 	flag.StringVar(&serverCAFile, "serverCAFile", "", "the path of the CAFile")
@@ -107,8 +107,8 @@ const (
 //------------------------------------------------------------------------------
 //------------------------------------------------------------------------------
 
 
 type Info struct {
 type Info struct {
-	Address    string `json:"address"`
-	ServerPort int    `json:"serverPort"`
+	Hostname   string `json:"hostname"`
+	RaftPort   int    `json:"raftPort"`
 	ClientPort int    `json:"clientPort"`
 	ClientPort int    `json:"clientPort"`
 	WebPort    int    `json:"webPort"`
 	WebPort    int    `json:"webPort"`
 
 
@@ -194,7 +194,7 @@ func main() {
 func startRaft(securityType int) {
 func startRaft(securityType int) {
 	var err error
 	var err error
 
 
-	raftName := fmt.Sprintf("%s:%d", info.Address, info.ServerPort)
+	raftName := fmt.Sprintf("%s:%d", info.Hostname, info.RaftPort)
 
 
 	// Create transporter for raft
 	// Create transporter for raft
 	raftTransporter = createTransporter(securityType)
 	raftTransporter = createTransporter(securityType)
@@ -232,6 +232,9 @@ func startRaft(securityType int) {
 			for {
 			for {
 				command := &JoinCommand{}
 				command := &JoinCommand{}
 				command.Name = raftServer.Name()
 				command.Name = raftServer.Name()
+				command.Hostname = hostname
+				command.RaftPort = raftPort
+				command.ClientPort = clientPort
 				_, err := raftServer.Do(command)
 				_, err := raftServer.Do(command)
 				if err == nil {
 				if err == nil {
 					break
 					break
@@ -268,7 +271,7 @@ func startRaft(securityType int) {
 	// go server.Snapshot()
 	// go server.Snapshot()
 
 
 	// start to response to raft requests
 	// start to response to raft requests
-	go startRaftTransport(info.ServerPort, securityType)
+	go startRaftTransport(info.RaftPort, securityType)
 
 
 }
 }
 
 
@@ -338,11 +341,11 @@ func startRaftTransport(port int, st int) {
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
-		fmt.Printf("raft server [%s] listen on http port %v\n", address, port)
+		fmt.Printf("raft server [%s] listen on http port %v\n", hostname, port)
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:
-		fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
+		fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
 		log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
 		log.Fatal(http.ListenAndServeTLS(fmt.Sprintf(":%d", port), serverCertFile, serverKeyFile, nil))
 
 
 	case HTTPSANDVERIFY:
 	case HTTPSANDVERIFY:
@@ -354,7 +357,7 @@ func startRaftTransport(port int, st int) {
 			},
 			},
 			Addr: fmt.Sprintf(":%d", port),
 			Addr: fmt.Sprintf(":%d", port),
 		}
 		}
-		fmt.Printf("raft server [%s] listen on https port %v\n", address, port)
+		fmt.Printf("raft server [%s] listen on https port %v\n", hostname, port)
 		err := server.ListenAndServeTLS(serverCertFile, serverKeyFile)
 		err := server.ListenAndServeTLS(serverCertFile, serverKeyFile)
 
 
 		if err != nil {
 		if err != nil {
@@ -376,11 +379,11 @@ func startClientTransport(port int, st int) {
 	switch st {
 	switch st {
 
 
 	case HTTP:
 	case HTTP:
-		fmt.Printf("etcd [%s] listen on http port %v\n", address, clientPort)
+		fmt.Printf("etcd [%s] listen on http port %v\n", hostname, clientPort)
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 		log.Fatal(http.ListenAndServe(fmt.Sprintf(":%d", port), nil))
 
 
 	case HTTPS:
 	case HTTPS:
-		fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
+		fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
 		http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
 		http.ListenAndServeTLS(fmt.Sprintf(":%d", port), clientCertFile, clientKeyFile, nil)
 
 
 	case HTTPSANDVERIFY:
 	case HTTPSANDVERIFY:
@@ -392,7 +395,7 @@ func startClientTransport(port int, st int) {
 			},
 			},
 			Addr: fmt.Sprintf(":%d", port),
 			Addr: fmt.Sprintf(":%d", port),
 		}
 		}
-		fmt.Printf("etcd [%s] listen on https port %v\n", address, clientPort)
+		fmt.Printf("etcd [%s] listen on https port %v\n", hostname, clientPort)
 		err := server.ListenAndServeTLS(clientCertFile, clientKeyFile)
 		err := server.ListenAndServeTLS(clientCertFile, clientKeyFile)
 
 
 		if err != nil {
 		if err != nil {
@@ -480,15 +483,15 @@ func getInfo(path string) *Info {
 	} else {
 	} else {
 		// Otherwise ask user for info and write it to file.
 		// Otherwise ask user for info and write it to file.
 
 
-		if address == "" {
+		if hostname == "" {
 			fatal("Please give the address of the local machine")
 			fatal("Please give the address of the local machine")
 		}
 		}
 
 
-		info.Address = address
-		info.Address = strings.TrimSpace(info.Address)
-		fmt.Println("address ", info.Address)
+		info.Hostname = hostname
+		info.Hostname = strings.TrimSpace(info.Hostname)
+		fmt.Println("address ", info.Hostname)
 
 
-		info.ServerPort = serverPort
+		info.RaftPort = raftPort
 		info.ClientPort = clientPort
 		info.ClientPort = clientPort
 		info.WebPort = webPort
 		info.WebPort = webPort
 
 
@@ -537,6 +540,9 @@ func joinCluster(s *raft.Server, serverName string) error {
 
 
 	command := &JoinCommand{}
 	command := &JoinCommand{}
 	command.Name = s.Name()
 	command.Name = s.Name()
+	command.Hostname = info.Hostname
+	command.RaftPort = info.RaftPort
+	command.ClientPort = info.ClientPort
 
 
 	json.NewEncoder(&b).Encode(command)
 	json.NewEncoder(&b).Encode(command)
 
 
@@ -561,7 +567,7 @@ func joinCluster(s *raft.Server, serverName string) error {
 				return nil
 				return nil
 			}
 			}
 			if resp.StatusCode == http.StatusTemporaryRedirect {
 			if resp.StatusCode == http.StatusTemporaryRedirect {
-				address = resp.Header.Get("Location")
+				address := resp.Header.Get("Location")
 				debug("Leader is %s", address)
 				debug("Leader is %s", address)
 				debug("Send Join Request to %s", address)
 				debug("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(command)
 				json.NewEncoder(&b).Encode(command)

+ 30 - 0
machines.go

@@ -0,0 +1,30 @@
+package main
+
+import (
+	"fmt"
+)
+
+type machine struct {
+	hostname   string
+	raftPort   int
+	clientPort int
+}
+
+var machinesMap = map[string]machine{}
+
+func addMachine(name string, hostname string, raftPort int, clientPort int) {
+
+	machinesMap[name] = machine{hostname, raftPort, clientPort}
+
+}
+
+func getClientAddr(name string) (string, bool) {
+	machine, ok := machinesMap[name]
+	if !ok {
+		return "", false
+	}
+
+	addr := fmt.Sprintf("%s:%v", machine.hostname, machine.clientPort)
+
+	return addr, true
+}

+ 1 - 1
raft_handlers.go

@@ -75,7 +75,7 @@ func SnapshotHttpHandler(w http.ResponseWriter, req *http.Request) {
 func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
 func ClientHttpHandler(w http.ResponseWriter, req *http.Request) {
 	debug("[recv] Get http://%v/client/ ", raftServer.Name())
 	debug("[recv] Get http://%v/client/ ", raftServer.Name())
 	w.WriteHeader(http.StatusOK)
 	w.WriteHeader(http.StatusOK)
-	client := address + ":" + strconv.Itoa(clientPort)
+	client := hostname + ":" + strconv.Itoa(clientPort)
 	w.Write([]byte(client))
 	w.Write([]byte(client))
 }
 }