123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485 |
- // Copyright 2018 The etcd Authors
- //
- // Licensed under the Apache License, Version 2.0 (the "License");
- // you may not use this file except in compliance with the License.
- // You may obtain a copy of the License at
- //
- // http://www.apache.org/licenses/LICENSE-2.0
- //
- // Unless required by applicable law or agreed to in writing, software
- // distributed under the License is distributed on an "AS IS" BASIS,
- // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- // See the License for the specific language governing permissions and
- // limitations under the License.
- package snapshot
- import (
- "context"
- "crypto/sha256"
- "encoding/json"
- "fmt"
- "hash/crc32"
- "io"
- "math"
- "os"
- "path/filepath"
- "reflect"
- "time"
- "github.com/coreos/etcd/clientv3"
- "github.com/coreos/etcd/etcdserver"
- "github.com/coreos/etcd/etcdserver/etcdserverpb"
- "github.com/coreos/etcd/etcdserver/membership"
- "github.com/coreos/etcd/lease"
- "github.com/coreos/etcd/mvcc"
- "github.com/coreos/etcd/mvcc/backend"
- "github.com/coreos/etcd/pkg/fileutil"
- "github.com/coreos/etcd/pkg/types"
- "github.com/coreos/etcd/raft"
- "github.com/coreos/etcd/raft/raftpb"
- "github.com/coreos/etcd/snap"
- "github.com/coreos/etcd/store"
- "github.com/coreos/etcd/wal"
- "github.com/coreos/etcd/wal/walpb"
- bolt "github.com/coreos/bbolt"
- "go.uber.org/zap"
- )
- // Manager defines snapshot methods.
- type Manager interface {
- // Save fetches snapshot from remote etcd server and saves data
- // to target path. If the context "ctx" is canceled or timed out,
- // snapshot save stream will error out (e.g. context.Canceled,
- // context.DeadlineExceeded). Make sure to specify only one endpoint
- // in client configuration. Snapshot API must be requested to a
- // selected node, and saved snapshot is the point-in-time state of
- // the selected node.
- Save(ctx context.Context, cfg clientv3.Config, dbPath string) error
- // Status returns the snapshot file information.
- Status(dbPath string) (Status, error)
- // Restore restores a new etcd data directory from given snapshot
- // file. It returns an error if specified data directory already
- // exists, to prevent unintended data directory overwrites.
- Restore(cfg RestoreConfig) error
- }
- // NewV3 returns a new snapshot Manager for v3.x snapshot.
- func NewV3(lg *zap.Logger) Manager {
- if lg == nil {
- lg = zap.NewExample()
- }
- return &v3Manager{lg: lg}
- }
- type v3Manager struct {
- lg *zap.Logger
- name string
- dbPath string
- walDir string
- snapDir string
- cl *membership.RaftCluster
- skipHashCheck bool
- }
- // Save fetches snapshot from remote etcd server and saves data to target path.
- func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
- if len(cfg.Endpoints) != 1 {
- return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
- }
- cli, err := clientv3.New(cfg)
- if err != nil {
- return err
- }
- defer cli.Close()
- partpath := dbPath + ".part"
- defer os.RemoveAll(partpath)
- var f *os.File
- f, err = os.Create(partpath)
- if err != nil {
- return fmt.Errorf("could not open %s (%v)", partpath, err)
- }
- s.lg.Info(
- "created temporary db file",
- zap.String("path", partpath),
- )
- now := time.Now()
- var rd io.ReadCloser
- rd, err = cli.Snapshot(ctx)
- if err != nil {
- return err
- }
- s.lg.Info(
- "fetching snapshot",
- zap.String("endpoint", cfg.Endpoints[0]),
- )
- if _, err = io.Copy(f, rd); err != nil {
- return err
- }
- if err = fileutil.Fsync(f); err != nil {
- return err
- }
- if err = f.Close(); err != nil {
- return err
- }
- s.lg.Info(
- "fetched snapshot",
- zap.String("endpoint", cfg.Endpoints[0]),
- zap.Duration("took", time.Since(now)),
- )
- if err = os.Rename(partpath, dbPath); err != nil {
- return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
- }
- s.lg.Info("saved", zap.String("path", dbPath))
- return nil
- }
- // Status is the snapshot file status.
- type Status struct {
- Hash uint32 `json:"hash"`
- Revision int64 `json:"revision"`
- TotalKey int `json:"totalKey"`
- TotalSize int64 `json:"totalSize"`
- }
- // Status returns the snapshot file information.
- func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
- if _, err = os.Stat(dbPath); err != nil {
- return ds, err
- }
- db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
- if err != nil {
- return ds, err
- }
- defer db.Close()
- h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
- if err = db.View(func(tx *bolt.Tx) error {
- ds.TotalSize = tx.Size()
- c := tx.Cursor()
- for next, _ := c.First(); next != nil; next, _ = c.Next() {
- b := tx.Bucket(next)
- if b == nil {
- return fmt.Errorf("cannot get hash of bucket %s", string(next))
- }
- h.Write(next)
- iskeyb := (string(next) == "key")
- b.ForEach(func(k, v []byte) error {
- h.Write(k)
- h.Write(v)
- if iskeyb {
- rev := bytesToRev(k)
- ds.Revision = rev.main
- }
- ds.TotalKey++
- return nil
- })
- }
- return nil
- }); err != nil {
- return ds, err
- }
- ds.Hash = h.Sum32()
- return ds, nil
- }
- // RestoreConfig configures snapshot restore operation.
- type RestoreConfig struct {
- // SnapshotPath is the path of snapshot file to restore from.
- SnapshotPath string
- // Name is the human-readable name of this member.
- Name string
- // OutputDataDir is the target data directory to save restored data.
- // OutputDataDir should not conflict with existing etcd data directory.
- // If OutputDataDir already exists, it will return an error to prevent
- // unintended data directory overwrites.
- // If empty, defaults to "[Name].etcd" if not given.
- OutputDataDir string
- // OutputWALDir is the target WAL data directory.
- // If empty, defaults to "[OutputDataDir]/member/wal" if not given.
- OutputWALDir string
- // PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
- PeerURLs []string
- // InitialCluster is the initial cluster configuration for restore bootstrap.
- InitialCluster string
- // InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
- InitialClusterToken string
- // SkipHashCheck is "true" to ignore snapshot integrity hash value
- // (required if copied from data directory).
- SkipHashCheck bool
- }
- // Restore restores a new etcd data directory from given snapshot file.
- func (s *v3Manager) Restore(cfg RestoreConfig) error {
- pURLs, err := types.NewURLs(cfg.PeerURLs)
- if err != nil {
- return err
- }
- var ics types.URLsMap
- ics, err = types.NewURLsMap(cfg.InitialCluster)
- if err != nil {
- return err
- }
- srv := etcdserver.ServerConfig{
- Name: cfg.Name,
- PeerURLs: pURLs,
- InitialPeerURLsMap: ics,
- InitialClusterToken: cfg.InitialClusterToken,
- }
- if err = srv.VerifyBootstrap(); err != nil {
- return err
- }
- s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, ics)
- if err != nil {
- return err
- }
- dataDir := cfg.OutputDataDir
- if dataDir == "" {
- dataDir = cfg.Name + ".etcd"
- }
- if fileutil.Exist(dataDir) {
- return fmt.Errorf("data-dir %q exists", dataDir)
- }
- walDir := cfg.OutputWALDir
- if walDir == "" {
- walDir = filepath.Join(dataDir, "member", "wal")
- } else if fileutil.Exist(walDir) {
- return fmt.Errorf("wal-dir %q exists", walDir)
- }
- s.name = cfg.Name
- s.dbPath = cfg.SnapshotPath
- s.walDir = walDir
- s.snapDir = filepath.Join(dataDir, "member", "snap")
- s.skipHashCheck = cfg.SkipHashCheck
- s.lg.Info(
- "restoring snapshot",
- zap.String("path", s.dbPath),
- zap.String("wal-dir", s.walDir),
- zap.String("data-dir", dataDir),
- zap.String("snap-dir", s.snapDir),
- )
- if err = s.saveDB(); err != nil {
- return err
- }
- if err = s.saveWALAndSnap(); err != nil {
- return err
- }
- s.lg.Info(
- "restored snapshot",
- zap.String("path", s.dbPath),
- zap.String("wal-dir", s.walDir),
- zap.String("data-dir", dataDir),
- zap.String("snap-dir", s.snapDir),
- )
- return nil
- }
- // saveDB copies the database snapshot to the snapshot directory
- func (s *v3Manager) saveDB() error {
- f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
- if ferr != nil {
- return ferr
- }
- defer f.Close()
- // get snapshot integrity hash
- if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
- return err
- }
- sha := make([]byte, sha256.Size)
- if _, err := f.Read(sha); err != nil {
- return err
- }
- if _, err := f.Seek(0, io.SeekStart); err != nil {
- return err
- }
- if err := fileutil.CreateDirAll(s.snapDir); err != nil {
- return err
- }
- dbpath := filepath.Join(s.snapDir, "db")
- db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
- if dberr != nil {
- return dberr
- }
- if _, err := io.Copy(db, f); err != nil {
- return err
- }
- // truncate away integrity hash, if any.
- off, serr := db.Seek(0, io.SeekEnd)
- if serr != nil {
- return serr
- }
- hasHash := (off % 512) == sha256.Size
- if hasHash {
- if err := db.Truncate(off - sha256.Size); err != nil {
- return err
- }
- }
- if !hasHash && !s.skipHashCheck {
- return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
- }
- if hasHash && !s.skipHashCheck {
- // check for match
- if _, err := db.Seek(0, io.SeekStart); err != nil {
- return err
- }
- h := sha256.New()
- if _, err := io.Copy(h, db); err != nil {
- return err
- }
- dbsha := h.Sum(nil)
- if !reflect.DeepEqual(sha, dbsha) {
- return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
- }
- }
- // db hash is OK, can now modify DB so it can be part of a new cluster
- db.Close()
- commit := len(s.cl.Members())
- // update consistentIndex so applies go through on etcdserver despite
- // having a new raft instance
- be := backend.NewDefaultBackend(dbpath)
- // a lessor never timeouts leases
- lessor := lease.NewLessor(be, math.MaxInt64)
- mvs := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
- txn := mvs.Write()
- btx := be.BatchTx()
- del := func(k, v []byte) error {
- txn.DeleteRange(k, nil)
- return nil
- }
- // delete stored members from old cluster since using new members
- btx.UnsafeForEach([]byte("members"), del)
- // todo: add back new members when we start to deprecate old snap file.
- btx.UnsafeForEach([]byte("members_removed"), del)
- // trigger write-out of new consistent index
- txn.End()
- mvs.Commit()
- mvs.Close()
- be.Close()
- return nil
- }
- // saveWALAndSnap creates a WAL for the initial cluster
- func (s *v3Manager) saveWALAndSnap() error {
- if err := fileutil.CreateDirAll(s.walDir); err != nil {
- return err
- }
- // add members again to persist them to the store we create.
- st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
- s.cl.SetStore(st)
- for _, m := range s.cl.Members() {
- s.cl.AddMember(m)
- }
- m := s.cl.MemberByName(s.name)
- md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
- metadata, merr := md.Marshal()
- if merr != nil {
- return merr
- }
- w, walerr := wal.Create(s.walDir, metadata)
- if walerr != nil {
- return walerr
- }
- defer w.Close()
- peers := make([]raft.Peer, len(s.cl.MemberIDs()))
- for i, id := range s.cl.MemberIDs() {
- ctx, err := json.Marshal((*s.cl).Member(id))
- if err != nil {
- return err
- }
- peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
- }
- ents := make([]raftpb.Entry, len(peers))
- nodeIDs := make([]uint64, len(peers))
- for i, p := range peers {
- nodeIDs[i] = p.ID
- cc := raftpb.ConfChange{
- Type: raftpb.ConfChangeAddNode,
- NodeID: p.ID,
- Context: p.Context,
- }
- d, err := cc.Marshal()
- if err != nil {
- return err
- }
- ents[i] = raftpb.Entry{
- Type: raftpb.EntryConfChange,
- Term: 1,
- Index: uint64(i + 1),
- Data: d,
- }
- }
- commit, term := uint64(len(ents)), uint64(1)
- if err := w.Save(raftpb.HardState{
- Term: term,
- Vote: peers[0].ID,
- Commit: commit,
- }, ents); err != nil {
- return err
- }
- b, berr := st.Save()
- if berr != nil {
- return berr
- }
- raftSnap := raftpb.Snapshot{
- Data: b,
- Metadata: raftpb.SnapshotMetadata{
- Index: commit,
- Term: term,
- ConfState: raftpb.ConfState{
- Nodes: nodeIDs,
- },
- },
- }
- sn := snap.New(s.snapDir)
- if err := sn.SaveSnap(raftSnap); err != nil {
- return err
- }
- return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
- }
|