
etcdctl: use snapshot RPC in snapshot command

Anthony Romano, 9 years ago
parent
commit e838c26f8a
2 changed files with 212 additions and 66 deletions
  1. etcdctl/ctlv3/command/snapshot_command.go  +206 -65
  2. storage/backend/batch_tx.go  +6 -1
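The new save path streams the backend snapshot over the snapshot RPC instead of replaying the store through a watcher. Below is a minimal sketch of the same call sequence, assuming an already-constructed clientv3.Client whose Snapshot method returns a readable stream, as the command code in this commit does; the endpoint and output file name are illustrative, not part of the change.

package main

import (
	"io"
	"log"
	"os"

	"github.com/coreos/etcd/clientv3"
	"golang.org/x/net/context"
)

// saveSnapshot copies a point-in-time backend snapshot from the server to path.
func saveSnapshot(cli *clientv3.Client, path string) error {
	// Snapshot issues the snapshot RPC and returns a stream of the backend db.
	r, err := cli.Snapshot(context.TODO())
	if err != nil {
		return err
	}
	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()
	// copy the streamed db to disk; the real command writes to a ".part"
	// file first and renames it for atomicity.
	if _, err := io.Copy(f, r); err != nil {
		return err
	}
	return f.Sync()
}

func main() {
	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"localhost:2379"}})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
	if err := saveSnapshot(cli, "snapshot.db"); err != nil {
		log.Fatal(err)
	}
}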

etcdctl/ctlv3/command/snapshot_command.go  +206 -65

@@ -15,56 +15,81 @@
 package command
 
 import (
+	"encoding/json"
 	"fmt"
 	"io"
 	"os"
+	"path"
+	"strings"
 
-	"github.com/coreos/etcd/clientv3"
-	"github.com/coreos/etcd/clientv3/mirror"
-	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
+	"github.com/coreos/etcd/etcdserver"
+	"github.com/coreos/etcd/etcdserver/etcdserverpb"
+	"github.com/coreos/etcd/etcdserver/membership"
+	"github.com/coreos/etcd/pkg/types"
+	"github.com/coreos/etcd/raft"
+	"github.com/coreos/etcd/raft/raftpb"
+	"github.com/coreos/etcd/storage"
+	"github.com/coreos/etcd/storage/backend"
+	"github.com/coreos/etcd/wal"
 	"github.com/spf13/cobra"
 	"golang.org/x/net/context"
 )
 
+const (
+	defaultName                     = "default"
+	defaultInitialAdvertisePeerURLs = "http://localhost:2380,http://localhost:7001"
+)
+
+var (
+	restoreCluster      string
+	restoreClusterToken string
+	restoreDataDir      string
+	restorePeerURLs     string
+	restoreName         string
+)
+
 // NewSnapshotCommand returns the cobra command for "snapshot".
 func NewSnapshotCommand() *cobra.Command {
-	return &cobra.Command{
-		Use:   "snapshot [filename]",
-		Short: "Snapshot streams a point-in-time snapshot of the store",
-		Run:   snapshotCommandFunc,
+	cmd := &cobra.Command{
+		Use:   "snapshot",
+		Short: "snapshot manages etcd node snapshots.",
 	}
+	cmd.AddCommand(NewSnapshotSaveCommand())
+	cmd.AddCommand(NewSnapshotRestoreCommand())
+	return cmd
 }
 
-// snapshotCommandFunc watches for the length of the entire store and records
-// to a file.
-func snapshotCommandFunc(cmd *cobra.Command, args []string) {
-	switch {
-	case len(args) == 0:
-		snapshotToStdout(mustClientFromCmd(cmd))
-	case len(args) == 1:
-		snapshotToFile(mustClientFromCmd(cmd), args[0])
-	default:
-		err := fmt.Errorf("snapshot takes at most one argument")
-		ExitWithError(ExitBadArgs, err)
+func NewSnapshotSaveCommand() *cobra.Command {
+	return &cobra.Command{
+		Use:   "save <filename>",
+		Short: "save stores an etcd node backend snapshot to a given file.",
+		Run:   snapshotSaveCommandFunc,
 	}
 }
 
-// snapshotToStdout streams a snapshot over stdout
-func snapshotToStdout(c *clientv3.Client) {
-	// must explicitly fetch first revision since no retry on stdout
-	wr := <-c.Watch(context.TODO(), "", clientv3.WithPrefix(), clientv3.WithRev(1))
-	if wr.Err() == nil {
-		wr.CompactRevision = 1
+func NewSnapshotRestoreCommand() *cobra.Command {
+	cmd := &cobra.Command{
+		Use:   "restore <filename>",
+		Short: "restore an etcd node snapshot to an etcd directory",
+		Run:   snapshotRestoreCommandFunc,
 	}
-	if rev := snapshot(os.Stdout, c, wr.CompactRevision+1); rev != 0 {
-		err := fmt.Errorf("snapshot interrupted by compaction %v", rev)
-		ExitWithError(ExitInterrupted, err)
-	}
-	os.Stdout.Sync()
+	cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.")
+	cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.")
+	cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.")
+	cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.")
+	cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.")
+
+	return cmd
 }
 
-// snapshotToFile atomically writes a snapshot to a file
-func snapshotToFile(c *clientv3.Client, path string) {
+func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		err := fmt.Errorf("snapshot save expects one argument")
+		ExitWithError(ExitBadArgs, err)
+	}
+
+	path := args[0]
+
 	partpath := path + ".part"
 	f, err := os.Create(partpath)
 	defer f.Close()
@@ -72,56 +97,172 @@ func snapshotToFile(c *clientv3.Client, path string) {
 		exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
 		ExitWithError(ExitBadArgs, exiterr)
 	}
-	rev := int64(1)
-	for rev != 0 {
-		f.Seek(0, 0)
-		f.Truncate(0)
-		rev = snapshot(f, c, rev)
+
+	c := mustClientFromCmd(cmd)
+	r, serr := c.Snapshot(context.TODO())
+	if serr != nil {
+		os.RemoveAll(partpath)
+		ExitWithError(ExitInterrupted, serr)
+	}
+	if _, rerr := io.Copy(f, r); rerr != nil {
+		os.RemoveAll(partpath)
+		ExitWithError(ExitInterrupted, rerr)
 	}
+
 	f.Sync()
-	if err := os.Rename(partpath, path); err != nil {
-		exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, err)
+
+	if rerr := os.Rename(partpath, path); rerr != nil {
+		exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
 		ExitWithError(ExitIO, exiterr)
 	}
 }
 
-// snapshot reads all of a watcher; returns compaction revision if incomplete
-// TODO: stabilize snapshot format
-func snapshot(w io.Writer, c *clientv3.Client, rev int64) int64 {
-	s := mirror.NewSyncer(c, "", rev)
+func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) != 1 {
+		err := fmt.Errorf("snapshot restore requires exactly one argument")
+		ExitWithError(ExitBadArgs, err)
+	}
 
-	rc, errc := s.SyncBase(context.TODO())
+	urlmap, uerr := types.NewURLsMap(restoreCluster)
+	if uerr != nil {
+		ExitWithError(ExitBadArgs, uerr)
+	}
 
-	for r := range rc {
-		for _, kv := range r.Kvs {
-			fmt.Fprintln(w, kv)
-		}
+	cfg := etcdserver.ServerConfig{
+		InitialClusterToken: restoreClusterToken,
+		InitialPeerURLsMap:  urlmap,
+		PeerURLs:            types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
+		Name:                restoreName,
+	}
+	if err := cfg.VerifyBootstrap(); err != nil {
+		ExitWithError(ExitBadArgs, err)
 	}
 
-	err := <-errc
-	if err != nil {
-		if err == rpctypes.ErrCompacted {
-			// will get correct compact revision on retry
-			return rev + 1
-		}
-		// failed for some unknown reason, retry on same revision
-		return rev
+	cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
+	if cerr != nil {
+		ExitWithError(ExitBadArgs, cerr)
 	}
 
-	wc := s.SyncUpdates(context.TODO())
+	basedir := restoreDataDir
+	if basedir == "" {
+		basedir = restoreName + ".etcd"
+	}
+
+	waldir := path.Join(basedir, "member", "wal")
+	snapdir := path.Join(basedir, "member", "snap")
 
-	for wr := range wc {
-		if wr.Err() != nil {
-			return wr.CompactRevision
+	if _, err := os.Stat(basedir); err == nil {
+		ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
+	}
+
+	makeDB(snapdir, args[0])
+	makeWAL(waldir, cl)
+}
+
+func initialClusterFromName(name string) string {
+	n := name
+	if name == "" {
+		n = defaultName
+	}
+	return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
+}
+
+// makeWAL creates a WAL for the initial cluster
+func makeWAL(waldir string, cl *membership.RaftCluster) {
+	if err := os.MkdirAll(waldir, 0755); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+
+	m := cl.MemberByName(restoreName)
+	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
+	metadata, merr := md.Marshal()
+	if merr != nil {
+		ExitWithError(ExitInvalidInput, merr)
+	}
+
+	w, walerr := wal.Create(waldir, metadata)
+	if walerr != nil {
+		ExitWithError(ExitIO, walerr)
+	}
+	defer w.Close()
+
+	peers := make([]raft.Peer, len(cl.MemberIDs()))
+	for i, id := range cl.MemberIDs() {
+		ctx, err := json.Marshal((*cl).Member(id))
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
 		}
-		for _, ev := range wr.Events {
-			fmt.Fprintln(w, ev)
+		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
+	}
+
+	ents := make([]raftpb.Entry, len(peers))
+	for i, p := range peers {
+		cc := raftpb.ConfChange{
+			Type:    raftpb.ConfChangeAddNode,
+			NodeID:  p.ID,
+			Context: p.Context}
+		d, err := cc.Marshal()
+		if err != nil {
+			ExitWithError(ExitInvalidInput, err)
 		}
-		rev := wr.Events[len(wr.Events)-1].Kv.ModRevision
-		if rev >= wr.Header.Revision {
-			break
+		e := raftpb.Entry{
+			Type:  raftpb.EntryConfChange,
+			Term:  1,
+			Index: uint64(i + 1),
+			Data:  d,
 		}
+		ents[i] = e
 	}
 
-	return 0
+	w.Save(raftpb.HardState{
+		Term:   1,
+		Vote:   peers[0].ID,
+		Commit: uint64(len(ents))}, ents)
+}
+
+// initIndex implements ConsistentIndexGetter so the snapshot won't block
+// the new raft instance by waiting for a future raft index.
+type initIndex struct{}
+
+func (*initIndex) ConsistentIndex() uint64 { return 1 }
+
+// makeDB copies the database snapshot to the snapshot directory
+func makeDB(snapdir, dbfile string) {
+	f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
+	if ferr != nil {
+		ExitWithError(ExitInvalidInput, ferr)
+	}
+	defer f.Close()
+
+	if err := os.MkdirAll(snapdir, 0755); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+
+	dbpath := path.Join(snapdir, "db")
+	db, dberr := os.OpenFile(dbpath, os.O_WRONLY|os.O_CREATE, 0600)
+	if dberr != nil {
+		ExitWithError(ExitIO, dberr)
+	}
+	if _, err := io.Copy(db, f); err != nil {
+		ExitWithError(ExitIO, err)
+	}
+	db.Close()
+
+	// update consistentIndex so applies go through on etcdserver despite
+	// having a new raft instance
+	be := backend.NewDefaultBackend(dbpath)
+	s := storage.NewStore(be, nil, &initIndex{})
+	id := s.TxnBegin()
+	btx := be.BatchTx()
+	del := func(k, v []byte) error {
+		_, _, err := s.TxnDeleteRange(id, k, nil)
+		return err
+	}
+	// delete stored members from old cluster since using new members
+	btx.UnsafeForEach([]byte("members"), del)
+	btx.UnsafeForEach([]byte("members_removed"), del)
+	// trigger write-out of new consistent index
+	s.TxnEnd(id)
+	s.Commit()
+	s.Close()
 }
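For the restore path, the --initial-cluster flag string is parsed into a URLs map and expanded into the bootstrap membership, which makeWAL then encodes as synthetic ConfChange entries. A small sketch of that parsing step using the same helpers the command imports above; the member names, cluster token, and peer URLs are made up for illustration.

package main

import (
	"fmt"
	"log"

	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/pkg/types"
)

func main() {
	// parse an --initial-cluster style string into name -> peer URLs
	urlmap, err := types.NewURLsMap("infra1=http://10.0.1.10:2380,infra2=http://10.0.1.11:2380")
	if err != nil {
		log.Fatal(err)
	}
	// build the bootstrap cluster the restored member will belong to
	cl, err := membership.NewClusterFromURLsMap("etcd-cluster", urlmap)
	if err != nil {
		log.Fatal(err)
	}
	for _, id := range cl.MemberIDs() {
		m := cl.Member(id)
		fmt.Printf("member %s: name=%s peers=%v\n", id, m.Name, m.PeerURLs)
	}
}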

storage/backend/batch_tx.go  +6 -1

@@ -125,7 +125,12 @@ func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
 
 // UnsafeForEach must be called holding the lock on the tx.
 func (t *batchTx) UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error {
-	return t.tx.Bucket(bucketName).ForEach(visitor)
+	b := t.tx.Bucket(bucketName)
+	if b == nil {
+		// bucket does not exist
+		return nil
+	}
+	return b.ForEach(visitor)
 }
 
 // Commit commits a previous tx and begins a new writable one.
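The backend change exists because the restore path iterates buckets ("members", "members_removed") that may never have been created in the snapshot db; bolt returns a nil bucket in that case, and calling ForEach on it would dereference a nil pointer. A standalone sketch of the same guard against the bolt API the backend wraps, assuming github.com/boltdb/bolt; the file and bucket names are illustrative.

package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

// forEachSafe visits every key in bucket, treating a missing bucket as empty,
// mirroring the UnsafeForEach guard added in this commit.
func forEachSafe(tx *bolt.Tx, bucket []byte, visit func(k, v []byte) error) error {
	b := tx.Bucket(bucket)
	if b == nil {
		// bucket does not exist (e.g. "members_removed" in a fresh db)
		return nil
	}
	return b.ForEach(visit)
}

func main() {
	db, err := bolt.Open("example.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.View(func(tx *bolt.Tx) error {
		return forEachSafe(tx, []byte("members_removed"), func(k, v []byte) error {
			fmt.Printf("%s=%s\n", k, v)
			return nil
		})
	})
	if err != nil {
		log.Fatal(err)
	}
}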