Browse Source

Accept maxsize parameter. Deny further join request if reache max size

Xiang Li 12 years ago
parent
commit
79966b6550
5 changed files with 37 additions and 0 deletions
  1. 5 0
      client_handlers.go
  2. 13 0
      command.go
  3. 3 0
      error.go
  4. 11 0
      etcd.go
  5. 5 0
      machines.go

+ 5 - 0
client_handlers.go

@@ -118,6 +118,11 @@ func dispatch(c Command, w *http.ResponseWriter, req *http.Request, client bool)
 				(*w).Write(newJsonError(102, err.Error()))
 				(*w).Write(newJsonError(102, err.Error()))
 				return
 				return
 			}
 			}
+			if err.Error() == errors[103] {
+				(*w).WriteHeader(http.StatusBadRequest)
+				(*w).Write(newJsonError(103, ""))
+				return
+			}
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).WriteHeader(http.StatusInternalServerError)
 			(*w).Write(newJsonError(300, err.Error()))
 			(*w).Write(newJsonError(300, err.Error()))
 			return
 			return

+ 13 - 0
command.go

@@ -119,12 +119,25 @@ func (c *JoinCommand) CommandName() string {
 
 
 // Join a server to the cluster
 // Join a server to the cluster
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
 func (c *JoinCommand) Apply(raftServer *raft.Server) (interface{}, error) {
+
+	// check machine number in the cluster
+	num := machineNum()
+	if num == maxClusterSize {
+		return []byte("join fail"), fmt.Errorf(errors[103])
+	}
+
+	// add peer in raft
 	err := raftServer.AddPeer(c.Name)
 	err := raftServer.AddPeer(c.Name)
+
+	// add machine in etcd
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
 	addMachine(c.Name, c.Hostname, c.RaftPort, c.ClientPort)
+
+	// add machine in etcd storage
 	nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex())
 	nodeName := fmt.Sprintf("%s%d", "node", raftServer.CommitIndex())
 	key := path.Join("machines", nodeName)
 	key := path.Join("machines", nodeName)
 	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
 	value := fmt.Sprintf("%s,%d,%d", c.Hostname, c.RaftPort, c.ClientPort)
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
 	etcdStore.Set(key, value, time.Unix(0, 0), raftServer.CommitIndex())
+
 	return []byte("join success"), err
 	return []byte("join success"), err
 }
 }
 
 

+ 3 - 0
error.go

@@ -13,6 +13,8 @@ func init() {
 	errors[100] = "Key Not Found"
 	errors[100] = "Key Not Found"
 	errors[101] = "The given PrevValue is not equal to the value of the key"
 	errors[101] = "The given PrevValue is not equal to the value of the key"
 	errors[102] = "Not A File"
 	errors[102] = "Not A File"
+	errors[103] = "Reach the max number of machines in the cluster"
+
 	// Post form related errors
 	// Post form related errors
 	errors[200] = "Value is Required in POST form"
 	errors[200] = "Value is Required in POST form"
 	errors[201] = "PrevValue is Required in POST form"
 	errors[201] = "PrevValue is Required in POST form"
@@ -24,6 +26,7 @@ func init() {
 
 
 	// keyword
 	// keyword
 	errors[400] = "The prefix of the given key is a keyword in etcd"
 	errors[400] = "The prefix of the given key is a keyword in etcd"
+
 }
 }
 
 
 type jsonError struct {
 type jsonError struct {

+ 11 - 0
etcd.go

@@ -57,6 +57,8 @@ var snapshot bool
 
 
 var retryTimes int
 var retryTimes int
 
 
+var maxClusterSize int
+
 func init() {
 func init() {
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&verbose, "v", false, "verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
 	flag.BoolVar(&veryVerbose, "vv", false, "very verbose logging")
@@ -86,6 +88,8 @@ func init() {
 	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 	flag.IntVar(&maxSize, "m", 1024, "the max size of result buffer")
 
 
 	flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
 	flag.IntVar(&retryTimes, "r", 3, "the max retry attempts when trying to join a cluster")
+
+	flag.IntVar(&maxClusterSize, "maxsize", 9, "the max size of the cluster")
 }
 }
 
 
 // CONSTANTS
 // CONSTANTS
@@ -276,6 +280,10 @@ func startRaft(securityType int) {
 					}
 					}
 					err = joinCluster(raftServer, machine)
 					err = joinCluster(raftServer, machine)
 					if err != nil {
 					if err != nil {
+						if err.Error() == errors[103] {
+							fmt.Println(err)
+							os.Exit(1)
+						}
 						debug("cannot join to cluster via machine %s %s", machine, err)
 						debug("cannot join to cluster via machine %s %s", machine, err)
 					} else {
 					} else {
 						success = true
 						success = true
@@ -602,6 +610,9 @@ func joinCluster(s *raft.Server, serverName string) error {
 				debug("Send Join Request to %s", address)
 				debug("Send Join Request to %s", address)
 				json.NewEncoder(&b).Encode(command)
 				json.NewEncoder(&b).Encode(command)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
 				resp, err = t.Post(fmt.Sprintf("%s/join", address), &b)
+			} else if resp.StatusCode == http.StatusBadRequest {
+				debug("Reach max number machines in the cluster")
+				return fmt.Errorf(errors[103])
 			} else {
 			} else {
 				return fmt.Errorf("Unable to join")
 				return fmt.Errorf("Unable to join")
 			}
 			}

+ 5 - 0
machines.go

@@ -28,3 +28,8 @@ func getClientAddr(name string) (string, bool) {
 
 
 	return addr, true
 	return addr, true
 }
 }
+
+// machineNum returns the number of machines in the cluster
+func machineNum() int {
+	return len(machinesMap)
+}