Browse Source

functional/tester: implement fetchSnapshotCaseQuorum

Signed-off-by: Gyuho Lee <gyuhox@gmail.com>
Gyuho Lee 7 years ago
parent
commit
888b55e91b
1 changed files with 90 additions and 18 deletions
  1. 90 18
      functional/tester/case_sigquit_remove_quorum.go

+ 90 - 18
functional/tester/case_sigquit_remove_quorum.go

@@ -16,6 +16,8 @@ package tester
 
 import (
 	"context"
+	"fmt"
+	"strings"
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
@@ -32,22 +34,21 @@ type fetchSnapshotCaseQuorum struct {
 }
 
 func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
-	//  1. Assume node C is the current leader with most up-to-date data.
+	// 1. Assume node C is the current leader with most up-to-date data.
 	lead, err := clus.GetLeader()
 	if err != nil {
 		return err
 	}
 	c.snapshotted = lead
 
-	//  2. Download snapshot from node C, before destroying node A and B.
+	// 2. Download snapshot from node C, before destroying node A and B.
 	clus.lg.Info(
 		"install snapshot on leader node START",
 		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
-		zap.Error(err),
 	)
 	var resp *rpcpb.Response
 	if resp == nil || err != nil {
-		resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_FETCH_SNAPSHOT)
+		resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT)
 		clus.lg.Info(
 			"install snapshot on leader node END",
 			zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
@@ -55,7 +56,7 @@ func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
 		)
 		return err
 	}
-	resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_FETCH_SNAPSHOT)
+	resp, err = clus.sendOpWithResp(lead, rpcpb.Operation_SAVE_SNAPSHOT)
 	clus.lg.Info(
 		"install snapshot on leader node END",
 		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
@@ -99,7 +100,7 @@ func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
 	// after some time since last snapshot save
 	time.Sleep(time.Second)
 
-	//  3. Destroy node A and B, and make the whole cluster inoperable.
+	// 3. Destroy node A and B, and make the whole cluster inoperable.
 	for {
 		c.injected = pickQuorum(len(clus.Members))
 		if _, ok := c.injected[lead]; !ok {
@@ -110,7 +111,6 @@ func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
 		clus.lg.Info(
 			"disastrous machine failure to quorum START",
 			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
-			zap.Error(err),
 		)
 		err = clus.sendOp(idx, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
 		clus.lg.Info(
@@ -123,12 +123,11 @@ func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
 		}
 	}
 
-	//  4. Now node C cannot operate either.
-	//  5. SIGTERM node C and remove its data directories.
+	// 4. Now node C cannot operate either.
+	// 5. SIGTERM node C and remove its data directories.
 	clus.lg.Info(
 		"disastrous machine failure to old leader START",
 		zap.String("target-endpoint", clus.Members[lead].EtcdClientEndpoint),
-		zap.Error(err),
 	)
 	err = clus.sendOp(lead, rpcpb.Operation_SIGQUIT_ETCD_AND_REMOVE_DATA)
 	clus.lg.Info(
@@ -140,17 +139,90 @@ func (c *fetchSnapshotCaseQuorum) Inject(clus *Cluster) error {
 }
 
 func (c *fetchSnapshotCaseQuorum) Recover(clus *Cluster) error {
-	//  6. Restore a new seed member from node C's latest snapshot file.
+	// 6. Restore a new seed member from node C's latest snapshot file.
+	oldlead := c.snapshotted
+
+	// configuration on restart from recovered snapshot
+	// seed member's configuration is all the same as previous one
+	// except initial cluster string is now a single-node cluster
+	clus.Members[oldlead].EtcdOnSnapshotRestore = clus.Members[oldlead].Etcd
+	clus.Members[oldlead].EtcdOnSnapshotRestore.InitialClusterState = "existing"
+	name := clus.Members[oldlead].Etcd.Name
+	initClus := []string{}
+	for _, u := range clus.Members[oldlead].Etcd.AdvertisePeerURLs {
+		initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
+	}
+	clus.Members[oldlead].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
+
+	clus.lg.Info(
+		"restore snapshot and restart from snapshot request START",
+		zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
+	)
+	err := clus.sendOp(oldlead, rpcpb.Operation_RESTORE_RESTART_FROM_SNAPSHOT)
+	clus.lg.Info(
+		"restore snapshot and restart from snapshot request END",
+		zap.String("target-endpoint", clus.Members[oldlead].EtcdClientEndpoint),
+		zap.Error(err),
+	)
+	if err != nil {
+		return err
+	}
+
+	leaderc, err := clus.Members[oldlead].CreateEtcdClient()
+	if err != nil {
+		return err
+	}
+	defer leaderc.Close()
+
+	// 7. Add another member to establish 2-node cluster.
+	// 8. Add another member to establish 3-node cluster.
+	// 9. Add more if any.
+	for idx := range c.injected {
+		clus.lg.Info(
+			"member add request START",
+			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
+			zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
+		)
+		ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+		_, err := leaderc.MemberAdd(ctx, clus.Members[idx].Etcd.AdvertisePeerURLs)
+		cancel()
+		clus.lg.Info(
+			"member add request END",
+			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
+			zap.Strings("peer-urls", clus.Members[idx].Etcd.AdvertisePeerURLs),
+			zap.Error(err),
+		)
+		if err != nil {
+			return err
+		}
 
-	//  7. Add another member to establish 2-node cluster.
+		// wait until membership reconfiguration entry gets applied
+		// TODO: test concurrent member add
+		time.Sleep(3 * time.Second)
 
-	//  8. Add another member to establish 3-node cluster.
+		// start the added(new) member with fresh data
+		clus.Members[idx].EtcdOnSnapshotRestore = clus.Members[idx].Etcd
+		clus.Members[idx].EtcdOnSnapshotRestore.InitialClusterState = "existing"
+		name := clus.Members[idx].Etcd.Name
+		for _, u := range clus.Members[idx].Etcd.AdvertisePeerURLs {
+			initClus = append(initClus, fmt.Sprintf("%s=%s", name, u))
+		}
+		clus.Members[idx].EtcdOnSnapshotRestore.InitialCluster = strings.Join(initClus, ",")
+		clus.lg.Info(
+			"restart from snapshot request START",
+			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
+		)
+		err = clus.sendOp(idx, rpcpb.Operation_RESTART_FROM_SNAPSHOT)
+		clus.lg.Info(
+			"restart from snapshot request END",
+			zap.String("target-endpoint", clus.Members[idx].EtcdClientEndpoint),
+			zap.Error(err),
+		)
+		if err != nil {
+			return err
+		}
+	}
 
-	// for idx := range c.injected {
-	// 	if err := c.recoverMember(clus, idx); err != nil {
-	// 		return err
-	// 	}
-	// }
 	return nil
 }