Browse Source

Add automatic node promotion / demotion.

Ben Johnson 11 years ago
parent
commit
fddbf35df2
3 changed files with 77 additions and 5 deletions
  1. 49 0
      server/demote_command.go
  2. 2 2
      server/peer_server.go
  3. 26 3
      tests/functional/proxy_test.go

+ 49 - 0
server/demote_command.go

@@ -0,0 +1,49 @@
+package server
+
+import (
+	"github.com/coreos/etcd/log"
+	"github.com/coreos/etcd/third_party/github.com/coreos/raft"
+)
+
+func init() {
+	raft.RegisterCommand(&DemoteCommand{})
+}
+
+// DemoteCommand represents a command to change a peer to a proxy.
+type DemoteCommand struct {
+	Name string `json:"name"`
+}
+
+// CommandName returns the name of the command.
+func (c *DemoteCommand) CommandName() string {
+	return "etcd:demote"
+}
+
+// Apply executes the command.
+func (c *DemoteCommand) Apply(context raft.Context) (interface{}, error) {
+	ps, _ := context.Server().Context().(*PeerServer)
+
+	// Save URLs.
+	clientURL, _ := ps.registry.ClientURL(c.Name)
+	peerURL, _ := ps.registry.PeerURL(c.Name)
+
+	// Perform a removal.
+	(&RemoveCommand{Name: c.Name}).Apply(context)
+
+	// Register node as a proxy.
+	ps.registry.RegisterProxy(c.Name, peerURL, clientURL)
+
+	// Update mode if this change applies to this server.
+	if c.Name == ps.Config.Name {
+		log.Infof("Set mode after demotion: %s", c.Name)
+		ps.SetMode(ProxyMode)
+	}
+
+	return nil, nil
+}
+
+// NodeName returns the name of the affected node.
+func (c *DemoteCommand) NodeName() string {
+	return c.Name
+}
+

+ 2 - 2
server/peer_server.go

@@ -26,7 +26,7 @@ import (
 )
 
 const ThresholdMonitorTimeout = 5 * time.Second
-const ActiveMonitorTimeout = 5 * time.Second
+const ActiveMonitorTimeout = 1 * time.Second
 
 type PeerServerConfig struct {
 	Name           string
@@ -644,7 +644,7 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
 		if peerCount > activeSize {
 			peer := peers[rand.Intn(len(peers))]
 			fmt.Println("active.demote»", peer)
-			if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
+			if _, err := s.raftServer.Do(&DemoteCommand{Name: peer}); err != nil {
 				log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
 			}
 			continue

+ 26 - 3
tests/functional/proxy_test.go

@@ -45,7 +45,10 @@ func TestProxy(t *testing.T) {
 		}
 	}
 
-	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
+	// Verify that we have one proxy.
+	result, err := c.Get("_etcd/proxies", false, true)
+	assert.NoError(t, err)
+	assert.Equal(t, len(result.Node.Nodes), 1)
 
 	// Reconfigure with larger active size (10 nodes) and wait for promotion.
 	resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`))
@@ -56,6 +59,26 @@ func TestProxy(t *testing.T) {
 	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
 
 	// Verify that the proxy node is now a peer.
-	fmt.Println("CHECK!")
-	time.Sleep(30 * time.Second)
+	result, err = c.Get("_etcd/proxies", false, true)
+	assert.NoError(t, err)
+	assert.Equal(t, len(result.Node.Nodes), 0)
+
+	// Reconfigure with a smaller active size (8 nodes).
+	resp, _ = tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":8, "promoteDelay":1800}`))
+	if !assert.Equal(t, resp.StatusCode, 200) {
+		t.FailNow()
+	}
+
+	// Wait for two monitor cycles before checking for demotion.
+	time.Sleep((2 * server.ActiveMonitorTimeout) + (1 * time.Second))
+
+	// Verify that we now have eight peers.
+	result, err = c.Get("_etcd/machines", false, true)
+	assert.NoError(t, err)
+	assert.Equal(t, len(result.Node.Nodes), 8)
+
+	// Verify that we now have two proxies.
+	result, err = c.Get("_etcd/proxies", false, true)
+	assert.NoError(t, err)
+	assert.Equal(t, len(result.Node.Nodes), 2)
 }