Browse Source

Minor fixes.

Ben Johnson 11 years ago
parent
commit
fe4dee03ab
3 changed files with 16 additions and 13 deletions
  1. 1 0
      error/error.go
  2. 1 1
      server/demote_command.go
  3. 14 12
      server/peer_server.go

+ 1 - 0
error/error.go

@@ -93,6 +93,7 @@ func init() {
 	errors[EcodeProxyInternal] = "Proxy Internal Error"
 	errors[EcodeInvalidActiveSize] = "Invalid active size"
 	errors[EcodeInvalidPromoteDelay] = "Proxy promote delay"
+	errors[EcodePromoteError] = "Proxy promotion error"
 
 }
 

+ 1 - 1
server/demote_command.go

@@ -43,7 +43,7 @@ func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
 	// Update mode if this change applies to this server.
 	if c.Name == ps.Config.Name {
 		log.Infof("Set mode after demotion: %s", c.Name)
-		ps.SetMode(ProxyMode)
+		ps.setMode(ProxyMode)
 	}
 
 	return nil, nil

+ 14 - 12
server/peer_server.go

@@ -119,16 +119,16 @@ func (s *PeerServer) Mode() Mode {
 // SetMode updates the current mode of the server.
 // Switching to a peer mode will start the Raft server.
 // Switching to a proxy mode will stop the Raft server.
-func (s *PeerServer) SetMode(mode Mode) {
+func (s *PeerServer) setMode(mode Mode) {
 	s.mode = mode
 
 	switch mode {
 	case PeerMode:
-		if s.raftServer.Running() {
+		if !s.raftServer.Running() {
 			s.raftServer.Start()
 		}
 	case ProxyMode:
-		if !s.raftServer.Running() {
+		if s.raftServer.Running() {
 			s.raftServer.Stop()
 		}
 	}
@@ -140,10 +140,9 @@ func (s *PeerServer) ClusterConfig() *ClusterConfig {
 }
 
 // SetClusterConfig updates the current cluster configuration.
-// Adjusting the active size will 
+// Adjusting the active size will cause the PeerServer to demote peers or
+// promote proxies to match the new size.
 func (s *PeerServer) SetClusterConfig(c *ClusterConfig) error {
-	s.clusterConfig = c
-
 	// Validate configuration.
 	if c.ActiveSize < 1 {
 		return etcdErr.NewError(etcdErr.EcodeInvalidActiveSize, "Post", 0)
@@ -151,6 +150,8 @@ func (s *PeerServer) SetClusterConfig(c *ClusterConfig) error {
 		return etcdErr.NewError(etcdErr.EcodeInvalidPromoteDelay, "Post", 0)
 	}
 
+	s.clusterConfig = c
+
 	return nil
 }
 
@@ -458,9 +459,9 @@ func (s *PeerServer) joinByPeer(server raft.Server, peer string, scheme string)
 
 				switch mode {
 				case 0:
-					s.SetMode(PeerMode)
+					s.setMode(PeerMode)
 				case 1:
-					s.SetMode(ProxyMode)
+					s.setMode(ProxyMode)
 					s.proxyClientURL = resp.Header.Get("X-Leader-Client-URL")
 					s.proxyPeerURL = resp.Header.Get("X-Leader-Peer-URL")
 				default:
@@ -614,8 +615,8 @@ func (s *PeerServer) monitorTimeoutThreshold(closeChan chan bool) {
 	}
 }
 
-// monitorActive periodically checks the status of cluster nodes and swaps them
-// out for proxies as needed.
+// monitorActive has the leader periodically check the status of cluster nodes
+// and swaps them out for proxies as needed.
 func (s *PeerServer) monitorActive(closeChan chan bool) {
 	for {
 		select {
@@ -641,6 +642,7 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
 		// If we have more active nodes than we should then demote.
 		if peerCount > activeSize {
 			peer := peers[rand.Intn(len(peers))]
+			log.Infof("%s: demoting: %v", s.Config.Name, peer)
 			if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
 				log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
 			}
@@ -676,7 +678,7 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
 	}
 }
 
-// monitorPeerActivity periodically checks for dead nodes and demotes them.
+// monitorPeerActivity has the leader periodically for dead nodes and demotes them.
 func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
 	for {
 		select {
@@ -708,7 +710,7 @@ func (s *PeerServer) monitorPeerActivity(closeChan chan bool) {
 	}
 }
 
-// Mode represents whether the server is an active peer or if the server is 
+// Mode represents whether the server is an active peer or if the server is
 // simply acting as a proxy.
 type Mode string