Browse Source

feat(option): add cluster config option

It will be used when creating a brand-new cluster.
Yicheng Qin 11 years ago
parent
commit
c6b1a738c3
4 changed files with 43 additions and 7 deletions
  1. 23 0
      config/config.go
  2. 14 0
      config/config_test.go
  3. 1 1
      etcd/etcd.go
  4. 5 6
      server/peer_server.go

+ 23 - 0
config/config.go

@@ -85,6 +85,11 @@ type Config struct {
 	}
 	strTrace     string `toml:"trace" env:"ETCD_TRACE"`
 	GraphiteHost string `toml:"graphite_host" env:"ETCD_GRAPHITE_HOST"`
+	Cluster      struct {
+		ActiveSize   int `toml:"active_size" env:"ETCD_CLUSTER_ACTIVE_SIZE"`
+		RemoveDelay  int `toml:"remove_delay" env:"ETCD_CLUSTER_REMOVE_DELAY"`
+		SyncInterval int `toml:"sync_interval" env:"ETCD_CLUSTER_SYNC_INTERVAL"`
+	}
 }
 
 // New returns a Config initialized with default values.
@@ -103,6 +108,9 @@ func New() *Config {
 	rand.Seed(time.Now().UTC().UnixNano())
 	// Make maximum twice as minimum.
 	c.RetryInterval = float64(50+rand.Int()%50) * defaultHeartbeatInterval / 1000
+	c.Cluster.ActiveSize = server.DefaultActiveSize
+	c.Cluster.RemoveDelay = server.DefaultRemoveDelay
+	c.Cluster.SyncInterval = server.DefaultSyncInterval
 	return c
 }
 
@@ -167,6 +175,9 @@ func (c *Config) LoadEnv() error {
 	if err := c.loadEnv(&c.Peer); err != nil {
 		return err
 	}
+	if err := c.loadEnv(&c.Cluster); err != nil {
+		return err
+	}
 	return nil
 }
 
@@ -253,6 +264,10 @@ func (c *Config) LoadFlags(arguments []string) error {
 	f.StringVar(&c.strTrace, "trace", "", "")
 	f.StringVar(&c.GraphiteHost, "graphite-host", "", "")
 
+	f.IntVar(&c.Cluster.ActiveSize, "cluster-active-size", c.Cluster.ActiveSize, "")
+	f.IntVar(&c.Cluster.RemoveDelay, "cluster-remove-delay", c.Cluster.RemoveDelay, "")
+	f.IntVar(&c.Cluster.SyncInterval, "cluster-sync-interval", c.Cluster.SyncInterval, "")
+
 	// BEGIN IGNORED FLAGS
 	f.StringVar(&path, "config", "", "")
 	// BEGIN IGNORED FLAGS
@@ -409,6 +424,14 @@ func (c *Config) Trace() bool {
 	return c.strTrace == "*"
 }
 
+func (c *Config) ClusterConfig() *server.ClusterConfig {
+	return &server.ClusterConfig{
+		ActiveSize:   c.Cluster.ActiveSize,
+		RemoveDelay:  c.Cluster.RemoveDelay,
+		SyncInterval: c.Cluster.SyncInterval,
+	}
+}
+
 // sanitizeURL will cleanup a host string in the format hostname[:port] and
 // attach a schema.
 func sanitizeURL(host string, defaultScheme string) (string, error) {

+ 14 - 0
config/config_test.go

@@ -37,6 +37,11 @@ func TestConfigTOML(t *testing.T) {
 		cert_file = "/tmp/peer/file.cert"
 		key_file = "/tmp/peer/file.key"
 		bind_addr = "127.0.0.1:7003"
+
+		[cluster]
+		active_size = 5
+		remove_delay = 100
+		sync_interval = 10
 	`
 	c := New()
 	_, err := toml.Decode(content, &c)
@@ -62,6 +67,9 @@ func TestConfigTOML(t *testing.T) {
 	assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
 	assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
 	assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "")
+	assert.Equal(t, c.Cluster.ActiveSize, 5, "")
+	assert.Equal(t, c.Cluster.RemoveDelay, 100, "")
+	assert.Equal(t, c.Cluster.SyncInterval, 10, "")
 }
 
 // Ensures that a configuration can be retrieved from environment variables.
@@ -88,6 +96,9 @@ func TestConfigEnv(t *testing.T) {
 	os.Setenv("ETCD_PEER_CERT_FILE", "/tmp/peer/file.cert")
 	os.Setenv("ETCD_PEER_KEY_FILE", "/tmp/peer/file.key")
 	os.Setenv("ETCD_PEER_BIND_ADDR", "127.0.0.1:7003")
+	os.Setenv("ETCD_CLUSTER_ACTIVE_SIZE", "5")
+	os.Setenv("ETCD_CLUSTER_REMOVE_DELAY", "100")
+	os.Setenv("ETCD_CLUSTER_SYNC_INTERVAL", "10")
 
 	c := New()
 	c.LoadEnv()
@@ -111,6 +122,9 @@ func TestConfigEnv(t *testing.T) {
 	assert.Equal(t, c.Peer.CertFile, "/tmp/peer/file.cert", "")
 	assert.Equal(t, c.Peer.KeyFile, "/tmp/peer/file.key", "")
 	assert.Equal(t, c.Peer.BindAddr, "127.0.0.1:7003", "")
+	assert.Equal(t, c.Cluster.ActiveSize, 5, "")
+	assert.Equal(t, c.Cluster.RemoveDelay, 100, "")
+	assert.Equal(t, c.Cluster.SyncInterval, 10, "")
 
 	// Clear this as it will mess up other tests
 	os.Setenv("ETCD_DISCOVERY", "")

+ 1 - 1
etcd/etcd.go

@@ -298,7 +298,7 @@ func (e *Etcd) runServer() {
 			// If not, it may leave many requests unaccepted, or cannot receive heartbeat from the cluster.
 			// One severe problem caused if failing receiving heartbeats is when the second node joins one-node cluster,
 			// the cluster could be out of work as long as the two nodes cannot transfer messages.
-			e.PeerServer.Start(e.Config.Snapshot)
+			e.PeerServer.Start(e.Config.Snapshot, e.Config.ClusterConfig())
 			removeNotify = e.PeerServer.RemoveNotify()
 		} else {
 			log.Infof("%v starts to run in standby mode", e.Config.Name)

+ 5 - 6
server/peer_server.go

@@ -247,7 +247,7 @@ func (s *PeerServer) FindCluster(discoverURL string, peers []string) (toStart bo
 
 // Start starts the raft server.
 // The function assumes that join has been accepted successfully.
-func (s *PeerServer) Start(snapshot bool) error {
+func (s *PeerServer) Start(snapshot bool, clusterConfig *ClusterConfig) error {
 	s.Lock()
 	defer s.Unlock()
 	if s.started {
@@ -260,7 +260,7 @@ func (s *PeerServer) Start(snapshot bool) error {
 
 	s.raftServer.Start()
 	if s.isNewCluster {
-		s.InitNewCluster()
+		s.InitNewCluster(clusterConfig)
 		s.isNewCluster = false
 	}
 
@@ -401,7 +401,7 @@ func (s *PeerServer) SetServer(server *Server) {
 	s.server = server
 }
 
-func (s *PeerServer) InitNewCluster() {
+func (s *PeerServer) InitNewCluster(clusterConfig *ClusterConfig) {
 	// leader need to join self as a peer
 	s.doCommand(&JoinCommand{
 		MinVersion: store.MinVersion(),
@@ -413,9 +413,8 @@ func (s *PeerServer) InitNewCluster() {
 	log.Debugf("%s start as a leader", s.Config.Name)
 	s.joinIndex = 1
 
-	conf := NewClusterConfig()
-	s.doCommand(&SetClusterConfigCommand{Config: conf})
-	log.Debugf("%s sets cluster config as %v", s.Config.Name, conf)
+	s.doCommand(&SetClusterConfigCommand{Config: clusterConfig})
+	log.Debugf("%s sets cluster config as %v", s.Config.Name, clusterConfig)
 }
 
 func (s *PeerServer) doCommand(cmd raft.Command) {