Browse Source

feat(standby_server): save/load syncInterval to disk

Yicheng Qin 11 years ago
parent
commit
35cc81e22f
2 changed files with 41 additions and 37 deletions
  1. 3 0
      config/config.go
  2. 38 37
      server/standby_server.go

+ 3 - 0
config/config.go

@@ -366,6 +366,9 @@ func (c *Config) Reset() error {
 	if err := os.RemoveAll(filepath.Join(c.DataDir, "snapshot")); err != nil {
 		return err
 	}
+	if err := os.RemoveAll(filepath.Join(c.DataDir, "standby_info")); err != nil {
+		return err
+	}
 
 	return nil
 }

+ 38 - 37
server/standby_server.go

@@ -18,7 +18,7 @@ import (
 	"github.com/coreos/etcd/store"
 )
 
-const clusterInfoName = "cluster_info"
+const standbyInfoName = "standby_info"
 
 type StandbyServerConfig struct {
 	Name       string
@@ -28,13 +28,17 @@ type StandbyServerConfig struct {
 	DataDir    string
 }
 
+type standbyInfo struct {
+	Cluster      []*machineMessage
+	SyncInterval float64
+}
+
 type StandbyServer struct {
 	Config StandbyServerConfig
 	client *Client
 
-	cluster      []*machineMessage
-	syncInterval float64
-	joinIndex    uint64
+	standbyInfo
+	joinIndex uint64
 
 	file     *os.File
 	recorded bool
@@ -49,16 +53,14 @@ type StandbyServer struct {
 
 func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServer, error) {
 	s := &StandbyServer{
-		Config:       config,
-		client:       client,
-		syncInterval: DefaultSyncInterval,
+		Config:      config,
+		client:      client,
+		standbyInfo: standbyInfo{SyncInterval: DefaultSyncInterval},
 	}
-	if err := s.openClusterInfo(); err != nil {
+	if err := s.openStandbyInfo(); err != nil {
 		return nil, fmt.Errorf("error open/create cluster info file: %v", err)
 	}
-	if clusterInfo, err := s.loadClusterInfo(); err == nil {
-		s.setCluster(clusterInfo)
-	}
+	s.loadStandbyInfo()
 	return s, nil
 }
 
@@ -92,7 +94,7 @@ func (s *StandbyServer) Stop() {
 	close(s.closeChan)
 	s.routineGroup.Wait()
 
-	if err := s.clearClusterInfo(); err != nil {
+	if err := s.clearStandbyInfo(); err != nil {
 		log.Warnf("error clearing cluster info for standby")
 	}
 }
@@ -111,20 +113,20 @@ func (s *StandbyServer) ClusterRecorded() bool {
 	return s.recorded
 }
 
-func (s *StandbyServer) Cluster() []string {
+func (s *StandbyServer) ClusterURLs() []string {
 	peerURLs := make([]string, 0)
-	for _, peer := range s.cluster {
+	for _, peer := range s.Cluster {
 		peerURLs = append(peerURLs, peer.PeerURL)
 	}
 	return peerURLs
 }
 
 func (s *StandbyServer) ClusterSize() int {
-	return len(s.cluster)
+	return len(s.Cluster)
 }
 
 func (s *StandbyServer) setCluster(cluster []*machineMessage) {
-	s.cluster = cluster
+	s.Cluster = cluster
 }
 
 func (s *StandbyServer) SyncCluster(peers []string) error {
@@ -133,20 +135,20 @@ func (s *StandbyServer) SyncCluster(peers []string) error {
 	}
 
 	if err := s.syncCluster(peers); err != nil {
-		log.Infof("fail syncing cluster(%v): %v", s.Cluster(), err)
+		log.Infof("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
 		return err
 	}
 
-	log.Infof("set cluster(%v) for standby server", s.Cluster())
+	log.Infof("set cluster(%v) for standby server", s.ClusterURLs())
 	return nil
 }
 
 func (s *StandbyServer) SetSyncInterval(second float64) {
-	s.syncInterval = second
+	s.SyncInterval = second
 }
 
 func (s *StandbyServer) ClusterLeader() *machineMessage {
-	for _, machine := range s.cluster {
+	for _, machine := range s.Cluster {
 		if machine.State == raft.Leader {
 			return machine
 		}
@@ -172,7 +174,7 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request)
 // failed, so it waits for the interval at the beginning.
 func (s *StandbyServer) monitorCluster() {
 	for {
-		timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
+		timer := time.NewTimer(time.Duration(int64(s.SyncInterval * float64(time.Second))))
 		defer timer.Stop()
 		select {
 		case <-s.closeChan:
@@ -181,13 +183,13 @@ func (s *StandbyServer) monitorCluster() {
 		}
 
 		if err := s.syncCluster(nil); err != nil {
-			log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
+			log.Warnf("fail syncing cluster(%v): %v", s.ClusterURLs(), err)
 			continue
 		}
 
 		leader := s.ClusterLeader()
 		if leader == nil {
-			log.Warnf("fail getting leader from cluster(%v)", s.Cluster())
+			log.Warnf("fail getting leader from cluster(%v)", s.ClusterURLs())
 			continue
 		}
 
@@ -206,7 +208,7 @@ func (s *StandbyServer) monitorCluster() {
 }
 
 func (s *StandbyServer) syncCluster(peerURLs []string) error {
-	peerURLs = append(s.Cluster(), peerURLs...)
+	peerURLs = append(s.ClusterURLs(), peerURLs...)
 
 	for _, peerURL := range peerURLs {
 		// Fetch current peer list
@@ -224,7 +226,7 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error {
 
 		s.setCluster(machines)
 		s.SetSyncInterval(config.SyncInterval)
-		if err := s.saveClusterInfo(); err != nil {
+		if err := s.saveStandbyInfo(); err != nil {
 			log.Warnf("fail saving cluster info into disk: %v", err)
 		}
 		return nil
@@ -250,8 +252,8 @@ func (s *StandbyServer) join(peer string) error {
 		log.Debugf("error getting cluster config")
 		return err
 	}
-	if clusterConfig.ActiveSize <= len(s.Cluster()) {
-		log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster()))
+	if clusterConfig.ActiveSize <= len(s.Cluster) {
+		log.Debugf("stop joining because the cluster is full with %d nodes", len(s.Cluster))
 		return fmt.Errorf("out of quota")
 	}
 
@@ -282,9 +284,9 @@ func (s *StandbyServer) fullPeerURL(urlStr string) string {
 	return u.String()
 }
 
-func (s *StandbyServer) openClusterInfo() error {
+func (s *StandbyServer) openStandbyInfo() error {
 	var err error
-	path := filepath.Join(s.Config.DataDir, clusterInfoName)
+	path := filepath.Join(s.Config.DataDir, standbyInfoName)
 	s.file, err = os.OpenFile(path, os.O_RDWR, 0600)
 	if err != nil {
 		if os.IsNotExist(err) {
@@ -295,23 +297,22 @@ func (s *StandbyServer) openClusterInfo() error {
 	return nil
 }
 
-func (s *StandbyServer) loadClusterInfo() ([]*machineMessage, error) {
-	clusterInfo := make([]*machineMessage, 0)
+func (s *StandbyServer) loadStandbyInfo() ([]*machineMessage, error) {
 	if _, err := s.file.Seek(0, os.SEEK_SET); err != nil {
 		return nil, err
 	}
-	if err := json.NewDecoder(s.file).Decode(&clusterInfo); err != nil {
+	if err := json.NewDecoder(s.file).Decode(&s.standbyInfo); err != nil {
 		return nil, err
 	}
 	s.recorded = true
-	return clusterInfo, nil
+	return s.standbyInfo.Cluster, nil
 }
 
-func (s *StandbyServer) saveClusterInfo() error {
-	if err := s.clearClusterInfo(); err != nil {
+func (s *StandbyServer) saveStandbyInfo() error {
+	if err := s.clearStandbyInfo(); err != nil {
 		return nil
 	}
-	if err := json.NewEncoder(s.file).Encode(s.cluster); err != nil {
+	if err := json.NewEncoder(s.file).Encode(s.standbyInfo); err != nil {
 		return err
 	}
 	if err := s.file.Sync(); err != nil {
@@ -321,7 +322,7 @@ func (s *StandbyServer) saveClusterInfo() error {
 	return nil
 }
 
-func (s *StandbyServer) clearClusterInfo() error {
+func (s *StandbyServer) clearStandbyInfo() error {
 	if _, err := s.file.Seek(0, os.SEEK_SET); err != nil {
 		return err
 	}