Pārlūkot izejas kodu

Merge pull request #8479 from heyitsanthony/ctlv2-backup-v3

ctlv2: backup --with-v3
Anthony Romano 8 gadi atpakaļ
vecāks
revīzija
9a84c84ea6

+ 44 - 6
e2e/ctl_v2_test.go

@@ -226,7 +226,13 @@ func TestCtlV2RoleList(t *testing.T) {
 	}
 }
 
-func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issues/5360
+func TestCtlV2Backup(t *testing.T)         { testCtlV2Backup(t, 0, false) }
+func TestCtlV2BackupSnapshot(t *testing.T) { testCtlV2Backup(t, 1, false) }
+
+func TestCtlV2BackupV3(t *testing.T)         { testCtlV2Backup(t, 0, true) }
+func TestCtlV2BackupV3Snapshot(t *testing.T) { testCtlV2Backup(t, 1, true) }
+
+func testCtlV2Backup(t *testing.T, snapCount int, v3 bool) {
 	defer testutil.AfterTest(t)
 
 	backupDir, err := ioutil.TempDir("", "testbackup0.etcd")
@@ -235,15 +241,29 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
 	}
 	defer os.RemoveAll(backupDir)
 
-	epc1 := setupEtcdctlTest(t, &configNoTLS, false)
-	if err := etcdctlSet(epc1, "foo1", "bar"); err != nil {
+	etcdCfg := configNoTLS
+	etcdCfg.snapCount = snapCount
+	epc1 := setupEtcdctlTest(t, &etcdCfg, false)
+
+	// v3 put before v2 set so snapshot happens after v3 operations to confirm
+	// v3 data is preserved after snapshot.
+	if err := ctlV3Put(ctlCtx{t: t, epc: epc1}, "v3key", "123", ""); err != nil {
 		t.Fatal(err)
 	}
 
-	if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir); err != nil {
+	if err := etcdctlSet(epc1, "foo1", "bar"); err != nil {
 		t.Fatal(err)
 	}
 
+	if v3 {
+		// v3 must lock the db to backup, so stop process
+		if err := epc1.Stop(); err != nil {
+			t.Fatal(err)
+		}
+	}
+	if err := etcdctlBackup(epc1, epc1.procs[0].Config().dataDirPath, backupDir, v3); err != nil {
+		t.Fatal(err)
+	}
 	if err := epc1.Close(); err != nil {
 		t.Fatalf("error closing etcd processes (%v)", err)
 	}
@@ -260,6 +280,17 @@ func TestCtlV2Backup(t *testing.T) { // For https://github.com/coreos/etcd/issue
 		t.Fatal(err)
 	}
 
+	ctx2 := ctlCtx{t: t, epc: epc2}
+	if v3 {
+		if err := ctlV3Get(ctx2, []string{"v3key"}, kv{"v3key", "123"}); err != nil {
+			t.Fatal(err)
+		}
+	} else {
+		if err := ctlV3Get(ctx2, []string{"v3key"}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
 	// check if it can serve client requests
 	if err := etcdctlSet(epc2, "foo2", "bar"); err != nil {
 		t.Fatal(err)
@@ -451,9 +482,16 @@ func etcdctlAuthEnable(clus *etcdProcessCluster) error {
 	return spawnWithExpect(cmdArgs, "Authentication Enabled")
 }
 
-func etcdctlBackup(clus *etcdProcessCluster, dataDir, backupDir string) error {
+func etcdctlBackup(clus *etcdProcessCluster, dataDir, backupDir string, v3 bool) error {
 	cmdArgs := append(etcdctlPrefixArgs(clus), "backup", "--data-dir", dataDir, "--backup-dir", backupDir)
-	return spawnWithExpects(cmdArgs)
+	if v3 {
+		cmdArgs = append(cmdArgs, "--with-v3")
+	}
+	proc, err := spawnCmd(cmdArgs)
+	if err != nil {
+		return err
+	}
+	return proc.Close()
 }
 
 func mustEtcdctl(t *testing.T) {

+ 1 - 3
e2e/ctl_v3_test.go

@@ -123,7 +123,6 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 	}
 	ret.applyOpts(opts)
 
-	os.Setenv("ETCDCTL_API", "3")
 	mustEtcdctl(t)
 	if !ret.quorum {
 		ret.cfg = *configStandalone(ret.cfg)
@@ -140,7 +139,6 @@ func testCtl(t *testing.T, testFunc func(ctlCtx), opts ...ctlOption) {
 	ret.epc = epc
 
 	defer func() {
-		os.Unsetenv("ETCDCTL_API")
 		if ret.envMap != nil {
 			for k := range ret.envMap {
 				os.Unsetenv(k)
@@ -192,7 +190,7 @@ func (cx *ctlCtx) prefixArgs(eps []string) []string {
 
 	useEnv := cx.envMap != nil
 
-	cmdArgs := []string{ctlBinPath}
+	cmdArgs := []string{ctlBinPath + "3"}
 	for k, v := range fmap {
 		if useEnv {
 			ek := flags.FlagToEnv("ETCDCTL", k)

+ 5 - 1
e2e/etcd_process.go

@@ -23,7 +23,11 @@ import (
 	"github.com/coreos/etcd/pkg/fileutil"
 )
 
-var etcdServerReadyLines = []string{"enabled capabilities for version", "published"}
+var (
+	etcdServerReadyLines = []string{"enabled capabilities for version", "published"}
+	binPath              string
+	ctlBinPath           string
+)
 
 // etcdProcess is a process that serves etcd requests.
 type etcdProcess interface {

+ 11 - 8
e2e/etcd_spawn_cov.go

@@ -35,21 +35,24 @@ func spawnCmd(args []string) (*expect.ExpectProcess, error) {
 	if args[0] == binPath {
 		return spawnEtcd(args)
 	}
+	if args[0] == ctlBinPath || args[0] == ctlBinPath+"3" {
+		// avoid test flag conflicts in coverage enabled etcdctl by putting flags in ETCDCTL_ARGS
+		env := []string{
+			// was \xff, but that's used for testing boundary conditions; 0xe7cd should be safe
+			"ETCDCTL_ARGS=" + strings.Join(args, "\xe7\xcd"),
+		}
+		if args[0] == ctlBinPath+"3" {
+			env = append(env, "ETCDCTL_API=3")
+		}
 
-	if args[0] == ctlBinPath {
 		covArgs, err := getCovArgs()
 		if err != nil {
 			return nil, err
 		}
-		// avoid test flag conflicts in coverage enabled etcdctl by putting flags in ETCDCTL_ARGS
-		ctl_cov_env := []string{
-			// was \xff, but that's used for testing boundary conditions; 0xe7cd should be safe
-			"ETCDCTL_ARGS=" + strings.Join(args, "\xe7\xcd"),
-		}
 		// when withFlagByEnv() is used in testCtl(), env variables for ctl is set to os.env.
 		// they must be included in ctl_cov_env.
-		ctl_cov_env = append(ctl_cov_env, os.Environ()...)
-		ep, err := expect.NewExpectWithEnv(binDir+"/etcdctl_test", covArgs, ctl_cov_env)
+		env = append(env, os.Environ()...)
+		ep, err := expect.NewExpectWithEnv(binDir+"/etcdctl_test", covArgs, env)
 		if err != nil {
 			return nil, err
 		}

+ 9 - 1
e2e/etcd_spawn_nocov.go

@@ -16,10 +16,18 @@
 
 package e2e
 
-import "github.com/coreos/etcd/pkg/expect"
+import (
+	"os"
+
+	"github.com/coreos/etcd/pkg/expect"
+)
 
 const noOutputLineCount = 0 // regular binaries emit no extra lines
 
 func spawnCmd(args []string) (*expect.ExpectProcess, error) {
+	if args[0] == ctlBinPath+"3" {
+		env := append(os.Environ(), "ETCDCTL_API=3")
+		return expect.NewExpectWithEnv(ctlBinPath, args[1:], env)
+	}
 	return expect.NewExpect(args[0], args[1:]...)
 }

+ 0 - 2
e2e/main_test.go

@@ -17,8 +17,6 @@ var (
 	binDir  string
 	certDir string
 
-	binPath        string
-	ctlBinPath     string
 	certPath       string
 	privateKeyPath string
 	caPath         string

+ 151 - 13
etcdctl/ctlv2/command/backup_command.go

@@ -15,18 +15,25 @@
 package command
 
 import (
-	"fmt"
+	"encoding/binary"
 	"log"
+	"os"
+	"path"
 	"path/filepath"
+	"regexp"
 	"time"
 
 	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
 	"github.com/coreos/etcd/pkg/fileutil"
 	"github.com/coreos/etcd/pkg/idutil"
 	"github.com/coreos/etcd/pkg/pbutil"
+	"github.com/coreos/etcd/raft/raftpb"
 	"github.com/coreos/etcd/snap"
 	"github.com/coreos/etcd/wal"
 	"github.com/coreos/etcd/wal/walpb"
+
+	bolt "github.com/coreos/bbolt"
 	"github.com/urfave/cli"
 )
 
@@ -40,6 +47,7 @@ func NewBackupCommand() cli.Command {
 			cli.StringFlag{Name: "wal-dir", Value: "", Usage: "Path to the etcd wal dir"},
 			cli.StringFlag{Name: "backup-dir", Value: "", Usage: "Path to the backup dir"},
 			cli.StringFlag{Name: "backup-wal-dir", Value: "", Usage: "Path to the backup wal dir"},
+			cli.BoolFlag{Name: "with-v3", Usage: "Backup v3 backend data"},
 		},
 		Action: handleBackup,
 	}
@@ -50,6 +58,7 @@ func handleBackup(c *cli.Context) error {
 	var srcWAL string
 	var destWAL string
 
+	withV3 := c.Bool("with-v3")
 	srcSnap := filepath.Join(c.String("data-dir"), "member", "snap")
 	destSnap := filepath.Join(c.String("backup-dir"), "member", "snap")
 
@@ -68,12 +77,36 @@ func handleBackup(c *cli.Context) error {
 	if err := fileutil.CreateDirAll(destSnap); err != nil {
 		log.Fatalf("failed creating backup snapshot dir %v: %v", destSnap, err)
 	}
+
+	walsnap := saveSnap(destSnap, srcSnap)
+	metadata, state, ents := loadWAL(srcWAL, walsnap, withV3)
+	saveDB(filepath.Join(destSnap, "db"), filepath.Join(srcSnap, "db"), state.Commit, withV3)
+
+	idgen := idutil.NewGenerator(0, time.Now())
+	metadata.NodeID = idgen.Next()
+	metadata.ClusterID = idgen.Next()
+
+	neww, err := wal.Create(destWAL, pbutil.MustMarshal(&metadata))
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer neww.Close()
+	if err := neww.Save(state, ents); err != nil {
+		log.Fatal(err)
+	}
+	if err := neww.SaveSnapshot(walsnap); err != nil {
+		log.Fatal(err)
+	}
+
+	return nil
+}
+
+func saveSnap(destSnap, srcSnap string) (walsnap walpb.Snapshot) {
 	ss := snap.New(srcSnap)
 	snapshot, err := ss.Load()
 	if err != nil && err != snap.ErrNoSnapshot {
 		log.Fatal(err)
 	}
-	var walsnap walpb.Snapshot
 	if snapshot != nil {
 		walsnap.Index, walsnap.Term = snapshot.Metadata.Index, snapshot.Metadata.Term
 		newss := snap.New(destSnap)
@@ -81,7 +114,10 @@ func handleBackup(c *cli.Context) error {
 			log.Fatal(err)
 		}
 	}
+	return walsnap
+}
 
+func loadWAL(srcWAL string, walsnap walpb.Snapshot, v3 bool) (etcdserverpb.Metadata, raftpb.HardState, []raftpb.Entry) {
 	w, err := wal.OpenForRead(srcWAL, walsnap)
 	if err != nil {
 		log.Fatal(err)
@@ -91,28 +127,130 @@ func handleBackup(c *cli.Context) error {
 	switch err {
 	case nil:
 	case wal.ErrSnapshotNotFound:
-		fmt.Printf("Failed to find the match snapshot record %+v in wal %v.", walsnap, srcWAL)
-		fmt.Printf("etcdctl will add it back. Start auto fixing...")
+		log.Printf("Failed to find the match snapshot record %+v in wal %v.", walsnap, srcWAL)
+		log.Printf("etcdctl will add it back. Start auto fixing...")
 	default:
 		log.Fatal(err)
 	}
+
+	re := path.Join(membership.StoreMembersPrefix, "[[:xdigit:]]{1,16}", "attributes")
+	memberAttrRE := regexp.MustCompile(re)
+
+	removed := uint64(0)
+	i := 0
+	remove := func() {
+		ents = append(ents[:i], ents[i+1:]...)
+		removed++
+		i--
+	}
+	for i = 0; i < len(ents); i++ {
+		ents[i].Index -= removed
+		if ents[i].Type == raftpb.EntryConfChange {
+			log.Println("ignoring EntryConfChange raft entry")
+			remove()
+			continue
+		}
+
+		var raftReq etcdserverpb.InternalRaftRequest
+		var v2Req *etcdserverpb.Request
+		if pbutil.MaybeUnmarshal(&raftReq, ents[i].Data) {
+			v2Req = raftReq.V2
+		} else {
+			v2Req = &etcdserverpb.Request{}
+			pbutil.MustUnmarshal(v2Req, ents[i].Data)
+		}
+
+		if v2Req != nil && v2Req.Method == "PUT" && memberAttrRE.MatchString(v2Req.Path) {
+			log.Println("ignoring member attribute update on", v2Req.Path)
+			remove()
+			continue
+		}
+
+		if v2Req != nil {
+			continue
+		}
+
+		if v3 || raftReq.Header == nil {
+			continue
+		}
+		log.Println("ignoring v3 raft entry")
+		remove()
+	}
+	state.Commit -= removed
 	var metadata etcdserverpb.Metadata
 	pbutil.MustUnmarshal(&metadata, wmetadata)
-	idgen := idutil.NewGenerator(0, time.Now())
-	metadata.NodeID = idgen.Next()
-	metadata.ClusterID = idgen.Next()
+	return metadata, state, ents
+}
 
-	neww, err := wal.Create(destWAL, pbutil.MustMarshal(&metadata))
+// saveDB copies the v3 backend and strips cluster information.
+func saveDB(destDB, srcDB string, idx uint64, v3 bool) {
+	// open src db to safely copy db state
+	if v3 {
+		var src *bolt.DB
+		ch := make(chan *bolt.DB, 1)
+		go func() {
+			src, err := bolt.Open(srcDB, 0444, &bolt.Options{ReadOnly: true})
+			if err != nil {
+				log.Fatal(err)
+			}
+			ch <- src
+		}()
+		select {
+		case src = <-ch:
+		case <-time.After(time.Second):
+			log.Println("waiting to acquire lock on", srcDB)
+			src = <-ch
+		}
+		defer src.Close()
+
+		tx, err := src.Begin(false)
+		if err != nil {
+			log.Fatal(err)
+		}
+
+		// copy srcDB to destDB
+		dest, err := os.Create(destDB)
+		if err != nil {
+			log.Fatal(err)
+		}
+		if _, err := tx.WriteTo(dest); err != nil {
+			log.Fatal(err)
+		}
+		dest.Close()
+		if err := tx.Rollback(); err != nil {
+			log.Fatal(err)
+		}
+	}
+
+	db, err := bolt.Open(destDB, 0644, &bolt.Options{})
 	if err != nil {
 		log.Fatal(err)
 	}
-	defer neww.Close()
-	if err := neww.Save(state, ents); err != nil {
+	tx, err := db.Begin(true)
+	if err != nil {
 		log.Fatal(err)
 	}
-	if err := neww.SaveSnapshot(walsnap); err != nil {
-		log.Fatal(err)
+
+	// remove membership information; should be clobbered by --force-new-cluster
+	for _, bucket := range []string{"members", "members_removed", "cluster"} {
+		tx.DeleteBucket([]byte(bucket))
 	}
 
-	return nil
+	// update consistent index to match hard state
+	if !v3 {
+		idxBytes := make([]byte, 8)
+		binary.BigEndian.PutUint64(idxBytes, idx)
+		b, err := tx.CreateBucketIfNotExists([]byte("meta"))
+		if err != nil {
+			log.Fatal(err)
+		}
+		b.Put([]byte("consistent_index"), idxBytes)
+	}
+
+	if err := tx.Commit(); err != nil {
+		log.Fatal(err)
+	}
+	if err := db.Close(); err != nil {
+		log.Fatal(err)
+	}
 }