
feat(standby_server): write cluster info to disk

For better fault tolerance and availability.
Yicheng Qin, 11 years ago
commit b7d9fdbd39

+ 5 - 5
Documentation/design/standbys.md

@@ -33,6 +33,9 @@ After each interval, standbys synchronize information with cluster.
 #### Main logic
 
 ```
+If find existing standby cluster info:
+  Goto standby loop
+
 Find cluster as required
 If determine to start peer server:
   Goto peer loop
@@ -74,7 +77,6 @@ return true
 **Note**
 1. [TODO] The running mode cannot be determined by log, because the log may be outdated. But the log could be used to estimate its state.
 2. Even if sync cluster fails, it will restart still for recovery from full outage.
-3. [BUG] Possible peers from discover URL, peers flag and data dir could be outdated because standby machine doesn't record log. This could make reconnect fail if the whole cluster migrates to new address.
 
 
 #### Peer mode start logic
@@ -100,11 +102,12 @@ When removed from the cluster:
 Loop:
   Sleep for some time
 
-  Sync cluster
+  Sync cluster, and write cluster info into disk
 
   If peer count < active size:
     Send join request
     If succeed:
+      Clear cluster info from disk
       Return
 ```
 
@@ -192,9 +195,6 @@ Machines in peer mode recover heartbeat between each other.
 
 Machines in standby mode always sync the cluster. If sync fails, it uses the first address from data log as redirect target.
 
-**Note**
-1. [TODO] Machine which runs in standby mode and has no log data cannot be recovered. But it could join the cluster finally if it is restarted always.
-
 
 ### Kill one peer machine
 

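The revised flow in `standbys.md` is easiest to see end to end in code. Below is a minimal, self-contained Go sketch of the startup decision plus the standby-loop bookkeeping. Everything here (`chooseMode`, `standbyLoop`, the join stub) is illustrative rather than etcd's API; only the `cluster_info` file name comes from this commit, and the sketch removes the file on success where the real code truncates it.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// chooseMode mirrors the new "Main logic": if a standby cluster record
// already exists on disk, skip discovery and go straight to standby mode.
func chooseMode(dataDir string) string {
	if _, err := os.Stat(filepath.Join(dataDir, "cluster_info")); err == nil {
		return "standby"
	}
	// "Find cluster as required" would run here (discovery URL,
	// -peers flag, data dir) and may pick peer mode instead.
	return "peer"
}

// standbyLoop mirrors the updated pseudocode: each sync writes the
// cluster info to disk, and a successful join clears it again.
func standbyLoop(dataDir string, tryJoin func() bool) error {
	path := filepath.Join(dataDir, "cluster_info")
	// Sync cluster, and write cluster info into disk (placeholder JSON;
	// the real code writes the machine list).
	if err := os.WriteFile(path, []byte(`[]`), 0600); err != nil {
		return err
	}
	if tryJoin() {
		// Join succeeded: clear cluster info from disk.
		return os.Remove(path)
	}
	return nil
}

func main() {
	dir, _ := os.MkdirTemp("", "standby")
	defer os.RemoveAll(dir)
	_ = standbyLoop(dir, func() bool { return false })
	fmt.Println(chooseMode(dir)) // "standby": record survives a failed join
}
```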
+ 15 - 8
etcd/etcd.go

@@ -229,22 +229,29 @@ func (e *Etcd) Run() {
 		PeerScheme: e.Config.PeerTLSInfo().Scheme(),
 		PeerURL:    e.Config.Peer.Addr,
 		ClientURL:  e.Config.Addr,
+		DataDir:    e.Config.DataDir,
+	}
+	if e.StandbyServer, err = server.NewStandbyServer(ssConfig, client); err != nil {
+		log.Fatal("error new standby server:", err)
 	}
-	e.StandbyServer = server.NewStandbyServer(ssConfig, client)
 
 	// Generating config could be slow.
 	// Put it here to make listen happen immediately after peer-server starting.
 	peerTLSConfig := server.TLSServerConfig(e.Config.PeerTLSInfo())
 	etcdTLSConfig := server.TLSServerConfig(e.Config.EtcdTLSInfo())
 
-	startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers)
-	if err != nil {
-		log.Fatal(err)
-	}
-	if startPeerServer {
-		e.setMode(PeerMode)
+	if !e.StandbyServer.ClusterRecorded() {
+		startPeerServer, possiblePeers, err := e.PeerServer.FindCluster(e.Config.Discovery, e.Config.Peers)
+		if err != nil {
+			log.Fatal(err)
+		}
+		if startPeerServer {
+			e.setMode(PeerMode)
+		} else {
+			e.StandbyServer.SyncCluster(possiblePeers)
+			e.setMode(StandbyMode)
+		}
 	} else {
-		e.StandbyServer.SyncCluster(possiblePeers)
 		e.setMode(StandbyMode)
 	}
 

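Two details in the hunk above are worth making explicit: `NewStandbyServer` now performs disk I/O, so its signature grows an error, and when a cluster record exists the `else` branch enters `StandbyMode` without calling `SyncCluster`, because the constructor already loaded the recorded cluster into memory. A hedged, self-contained sketch of that constructor contract, with hypothetical names standing in for the real types:

```go
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"
)

// Hypothetical stand-in for server.NewStandbyServer: a constructor that
// opens its state file up front and reports failure to the caller,
// instead of returning a bare pointer that might be half-initialized.
type standbyServer struct {
	file     *os.File
	recorded bool
}

func newStandbyServer(dataDir string) (*standbyServer, error) {
	f, err := os.OpenFile(filepath.Join(dataDir, "cluster_info"), os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		return nil, fmt.Errorf("error open/create cluster info file: %v", err)
	}
	// The real constructor also tries to decode a recorded cluster here
	// and, on success, seeds the in-memory cluster and sets recorded=true.
	return &standbyServer{file: f}, nil
}

func main() {
	dir, _ := os.MkdirTemp("", "etcd-standby")
	defer os.RemoveAll(dir)

	// Mirrors the call site in (*Etcd).Run: fail fast on constructor error.
	s, err := newStandbyServer(dir)
	if err != nil {
		log.Fatal("error new standby server:", err)
	}
	defer s.file.Close()
	fmt.Println("cluster recorded on disk:", s.recorded)
}
```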
+ 89 - 8
server/standby_server.go

@@ -1,9 +1,12 @@
 package server
 
 import (
+	"encoding/json"
 	"fmt"
 	"net/http"
 	"net/url"
+	"os"
+	"path/filepath"
 	"sync"
 	"time"
 
@@ -15,11 +18,14 @@ import (
 	"github.com/coreos/etcd/store"
 	"github.com/coreos/etcd/store"
 )
 )
 
 
+const clusterInfoName = "cluster_info"
+
 type StandbyServerConfig struct {
 type StandbyServerConfig struct {
 	Name       string
 	Name       string
 	PeerScheme string
 	PeerScheme string
 	PeerURL    string
 	PeerURL    string
 	ClientURL  string
 	ClientURL  string
+	DataDir    string
 }
 }
 
 
 type StandbyServer struct {
 type StandbyServer struct {
@@ -30,6 +36,9 @@ type StandbyServer struct {
 	syncInterval float64
 	syncInterval float64
 	joinIndex    uint64
 	joinIndex    uint64
 
 
+	file     *os.File
+	recorded bool
+
 	removeNotify chan bool
 	removeNotify chan bool
 	started      bool
 	started      bool
 	closeChan    chan bool
 	closeChan    chan bool
@@ -38,12 +47,19 @@ type StandbyServer struct {
 	sync.Mutex
 	sync.Mutex
 }
 }
 
 
-func NewStandbyServer(config StandbyServerConfig, client *Client) *StandbyServer {
-	return &StandbyServer{
+func NewStandbyServer(config StandbyServerConfig, client *Client) (*StandbyServer, error) {
+	s := &StandbyServer{
 		Config:       config,
 		Config:       config,
 		client:       client,
 		client:       client,
 		syncInterval: DefaultSyncInterval,
 		syncInterval: DefaultSyncInterval,
 	}
 	}
+	if err := s.openClusterInfo(); err != nil {
+		return nil, fmt.Errorf("error open/create cluster info file: %v", err)
+	}
+	if clusterInfo, err := s.loadClusterInfo(); err == nil {
+		s.setCluster(clusterInfo)
+	}
+	return s, nil
 }
 }
 
 
 func (s *StandbyServer) Start() {
 func (s *StandbyServer) Start() {
@@ -75,6 +91,10 @@ func (s *StandbyServer) Stop() {
 
 	close(s.closeChan)
 	s.routineGroup.Wait()
+
+	if err := s.clearClusterInfo(); err != nil {
+		log.Warnf("error clearing cluster info for standby")
+	}
 }
 
 // RemoveNotify notifies the server is removed from standby mode and ready
@@ -87,6 +107,10 @@ func (s *StandbyServer) ClientHTTPHandler() http.Handler {
 	return http.HandlerFunc(s.redirectRequests)
 }
 
+func (s *StandbyServer) ClusterRecorded() bool {
+	return s.recorded
+}
+
 func (s *StandbyServer) Cluster() []string {
 	peerURLs := make([]string, 0)
 	for _, peer := range s.cluster {
@@ -145,14 +169,18 @@ func (s *StandbyServer) redirectRequests(w http.ResponseWriter, r *http.Request)
 }
 
 func (s *StandbyServer) monitorCluster() {
+	first := true
 	for {
-		timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
-		defer timer.Stop()
-		select {
-		case <-s.closeChan:
-			return
-		case <-timer.C:
+		if !first {
+			timer := time.NewTimer(time.Duration(int64(s.syncInterval * float64(time.Second))))
+			defer timer.Stop()
+			select {
+			case <-s.closeChan:
+				return
+			case <-timer.C:
+			}
 		}
+		first = false
 
 		if err := s.syncCluster(nil); err != nil {
 			log.Warnf("fail syncing cluster(%v): %v", s.Cluster(), err)
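The `monitorCluster` rewrite above turns the loop into a run-immediately-then-tick pattern: the first pass skips the timer so a freshly started standby syncs (and records) the cluster at once. A self-contained sketch of the same pattern follows, with a hypothetical `doSync`; note it stops the timer inline, whereas the hunk above keeps `defer timer.Stop()` inside the loop, which only fires once the function returns.

```go
package main

import (
	"fmt"
	"time"
)

// monitor runs doSync immediately on the first iteration, then once per
// interval, and exits promptly when closeChan is closed.
func monitor(interval time.Duration, closeChan <-chan struct{}, doSync func()) {
	first := true
	for {
		if !first {
			timer := time.NewTimer(interval)
			select {
			case <-closeChan:
				timer.Stop()
				return
			case <-timer.C:
			}
		}
		first = false
		doSync()
	}
}

func main() {
	closeChan := make(chan struct{})
	done := make(chan struct{})
	go func() {
		monitor(100*time.Millisecond, closeChan, func() { fmt.Println("sync cluster") })
		close(done)
	}()
	time.Sleep(250 * time.Millisecond) // expect ~3 syncs: t=0, 100ms, 200ms
	close(closeChan)
	<-done
}
```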
@@ -198,6 +226,9 @@ func (s *StandbyServer) syncCluster(peerURLs []string) error {
 
 		s.setCluster(machines)
 		s.SetSyncInterval(config.SyncInterval)
+		if err := s.saveClusterInfo(); err != nil {
+			log.Warnf("fail saving cluster info into disk: %v", err)
+		}
 		return nil
 	}
 	return fmt.Errorf("unreachable cluster")
@@ -252,3 +283,53 @@ func (s *StandbyServer) fullPeerURL(urlStr string) string {
 	u.Scheme = s.Config.PeerScheme
 	return u.String()
 }
+
+func (s *StandbyServer) openClusterInfo() error {
+	var err error
+	path := filepath.Join(s.Config.DataDir, clusterInfoName)
+	s.file, err = os.OpenFile(path, os.O_RDWR, 0600)
+	if err != nil {
+		if os.IsNotExist(err) {
+			s.file, err = os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
+		}
+		return err
+	}
+	return nil
+}
+
+func (s *StandbyServer) loadClusterInfo() ([]*machineMessage, error) {
+	clusterInfo := make([]*machineMessage, 0)
+	if _, err := s.file.Seek(0, os.SEEK_SET); err != nil {
+		return nil, err
+	}
+	if err := json.NewDecoder(s.file).Decode(&clusterInfo); err != nil {
+		return nil, err
+	}
+	s.recorded = true
+	return clusterInfo, nil
+}
+
+func (s *StandbyServer) saveClusterInfo() error {
+	if err := s.clearClusterInfo(); err != nil {
+		return err
+	}
+	if err := json.NewEncoder(s.file).Encode(s.cluster); err != nil {
+		return err
+	}
+	if err := s.file.Sync(); err != nil {
+		return err
+	}
+	s.recorded = true
+	return nil
+}
+
+func (s *StandbyServer) clearClusterInfo() error {
+	if _, err := s.file.Seek(0, os.SEEK_SET); err != nil {
+		return err
+	}
+	if err := s.file.Truncate(0); err != nil {
+		return err
+	}
+	s.recorded = false
+	return nil
+}

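The new helpers above define the `cluster_info` file as a single JSON document written with `json.NewEncoder` and fsynced. Below is a standalone sketch of the save/load round-trip; the `machine` struct is an assumed stand-in for the package's unexported `machineMessage`, so its field set is illustrative only.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
)

// Assumed shape of one cluster record; the real type is machineMessage.
type machine struct {
	Name      string `json:"name"`
	PeerURL   string `json:"peerURL"`
	ClientURL string `json:"clientURL"`
}

func main() {
	path := filepath.Join(os.TempDir(), "cluster_info")
	f, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE, 0600)
	if err != nil {
		panic(err)
	}
	defer f.Close()

	cluster := []*machine{
		{Name: "node1", PeerURL: "http://127.0.0.1:7001", ClientURL: "http://127.0.0.1:4001"},
	}

	// Save: seek+truncate, encode, fsync — mirroring clearClusterInfo
	// followed by saveClusterInfo in the hunk above.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	if err := f.Truncate(0); err != nil {
		panic(err)
	}
	if err := json.NewEncoder(f).Encode(cluster); err != nil {
		panic(err)
	}
	if err := f.Sync(); err != nil {
		panic(err)
	}

	// Load: seek back to the start and decode, as loadClusterInfo does.
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		panic(err)
	}
	var loaded []*machine
	if err := json.NewDecoder(f).Decode(&loaded); err != nil {
		panic(err)
	}
	fmt.Printf("recovered %d machine(s), first peer URL %s\n", len(loaded), loaded[0].PeerURL)
}
```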
+ 12 - 12
tests/functional/multi_node_kill_all_and_recovery_test.go

@@ -167,7 +167,7 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
 	leaderChan := make(chan string, 1)
 	all := make(chan bool, 1)
 
-	clusterSize := 5
+	clusterSize := 15
 	argGroup, etcds, err := CreateCluster(clusterSize, procAttr, false)
 	defer DestroyCluster(etcds)
 
@@ -184,8 +184,8 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
 
 	c.SyncCluster()
 
-	// Reconfigure with smaller active size (3 nodes) and wait for demotion.
-	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
+	// Reconfigure with smaller active size (7 nodes) and wait for removal.
+	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":7}`))
 	if !assert.Equal(t, resp.StatusCode, 200) {
 		t.FailNow()
 	}
@@ -195,10 +195,10 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
-	// Verify that there is three machines in peer mode.
+	// Verify that there are seven machines in peer mode.
 	result, err := c.Get("_etcd/machines", false, true)
 	assert.NoError(t, err)
-	assert.Equal(t, len(result.Node.Nodes), 3)
+	assert.Equal(t, len(result.Node.Nodes), 7)
 
-	// send 10 commands
-	for i := 0; i < 10; i++ {
+	// send set commands
+	for i := 0; i < 2*clusterSize; i++ {
 		// Test Set
 		_, err := c.Set("foo", "bar", 0)
 		if err != nil {
@@ -220,13 +220,13 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
 	time.Sleep(time.Second)
 
 	for i := 0; i < clusterSize; i++ {
-		etcds[i], err = os.StartProcess(EtcdBinPath, argGroup[i], procAttr)
+		etcds[i], err = os.StartProcess(EtcdBinPath, append(argGroup[i], "-peers="), procAttr)
 	}
 
 	time.Sleep(2 * time.Second)
 
-	// send 10 commands
-	for i := 0; i < 10; i++ {
+	// send set commands
+	for i := 0; i < 2*clusterSize; i++ {
 		// Test Set
 		_, err := c.Set("foo", "bar", 0)
 		if err != nil {
@@ -234,8 +234,8 @@ func TestMultiNodeKillAllAndRecoveryWithStandbys(t *testing.T) {
 		}
 	}
 
-	// Verify that we have three machines.
+	// Verify that we have seven machines.
 	result, err = c.Get("_etcd/machines", false, true)
 	assert.NoError(t, err)
-	assert.Equal(t, len(result.Node.Nodes), 3)
+	assert.Equal(t, len(result.Node.Nodes), 7)
 }

+ 29 - 9
tests/functional/remove_node_test.go

@@ -19,7 +19,7 @@ func TestRemoveNode(t *testing.T) {
 	procAttr := new(os.ProcAttr)
 	procAttr.Files = []*os.File{nil, os.Stdout, os.Stderr}
 
-	clusterSize := 3
+	clusterSize := 4
 	argGroup, etcds, _ := CreateCluster(clusterSize, procAttr, false)
 	defer DestroyCluster(etcds)
 
@@ -29,7 +29,7 @@ func TestRemoveNode(t *testing.T) {
 
 	c.SyncCluster()
 
-	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"syncInterval":1}`))
+	resp, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4, "syncInterval":1}`))
 	if !assert.Equal(t, resp.StatusCode, 200) {
 		t.FailNow()
 	}
@@ -39,6 +39,11 @@ func TestRemoveNode(t *testing.T) {
 	client := &http.Client{}
 	for i := 0; i < 2; i++ {
 		for i := 0; i < 2; i++ {
+			r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
+			if !assert.Equal(t, r.StatusCode, 200) {
+				t.FailNow()
+			}
+
 			client.Do(rmReq)
 
 			fmt.Println("send remove to node3 and wait for its exiting")
@@ -50,7 +55,7 @@ func TestRemoveNode(t *testing.T) {
 				panic(err)
 			}
 
-			if len(resp.Node.Nodes) != 2 {
+			if len(resp.Node.Nodes) != 3 {
 				t.Fatal("cannot remove peer")
 			}
 
@@ -69,6 +74,11 @@ func TestRemoveNode(t *testing.T) {
 				panic(err)
 			}
 
+			r, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4}`))
+			if !assert.Equal(t, r.StatusCode, 200) {
+				t.FailNow()
+			}
+
 			time.Sleep(time.Second + time.Second)
 
 			resp, err = c.Get("_etcd/machines", false, false)
@@ -77,13 +87,18 @@ func TestRemoveNode(t *testing.T) {
 				panic(err)
 			}
 
-			if len(resp.Node.Nodes) != 3 {
-				t.Fatalf("add peer fails #1 (%d != 3)", len(resp.Node.Nodes))
+			if len(resp.Node.Nodes) != 4 {
+				t.Fatalf("add peer fails #1 (%d != 4)", len(resp.Node.Nodes))
 			}
 		}
 
 		// first kill the node, then remove it, then add it back
 		for i := 0; i < 2; i++ {
+			r, _ := tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":3}`))
+			if !assert.Equal(t, r.StatusCode, 200) {
+				t.FailNow()
+			}
+
 			etcds[2].Kill()
 			fmt.Println("kill node3 and wait for its exiting")
 			etcds[2].Wait()
@@ -96,7 +111,7 @@ func TestRemoveNode(t *testing.T) {
 				panic(err)
 			}
 
-			if len(resp.Node.Nodes) != 2 {
+			if len(resp.Node.Nodes) != 3 {
 				t.Fatal("cannot remove peer")
 			}
 
@@ -112,7 +127,12 @@ func TestRemoveNode(t *testing.T) {
 				panic(err)
 			}
 
-			time.Sleep(time.Second)
+			r, _ = tests.Put("http://localhost:7001/v2/admin/config", "application/json", bytes.NewBufferString(`{"activeSize":4}`))
+			if !assert.Equal(t, r.StatusCode, 200) {
+				t.FailNow()
+			}
+
+			time.Sleep(time.Second + time.Second)
 
 			resp, err = c.Get("_etcd/machines", false, false)
 
@@ -120,8 +140,8 @@ func TestRemoveNode(t *testing.T) {
 				panic(err)
 			}
 
-			if len(resp.Node.Nodes) != 3 {
-				t.Fatalf("add peer fails #2 (%d != 3)", len(resp.Node.Nodes))
+			if len(resp.Node.Nodes) != 4 {
+				t.Fatalf("add peer fails #2 (%d != 4)", len(resp.Node.Nodes))
 			}
 		}
 	}
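These tests drive cluster resizing through etcd's v2 admin config endpoint via the `tests.Put` helper. For reference, a minimal sketch of the same request without the helper; it assumes a local etcd listening on port 7001, as the functional tests do.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// setActiveSize PUTs a new activeSize to the v2 admin config endpoint,
// matching the JSON bodies used in the tests above.
func setActiveSize(n int) error {
	body := bytes.NewBufferString(fmt.Sprintf(`{"activeSize":%d}`, n))
	req, err := http.NewRequest("PUT", "http://localhost:7001/v2/admin/config", body)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != 200 {
		return fmt.Errorf("unexpected status: %d", resp.StatusCode)
	}
	return nil
}

func main() {
	if err := setActiveSize(3); err != nil {
		fmt.Println("config update failed:", err)
	}
}
```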

+ 2 - 2
tests/functional/util.go

@@ -169,7 +169,7 @@ func DestroyCluster(etcds []*os.Process) error {
 //
 func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool, stop chan bool) {
 	leaderMap := make(map[int]string)
-	baseAddrFormat := "http://0.0.0.0:400%d"
+	baseAddrFormat := "http://0.0.0.0:%d"
 
 	for {
 		knownLeader := "unknown"
@@ -177,7 +177,7 @@ func Monitor(size int, allowDeadNum int, leaderChan chan string, all chan bool,
 		var i int
 
 		for i = 0; i < size; i++ {
-			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+1))
+			leader, err := getLeader(fmt.Sprintf(baseAddrFormat, i+4001))
 
 			if err == nil {
 				leaderMap[i] = leader