Browse Source

feat(join): check cluster conditions before join

Yicheng Qin 11 years ago
parent
commit
001b1fcd46
3 changed files with 66 additions and 3 deletions
  1. 26 0
      server/client.go
  2. 37 2
      server/peer_server.go
  3. 3 1
      server/peer_server_handlers.go

+ 26 - 0
server/client.go

@@ -60,6 +60,32 @@ func (c *Client) GetVersion(url string) (int, *etcdErr.Error) {
 	return version, nil
 }
 
+func (c *Client) GetMachines(url string) ([]*machineMessage, *etcdErr.Error) {
+	resp, err := c.Get(url + "/v2/admin/machines")
+	if err != nil {
+		return nil, clientError(err)
+	}
+
+	msgs := new([]*machineMessage)
+	if uerr := c.parseJSONResponse(resp, msgs); uerr != nil {
+		return nil, uerr
+	}
+	return *msgs, nil
+}
+
+func (c *Client) GetClusterConfig(url string) (*ClusterConfig, *etcdErr.Error) {
+	resp, err := c.Get(url + "/v2/admin/config")
+	if err != nil {
+		return nil, clientError(err)
+	}
+
+	config := new(ClusterConfig)
+	if uerr := c.parseJSONResponse(resp, config); uerr != nil {
+		return nil, uerr
+	}
+	return config, nil
+}
+
 // AddMachine adds machine to the cluster.
 // The first return value is the commit index of join command.
 func (c *Client) AddMachine(url string, cmd *JoinCommand) (uint64, *etcdErr.Error) {

+ 37 - 2
server/peer_server.go

@@ -489,10 +489,44 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 	// Our version must match the leaders version
 	version, err := s.client.GetVersion(u)
 	if err != nil {
-		return fmt.Errorf("Error during join version check: %v", err)
+		log.Debugf("fail checking join version")
+		return err
 	}
 	if version < store.MinVersion() || version > store.MaxVersion() {
-		return fmt.Errorf("Unable to join: cluster version is %d; version compatibility is %d - %d", version, store.MinVersion(), store.MaxVersion())
+		log.Infof("fail passing version compatibility(%d-%d) using %d", store.MinVersion(), store.MaxVersion(), version)
+		return fmt.Errorf("incompatible version")
+	}
+
+	// Fetch current peer list
+	machines, err := s.client.GetMachines(u)
+	if err != nil {
+		log.Debugf("fail getting machine messages")
+		return err
+	}
+	exist := false
+	for _, machine := range machines {
+		if machine.Name == server.Name() {
+			exist = true
+			// TODO(yichengq): cannot set join index for it.
+			// Need discussion about the best way to do it.
+			//
+			// if machine.PeerURL == s.Config.URL {
+			// 	log.Infof("has joined the cluster(%v) before", machines)
+			// 	return nil
+			// }
+			break
+		}
+	}
+
+	// Fetch cluster config to see whether exists some place.
+	clusterConfig, err := s.client.GetClusterConfig(u)
+	if err != nil {
+		log.Debugf("fail getting cluster config")
+		return err
+	}
+	if !exist && clusterConfig.ActiveSize <= len(machines) {
+		log.Infof("stop joining because the cluster is full with %d nodes", len(machines))
+		return fmt.Errorf("out of quota")
 	}
 
 	joinIndex, err := s.client.AddMachine(u,
@@ -504,6 +538,7 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 			EtcdURL:    s.server.URL(),
 		})
 	if err != nil {
+		log.Debugf("fail on join request")
 		return err
 	}
 

+ 3 - 1
server/peer_server_handlers.go

@@ -224,7 +224,9 @@ func (ps *PeerServer) setClusterConfigHttpHandler(w http.ResponseWriter, req *ht
 func (ps *PeerServer) getMachinesHttpHandler(w http.ResponseWriter, req *http.Request) {
 	machines := make([]*machineMessage, 0)
 	for _, name := range ps.registry.Names() {
-		machines = append(machines, ps.getMachineMessage(name))
+		if msg := ps.getMachineMessage(name); msg != nil {
+			machines = append(machines, msg)
+		}
 	}
 	json.NewEncoder(w).Encode(&machines)
 }