Browse Source

Proxy promotion.

Ben Johnson 11 years ago
parent
commit
f5698d3566

+ 0 - 4
config/config.go

@@ -60,7 +60,6 @@ type Config struct {
 	KeyFile          string   `toml:"key_file" env:"ETCD_KEY_FILE"`
 	Peers            []string `toml:"peers" env:"ETCD_PEERS"`
 	PeersFile        string   `toml:"peers_file" env:"ETCD_PEERS_FILE"`
-	MaxClusterSize   int      `toml:"max_cluster_size" env:"ETCD_MAX_CLUSTER_SIZE"`
 	MaxResultBuffer  int      `toml:"max_result_buffer" env:"ETCD_MAX_RESULT_BUFFER"`
 	MaxRetryAttempts int      `toml:"max_retry_attempts" env:"ETCD_MAX_RETRY_ATTEMPTS"`
 	RetryInterval    float64  `toml:"retry_interval" env:"ETCD_RETRY_INTERVAL"`
@@ -90,7 +89,6 @@ func New() *Config {
 	c := new(Config)
 	c.SystemPath = DefaultSystemConfigPath
 	c.Addr = "127.0.0.1:4001"
-	c.MaxClusterSize = 9
 	c.MaxResultBuffer = 1024
 	c.MaxRetryAttempts = 3
 	c.RetryInterval = 10.0
@@ -247,7 +245,6 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.IntVar(&c.MaxResultBuffer, "max-result-buffer", c.MaxResultBuffer, "")
 	f.IntVar(&c.MaxRetryAttempts, "max-retry-attempts", c.MaxRetryAttempts, "")
 	f.Float64Var(&c.RetryInterval, "retry-interval", c.RetryInterval, "")
-	f.IntVar(&c.MaxClusterSize, "max-cluster-size", c.MaxClusterSize, "")
 	f.IntVar(&c.Peer.HeartbeatTimeout, "peer-heartbeat-timeout", c.Peer.HeartbeatTimeout, "")
 	f.IntVar(&c.Peer.ElectionTimeout, "peer-election-timeout", c.Peer.ElectionTimeout, "")
 
@@ -281,7 +278,6 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.StringVar(&c.DataDir, "d", c.DataDir, "(deprecated)")
 	f.IntVar(&c.MaxResultBuffer, "m", c.MaxResultBuffer, "(deprecated)")
 	f.IntVar(&c.MaxRetryAttempts, "r", c.MaxRetryAttempts, "(deprecated)")
-	f.IntVar(&c.MaxClusterSize, "maxsize", c.MaxClusterSize, "(deprecated)")
 	f.IntVar(&c.SnapshotCount, "snapshotCount", c.SnapshotCount, "(deprecated)")
 	// END DEPRECATED FLAGS
 

+ 0 - 37
config/config_test.go

@@ -51,7 +51,6 @@ func TestConfigTOML(t *testing.T) {
 	assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
 	assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
 	assert.Equal(t, c.PeersFile, "/tmp/peers", "")
-	assert.Equal(t, c.MaxClusterSize, 10, "")
 	assert.Equal(t, c.MaxResultBuffer, 512, "")
 	assert.Equal(t, c.MaxRetryAttempts, 5, "")
 	assert.Equal(t, c.Name, "test-name", "")
@@ -101,7 +100,6 @@ func TestConfigEnv(t *testing.T) {
 	assert.Equal(t, c.BindAddr, "127.0.0.1:4003", "")
 	assert.Equal(t, c.Peers, []string{"coreos.com:4001", "coreos.com:4002"}, "")
 	assert.Equal(t, c.PeersFile, "/tmp/peers", "")
-	assert.Equal(t, c.MaxClusterSize, 10, "")
 	assert.Equal(t, c.MaxResultBuffer, 512, "")
 	assert.Equal(t, c.MaxRetryAttempts, 5, "")
 	assert.Equal(t, c.Name, "test-name", "")
@@ -281,21 +279,6 @@ func TestConfigPeersFileFlag(t *testing.T) {
 	assert.Equal(t, c.PeersFile, "/tmp/peers", "")
 }
 
-// Ensures that the Max Cluster Size can be parsed from the environment.
-func TestConfigMaxClusterSizeEnv(t *testing.T) {
-	withEnv("ETCD_MAX_CLUSTER_SIZE", "5", func(c *Config) {
-		assert.Nil(t, c.LoadEnv(), "")
-		assert.Equal(t, c.MaxClusterSize, 5, "")
-	})
-}
-
-// Ensures that a the Max Cluster Size flag can be parsed.
-func TestConfigMaxClusterSizeFlag(t *testing.T) {
-	c := New()
-	assert.Nil(t, c.LoadFlags([]string{"-max-cluster-size", "5"}), "")
-	assert.Equal(t, c.MaxClusterSize, 5, "")
-}
-
 // Ensures that the Max Result Buffer can be parsed from the environment.
 func TestConfigMaxResultBufferEnv(t *testing.T) {
 	withEnv("ETCD_MAX_RESULT_BUFFER", "512", func(c *Config) {
@@ -600,26 +583,6 @@ func TestConfigDeprecatedPeersFileFlag(t *testing.T) {
 	assert.Equal(t, stderr, "[deprecated] use -peers-file, not -CF\n", "")
 }
 
-func TestConfigDeprecatedMaxClusterSizeFlag(t *testing.T) {
-	_, stderr := capture(func() {
-		c := New()
-		err := c.LoadFlags([]string{"-maxsize", "5"})
-		assert.NoError(t, err)
-		assert.Equal(t, c.MaxClusterSize, 5, "")
-	})
-	assert.Equal(t, stderr, "[deprecated] use -max-cluster-size, not -maxsize\n", "")
-}
-
-func TestConfigDeprecatedMaxResultBufferFlag(t *testing.T) {
-	_, stderr := capture(func() {
-		c := New()
-		err := c.LoadFlags([]string{"-m", "512"})
-		assert.NoError(t, err)
-		assert.Equal(t, c.MaxResultBuffer, 512, "")
-	})
-	assert.Equal(t, stderr, "[deprecated] use -max-result-buffer, not -m\n", "")
-}
-
 func TestConfigDeprecatedMaxRetryAttemptsFlag(t *testing.T) {
 	_, stderr := capture(func() {
 		c := New()

+ 0 - 1
etcd.go

@@ -120,7 +120,6 @@ func main() {
 		Scheme:         config.PeerTLSInfo().Scheme(),
 		URL:            config.Peer.Addr,
 		SnapshotCount:  config.SnapshotCount,
-		MaxClusterSize: config.MaxClusterSize,
 		RetryTimes:     config.MaxRetryAttempts,
 		RetryInterval:  config.RetryInterval,
 	}

+ 1 - 1
server/cluster_config.go

@@ -21,7 +21,7 @@ type ClusterConfig struct {
 
 	// PromoteDelay is the amount of time, in seconds, after a node is
 	// unreachable that it will be swapped out for a proxy node, if available.
-	PromoteDelay int `json:"PromoteDelay"`
+	PromoteDelay int `json:"promoteDelay"`
 }
 
 // NewClusterConfig returns a cluster configuration with default settings.

+ 5 - 0
server/join_command.go

@@ -62,6 +62,11 @@ func (c *JoinCommand) Apply(context raft.Context) (interface{}, error) {
 		return buf.Bytes(), nil
 	}
 
+	// Remove it as a proxy if it is one.
+	if ps.registry.ProxyExists(c.Name) {
+		ps.registry.UnregisterProxy(c.Name)
+	}
+
 	// Add to shared peer registry.
 	ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
 

+ 29 - 2
server/peer_server.go

@@ -141,7 +141,6 @@ func (s *PeerServer) ClusterConfig() *ClusterConfig {
 // SetClusterConfig updates the current cluster configuration.
 // Adjusting the active size will 
 func (s *PeerServer) SetClusterConfig(c *ClusterConfig) error {
-	prevActiveSize := s.clusterConfig.ActiveSize
 	s.clusterConfig = c
 
 	// Validate configuration.
@@ -294,9 +293,10 @@ func (s *PeerServer) HTTPHandler() http.Handler {
 	router.HandleFunc("/version/{version:[0-9]+}/check", s.VersionCheckHttpHandler)
 	router.HandleFunc("/upgrade", s.UpgradeHttpHandler)
 	router.HandleFunc("/join", s.JoinHttpHandler)
+	router.HandleFunc("/promote", s.PromoteHttpHandler).Methods("POST")
 	router.HandleFunc("/remove/{name:.+}", s.RemoveHttpHandler)
 	router.HandleFunc("/config", s.getClusterConfigHttpHandler).Methods("GET")
-	router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("POST")
+	router.HandleFunc("/config", s.setClusterConfigHttpHandler).Methods("PUT")
 	router.HandleFunc("/vote", s.VoteHttpHandler)
 	router.HandleFunc("/log", s.GetLogHttpHandler)
 	router.HandleFunc("/log/append", s.AppendEntriesHttpHandler)
@@ -632,18 +632,45 @@ func (s *PeerServer) monitorActive(closeChan chan bool) {
 		peerCount := s.registry.PeerCount()
 		proxies := s.registry.Proxies()
 		peers := s.registry.Peers()
+		fmt.Println("active.3»", peers)
 		if index := sort.SearchStrings(peers, s.Config.Name); index < len(peers) && peers[index] == s.Config.Name {
 			peers = append(peers[:index], peers[index+1:]...)
 		}
 
+		fmt.Println("active.1»", activeSize, peerCount)
+		fmt.Println("active.2»", proxies)
+
 		// If we have more active nodes than we should then demote.
 		if peerCount > activeSize {
 			peer := peers[rand.Intn(len(peers))]
+			fmt.Println("active.demote»", peer)
 			if _, err := s.raftServer.Do(&RemoveCommand{Name: peer}); err != nil {
 				log.Infof("%s: warning: demotion error: %v", s.Config.Name, err)
 			}
 			continue
 		}
+
+		// If we don't have enough active nodes then try to promote a proxy.
+		if peerCount < activeSize && len(proxies) > 0 {
+			proxy := proxies[rand.Intn(len(proxies))]
+			proxyPeerURL, _ := s.registry.ProxyPeerURL(proxy)
+			log.Infof("%s: promoting: %v (%s)", s.Config.Name, proxy, proxyPeerURL)
+
+			// Notify proxy to promote itself.
+			client := &http.Client{
+				Transport: &http.Transport{
+					DisableKeepAlives: false,
+					ResponseHeaderTimeout: ActiveMonitorTimeout,
+				},
+			}
+			resp, err := client.Post(fmt.Sprintf("%s/promote", proxyPeerURL), "application/json", nil)
+			if err != nil {
+				log.Infof("%s: warning: promotion error: %v", s.Config.Name, err)
+			} else if resp.StatusCode != http.StatusOK {
+				log.Infof("%s: warning: promotion failure: %v", s.Config.Name, resp.StatusCode)
+			}
+			continue
+		}
 	}
 }
 

+ 20 - 0
server/peer_server_handlers.go

@@ -3,6 +3,7 @@ package server
 import (
 	"encoding/json"
 	"net/http"
+	"net/url"
 	"strconv"
 	"time"
 
@@ -171,6 +172,25 @@ func (ps *PeerServer) JoinHttpHandler(w http.ResponseWriter, req *http.Request)
 	}
 }
 
+// Attempt to rejoin the cluster as a peer.
+func (ps *PeerServer) PromoteHttpHandler(w http.ResponseWriter, req *http.Request) {
+	log.Infof("%s attempting to promote in cluster: %s", ps.Config.Name, ps.proxyPeerURL)
+	url, err := url.Parse(ps.proxyPeerURL)
+	if err != nil {
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+
+	err = ps.joinByPeer(ps.raftServer, url.Host, ps.Config.Scheme)
+	if err != nil {
+		log.Infof("%s error while promoting: %v", ps.Config.Name, err)
+		w.WriteHeader(http.StatusInternalServerError)
+		return
+	}
+	log.Infof("%s promoted in the cluster", ps.Config.Name)
+	w.WriteHeader(http.StatusOK)
+}
+
 // Response to remove request
 func (ps *PeerServer) RemoveHttpHandler(w http.ResponseWriter, req *http.Request) {
 	if req.Method != "DELETE" {

+ 0 - 64
server/promote_command.go

@@ -1,64 +0,0 @@
-package server
-
-import (
-	"github.com/coreos/etcd/log"
-	"github.com/coreos/etcd/third_party/github.com/coreos/raft"
-)
-
-func init() {
-	raft.RegisterCommand(&PromoteCommand{})
-}
-
-// PromoteCommand represents a Raft command for converting a proxy to a peer.
-type PromoteCommand struct {
-	Name string `json:"name"`
-}
-
-// CommandName returns the name of the command.
-func (c *PromoteCommand) CommandName() string {
-	return "etcd:promote"
-}
-
-// Apply promotes a named proxy to a peer.
-func (c *PromoteCommand) Apply(context raft.Context) (interface{}, error) {
-	ps, _ := context.Server().Context().(*PeerServer)
-	config := ps.ClusterConfig()
-
-	// If cluster size is larger than max cluster size then return an error.
-	if ps.registry.PeerCount() >= config.ActiveSize {
-		return etcdErr.NewError(etcdErr.EcodePromoteError, "", 0)
-	}
-
-	// If proxy doesn't exist then return an error.
-	if !ps.registry.ProxyExists(c.Name) {
-		return etcdErr.NewError(etcdErr.EcodePromoteError, "", 0)
-	}
-
-	// Retrieve proxy settings.
-	proxyClientURL := ps.registry.ProxyClientURL()
-	proxyPeerURL := ps.registry.ProxyPeerURL()
-
-	// Remove from registry as a proxy.
-	if err := ps.registry.UnregisterProxy(c.Name); err != nil {
-		log.Info("Cannot remove proxy: ", c.Name)
-		return nil, err
-	}
-
-	// Add to shared peer registry.
-	ps.registry.RegisterPeer(c.Name, c.RaftURL, c.EtcdURL)
-
-	// Add peer in raft
-	err := context.Server().AddPeer(c.Name, "")
-
-	// Add peer stats
-	if c.Name != ps.RaftServer().Name() {
-		ps.followersStats.Followers[c.Name] = &raftFollowerStats{}
-		ps.followersStats.Followers[c.Name].Latency.Minimum = 1 << 63
-	}
-
-	return nil, err
-}
-
-func (c *JoinCommand) NodeName() string {
-	return c.Name
-}

+ 22 - 7
server/registry.go

@@ -5,6 +5,7 @@ import (
 	"net/url"
 	"path"
 	"path/filepath"
+	"sort"
 	"strings"
 	"sync"
 
@@ -48,6 +49,7 @@ func (r *Registry) Peers() []string {
 	for name, _ := range r.peers {
 		names = append(names, name)
 	}
+	sort.Sort(sort.StringSlice(names))
 	return names
 }
 
@@ -57,6 +59,7 @@ func (r *Registry) Proxies() []string {
 	for name, _ := range r.proxies {
 		names = append(names, name)
 	}
+	sort.Sort(sort.StringSlice(names))
 	return names
 }
 
@@ -70,7 +73,11 @@ func (r *Registry) RegisterPeer(name string, peerURL string, machURL string) err
 // RegisterProxy adds a proxy to the registry.
 func (r *Registry) RegisterProxy(name string, peerURL string, machURL string) error {
 	// TODO(benbjohnson): Disallow proxies that are already peers.
-	return r.register(RegistryProxyKey, name, peerURL, machURL)
+	if err := r.register(RegistryProxyKey, name, peerURL, machURL); err != nil {
+		return err
+	}
+	r.proxies[name] = r.load(RegistryProxyKey, name)
+	return nil
 }
 
 func (r *Registry) register(key, name string, peerURL string, machURL string) error {
@@ -153,7 +160,9 @@ func (r *Registry) ClientURL(name string) (string, bool) {
 
 func (r *Registry) clientURL(key, name string) (string, bool) {
 	if r.peers[name] == nil {
-		r.peers[name] = r.load(key, name)
+		if node := r.load(key, name); node != nil {
+			r.peers[name] = node
+		}
 	}
 
 	if node := r.peers[name]; node != nil {
@@ -184,7 +193,9 @@ func (r *Registry) PeerURL(name string) (string, bool) {
 
 func (r *Registry) peerURL(key, name string) (string, bool) {
 	if r.peers[name] == nil {
-		r.peers[name] = r.load(key, name)
+		if node := r.load(key, name); node != nil {
+			r.peers[name] = node
+		}
 	}
 
 	if node := r.peers[name]; node != nil {
@@ -203,7 +214,9 @@ func (r *Registry) ProxyClientURL(name string) (string, bool) {
 
 func (r *Registry) proxyClientURL(key, name string) (string, bool) {
 	if r.proxies[name] == nil {
-		r.proxies[name] = r.load(key, name)
+		if node := r.load(key, name); node != nil {
+			r.proxies[name] = node
+		}
 	}
 	if node := r.proxies[name]; node != nil {
 		return node.url, true
@@ -215,12 +228,14 @@ func (r *Registry) proxyClientURL(key, name string) (string, bool) {
 func (r *Registry) ProxyPeerURL(name string) (string, bool) {
 	r.Lock()
 	defer r.Unlock()
-	return r.proxyPeerURL(RegistryProxyKey,name)
+	return r.proxyPeerURL(RegistryProxyKey, name)
 }
 
 func (r *Registry) proxyPeerURL(key, name string) (string, bool) {
 	if r.proxies[name] == nil {
-		r.proxies[name] = r.load(key, name)
+		if node := r.load(key, name); node != nil {
+			r.proxies[name] = node
+		}
 	}
 	if node := r.proxies[name]; node != nil {
 		return node.peerURL, true
@@ -278,7 +293,7 @@ func (r *Registry) load(key, name string) *node {
 	}
 
 	// Retrieve from store.
-	e, err := r.store.Get(path.Join(RegistryPeerKey, name), false, false)
+	e, err := r.store.Get(path.Join(key, name), false, false)
 	if err != nil {
 		return nil
 	}

+ 29 - 0
tests/functional/cluster_config_test.go

@@ -0,0 +1,29 @@
+package test
+
+import (
+	"bytes"
+	"os"
+	"testing"
+	"time"
+
+	"github.com/coreos/etcd/tests"
+	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
+)
+
+// Ensure that the cluster configuration can be updated.
+func TestClusterConfig(t *testing.T) {
+	_, etcds, err := CreateCluster(3, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
+	assert.NoError(t, err)
+	defer DestroyCluster(etcds)
+
+	resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":3, "promoteDelay":60}`))
+	assert.Equal(t, resp.StatusCode, 200)
+
+	time.Sleep(1 * time.Second)
+
+	resp, _ = tests.Get("http://localhost:7002/config")
+	body := tests.ReadBodyJSON(resp)
+	assert.Equal(t, resp.StatusCode, 200)
+	assert.Equal(t, body["activeSize"], 3)
+	assert.Equal(t, body["promoteDelay"], 60)
+}

+ 17 - 1
tests/functional/proxy_test.go

@@ -1,11 +1,13 @@
 package test
 
 import (
+	"bytes"
 	"fmt"
 	"os"
 	"testing"
 	"time"
 
+	"github.com/coreos/etcd/server"
 	"github.com/coreos/etcd/tests"
 	"github.com/coreos/etcd/third_party/github.com/coreos/go-etcd/etcd"
 	"github.com/coreos/etcd/third_party/github.com/stretchr/testify/assert"
@@ -13,7 +15,7 @@ import (
 
 // Create a full cluster and then add extra an extra proxy node.
 func TestProxy(t *testing.T) {
-	clusterSize := 10 // MaxClusterSize + 1
+	clusterSize := 10 // DefaultActiveSize + 1
 	_, etcds, err := CreateCluster(clusterSize, &os.ProcAttr{Files: []*os.File{nil, os.Stdout, os.Stderr}}, false)
 	assert.NoError(t, err)
 	defer DestroyCluster(etcds)
@@ -42,4 +44,18 @@ func TestProxy(t *testing.T) {
 			}
 		}
 	}
+
+	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
+
+	// Reconfigure with larger active size (10 nodes) and wait for promotion.
+	resp, _ := tests.Put("http://localhost:7001/config", "application/json", bytes.NewBufferString(`{"activeSize":10, "promoteDelay":1800}`))
+	if !assert.Equal(t, resp.StatusCode, 200) {
+		t.FailNow()
+	}
+
+	time.Sleep(server.ActiveMonitorTimeout + (1 * time.Second))
+
+	// Verify that the proxy node is now a peer.
+	fmt.Println("CHECK!")
+	time.Sleep(30 * time.Second)
 }

+ 0 - 1
tests/server_utils.go

@@ -39,7 +39,6 @@ func RunServer(f func(*server.Server)) {
 		URL:		"http://" + testRaftURL,
 		Scheme:		"http",
 		SnapshotCount:	testSnapshotCount,
-		MaxClusterSize:	9,
 	}
 
 	mb := metrics.NewBucket("")