v3_snapshot.go

// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshot

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"hash/crc32"
	"io"
	"math"
	"os"
	"path/filepath"
	"reflect"
	"strings"
	"time"

	bolt "go.etcd.io/bbolt"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/etcdserver"
	"go.etcd.io/etcd/etcdserver/api/membership"
	"go.etcd.io/etcd/etcdserver/api/snap"
	"go.etcd.io/etcd/etcdserver/api/v2store"
	"go.etcd.io/etcd/etcdserver/etcdserverpb"
	"go.etcd.io/etcd/lease"
	"go.etcd.io/etcd/mvcc"
	"go.etcd.io/etcd/mvcc/backend"
	"go.etcd.io/etcd/pkg/fileutil"
	"go.etcd.io/etcd/pkg/traceutil"
	"go.etcd.io/etcd/pkg/types"
	"go.etcd.io/etcd/raft"
	"go.etcd.io/etcd/raft/raftpb"
	"go.etcd.io/etcd/wal"
	"go.etcd.io/etcd/wal/walpb"

	"go.uber.org/zap"
)

// Manager defines snapshot methods.
type Manager interface {
	// Save fetches snapshot from remote etcd server and saves data
	// to target path. If the context "ctx" is canceled or timed out,
	// snapshot save stream will error out (e.g. context.Canceled,
	// context.DeadlineExceeded). Make sure to specify only one endpoint
	// in client configuration. Snapshot API must be requested to a
	// selected node, and saved snapshot is the point-in-time state of
	// the selected node.
	Save(ctx context.Context, cfg clientv3.Config, dbPath string) error

	// Status returns the snapshot file information.
	Status(dbPath string) (Status, error)

	// Restore restores a new etcd data directory from given snapshot
	// file. It returns an error if specified data directory already
	// exists, to prevent unintended data directory overwrites.
	Restore(cfg RestoreConfig) error
}

// NewV3 returns a new snapshot Manager for v3.x snapshot.
func NewV3(lg *zap.Logger) Manager {
	if lg == nil {
		lg = zap.NewExample()
	}
	return &v3Manager{lg: lg}
}
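
// A minimal usage sketch (illustrative only, not part of the original file):
// saving a point-in-time snapshot from a single endpoint. The endpoint,
// timeout, and output path below are placeholder values.
//
//	sm := NewV3(zap.NewExample())
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	if err := sm.Save(ctx, clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}}, "backup.db"); err != nil {
//		// handle error; the save stream errors out on context cancellation or timeout
//	}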

type v3Manager struct {
	lg *zap.Logger

	name    string
	dbPath  string
	walDir  string
	snapDir string
	cl      *membership.RaftCluster

	skipHashCheck bool
}

// Save fetches snapshot from remote etcd server and saves data to target path.
func (s *v3Manager) Save(ctx context.Context, cfg clientv3.Config, dbPath string) error {
	if len(cfg.Endpoints) != 1 {
		return fmt.Errorf("snapshot must be requested to one selected node, not multiple %v", cfg.Endpoints)
	}
	cli, err := clientv3.New(cfg)
	if err != nil {
		return err
	}
	defer cli.Close()

	partpath := dbPath + ".part"
	defer os.RemoveAll(partpath)

	var f *os.File
	f, err = os.OpenFile(partpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, fileutil.PrivateFileMode)
	if err != nil {
		return fmt.Errorf("could not open %s (%v)", partpath, err)
	}
	s.lg.Info(
		"created temporary db file",
		zap.String("path", partpath),
	)

	now := time.Now()
	var rd io.ReadCloser
	rd, err = cli.Snapshot(ctx)
	if err != nil {
		return err
	}
	s.lg.Info(
		"fetching snapshot",
		zap.String("endpoint", cfg.Endpoints[0]),
	)
	if _, err = io.Copy(f, rd); err != nil {
		return err
	}
	if err = fileutil.Fsync(f); err != nil {
		return err
	}
	if err = f.Close(); err != nil {
		return err
	}
	s.lg.Info(
		"fetched snapshot",
		zap.String("endpoint", cfg.Endpoints[0]),
		zap.Duration("took", time.Since(now)),
	)

	if err = os.Rename(partpath, dbPath); err != nil {
		return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
	}
	s.lg.Info("saved", zap.String("path", dbPath))
	return nil
}

// Status is the snapshot file status.
type Status struct {
	Hash      uint32 `json:"hash"`
	Revision  int64  `json:"revision"`
	TotalKey  int    `json:"totalKey"`
	TotalSize int64  `json:"totalSize"`
}

// Status returns the snapshot file information.
func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
	if _, err = os.Stat(dbPath); err != nil {
		return ds, err
	}

	db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
	if err != nil {
		return ds, err
	}
	defer db.Close()

	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	if err = db.View(func(tx *bolt.Tx) error {
		// check snapshot file integrity first
		var dbErrStrings []string
		for dbErr := range tx.Check() {
			dbErrStrings = append(dbErrStrings, dbErr.Error())
		}
		if len(dbErrStrings) > 0 {
			return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
		}
		ds.TotalSize = tx.Size()
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %s", string(next))
			}
			h.Write(next)
			iskeyb := (string(next) == "key")
			b.ForEach(func(k, v []byte) error {
				h.Write(k)
				h.Write(v)
				if iskeyb {
					rev := bytesToRev(k)
					ds.Revision = rev.main
				}
				ds.TotalKey++
				return nil
			})
		}
		return nil
	}); err != nil {
		return ds, err
	}

	ds.Hash = h.Sum32()
	return ds, nil
}
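
// Illustrative sketch (not part of the original file): Status can be used to
// inspect a saved snapshot before restoring it; "backup.db" is a placeholder
// path.
//
//	ds, err := NewV3(nil).Status("backup.db")
//	if err != nil {
//		// handle error
//	}
//	fmt.Printf("hash=%x revision=%d keys=%d size=%d\n", ds.Hash, ds.Revision, ds.TotalKey, ds.TotalSize)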

// RestoreConfig configures snapshot restore operation.
type RestoreConfig struct {
	// SnapshotPath is the path of snapshot file to restore from.
	SnapshotPath string

	// Name is the human-readable name of this member.
	Name string

	// OutputDataDir is the target data directory to save restored data.
	// OutputDataDir should not conflict with existing etcd data directory.
	// If OutputDataDir already exists, it will return an error to prevent
	// unintended data directory overwrites.
	// If empty, defaults to "[Name].etcd".
	OutputDataDir string

	// OutputWALDir is the target WAL data directory.
	// If empty, defaults to "[OutputDataDir]/member/wal".
	OutputWALDir string

	// PeerURLs is a list of member's peer URLs to advertise to the rest of the cluster.
	PeerURLs []string

	// InitialCluster is the initial cluster configuration for restore bootstrap.
	InitialCluster string

	// InitialClusterToken is the initial cluster token for etcd cluster during restore bootstrap.
	InitialClusterToken string

	// SkipHashCheck is "true" to ignore snapshot integrity hash value
	// (required if copied from data directory).
	SkipHashCheck bool
}
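
// Illustrative sketch (not part of the original file): a single-member
// restore might be configured as below; the member name, URLs, token, and
// paths are placeholder values.
//
//	err := NewV3(nil).Restore(RestoreConfig{
//		SnapshotPath:        "backup.db",
//		Name:                "m1",
//		OutputDataDir:       "m1.etcd",
//		PeerURLs:            []string{"http://127.0.0.1:2380"},
//		InitialCluster:      "m1=http://127.0.0.1:2380",
//		InitialClusterToken: "etcd-cluster",
//	})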

// Restore restores a new etcd data directory from given snapshot file.
func (s *v3Manager) Restore(cfg RestoreConfig) error {
	pURLs, err := types.NewURLs(cfg.PeerURLs)
	if err != nil {
		return err
	}
	var ics types.URLsMap
	ics, err = types.NewURLsMap(cfg.InitialCluster)
	if err != nil {
		return err
	}

	srv := etcdserver.ServerConfig{
		Logger:              s.lg,
		Name:                cfg.Name,
		PeerURLs:            pURLs,
		InitialPeerURLsMap:  ics,
		InitialClusterToken: cfg.InitialClusterToken,
	}
	if err = srv.VerifyBootstrap(); err != nil {
		return err
	}

	s.cl, err = membership.NewClusterFromURLsMap(s.lg, cfg.InitialClusterToken, ics)
	if err != nil {
		return err
	}

	dataDir := cfg.OutputDataDir
	if dataDir == "" {
		dataDir = cfg.Name + ".etcd"
	}
	if fileutil.Exist(dataDir) {
		return fmt.Errorf("data-dir %q exists", dataDir)
	}

	walDir := cfg.OutputWALDir
	if walDir == "" {
		walDir = filepath.Join(dataDir, "member", "wal")
	} else if fileutil.Exist(walDir) {
		return fmt.Errorf("wal-dir %q exists", walDir)
	}

	s.name = cfg.Name
	s.dbPath = cfg.SnapshotPath
	s.walDir = walDir
	s.snapDir = filepath.Join(dataDir, "member", "snap")
	s.skipHashCheck = cfg.SkipHashCheck

	s.lg.Info(
		"restoring snapshot",
		zap.String("path", s.dbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
	)
	if err = s.saveDB(); err != nil {
		return err
	}
	if err = s.saveWALAndSnap(); err != nil {
		return err
	}
	s.lg.Info(
		"restored snapshot",
		zap.String("path", s.dbPath),
		zap.String("wal-dir", s.walDir),
		zap.String("data-dir", dataDir),
		zap.String("snap-dir", s.snapDir),
	)

	return nil
}

// saveDB copies the database snapshot to the snapshot directory
func (s *v3Manager) saveDB() error {
	f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
	if ferr != nil {
		return ferr
	}
	defer f.Close()

	// get snapshot integrity hash
	if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
		return err
	}
	sha := make([]byte, sha256.Size)
	if _, err := f.Read(sha); err != nil {
		return err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}

	if err := fileutil.CreateDirAll(s.snapDir); err != nil {
		return err
	}

	dbpath := filepath.Join(s.snapDir, "db")
	db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
	if dberr != nil {
		return dberr
	}
	if _, err := io.Copy(db, f); err != nil {
		return err
	}

	// truncate away integrity hash, if any.
	off, serr := db.Seek(0, io.SeekEnd)
	if serr != nil {
		return serr
	}
	// the v3 snapshot API appends a sha256 hash to the bolt database file;
	// the raw database itself is page-aligned, so a sha256.Size-byte
	// remainder means the hash is present.
	hasHash := (off % 512) == sha256.Size
	if hasHash {
		if err := db.Truncate(off - sha256.Size); err != nil {
			return err
		}
	}
	if !hasHash && !s.skipHashCheck {
		return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
	}

	if hasHash && !s.skipHashCheck {
		// check for match
		if _, err := db.Seek(0, io.SeekStart); err != nil {
			return err
		}
		h := sha256.New()
		if _, err := io.Copy(h, db); err != nil {
			return err
		}
		dbsha := h.Sum(nil)
		if !reflect.DeepEqual(sha, dbsha) {
			return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
		}
	}

	// db hash is OK, can now modify DB so it can be part of a new cluster
	db.Close()

	commit := len(s.cl.Members())

	// update consistentIndex so applies go through on etcdserver despite
	// having a new raft instance
	be := backend.NewDefaultBackend(dbpath)

	// a lessor that never times out leases
	lessor := lease.NewLessor(s.lg, be, lease.LessorConfig{MinLeaseTTL: math.MaxInt64})

	mvs := mvcc.NewStore(s.lg, be, lessor, (*initIndex)(&commit), mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
	txn := mvs.Write(traceutil.TODO())
	btx := be.BatchTx()
	del := func(k, v []byte) error {
		txn.DeleteRange(k, nil)
		return nil
	}

	// delete stored members from old cluster since using new members
	btx.UnsafeForEach([]byte("members"), del)

	// todo: add back new members when we start to deprecate old snap file.
	btx.UnsafeForEach([]byte("members_removed"), del)

	// trigger write-out of new consistent index
	txn.End()

	mvs.Commit()
	mvs.Close()
	be.Close()

	return nil
}

// saveWALAndSnap creates a WAL for the initial cluster
func (s *v3Manager) saveWALAndSnap() error {
	if err := fileutil.CreateDirAll(s.walDir); err != nil {
		return err
	}

	// add members again to persist them to the store we create.
	st := v2store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
	s.cl.SetStore(st)
	for _, m := range s.cl.Members() {
		s.cl.AddMember(m)
	}

	m := s.cl.MemberByName(s.name)
	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
	metadata, merr := md.Marshal()
	if merr != nil {
		return merr
	}
	w, walerr := wal.Create(s.lg, s.walDir, metadata)
	if walerr != nil {
		return walerr
	}
	defer w.Close()

	peers := make([]raft.Peer, len(s.cl.MemberIDs()))
	for i, id := range s.cl.MemberIDs() {
		ctx, err := json.Marshal((*s.cl).Member(id))
		if err != nil {
			return err
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}

	ents := make([]raftpb.Entry, len(peers))
	nodeIDs := make([]uint64, len(peers))
	for i, p := range peers {
		nodeIDs[i] = p.ID
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  p.ID,
			Context: p.Context,
		}
		d, err := cc.Marshal()
		if err != nil {
			return err
		}
		ents[i] = raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Term:  1,
			Index: uint64(i + 1),
			Data:  d,
		}
	}

	// mark the conf-change entries as committed so the restored member
	// applies the membership changes on its first boot
	commit, term := uint64(len(ents)), uint64(1)
	if err := w.Save(raftpb.HardState{
		Term:   term,
		Vote:   peers[0].ID,
		Commit: commit,
	}, ents); err != nil {
		return err
	}

	b, berr := st.Save()
	if berr != nil {
		return berr
	}
	raftSnap := raftpb.Snapshot{
		Data: b,
		Metadata: raftpb.SnapshotMetadata{
			Index: commit,
			Term:  term,
			ConfState: raftpb.ConfState{
				Voters: nodeIDs,
			},
		},
	}
	sn := snap.New(s.lg, s.snapDir)
	if err := sn.SaveSnap(raftSnap); err != nil {
		return err
	}

	return w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
}