snapshot_command.go 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "context"
  17. "crypto/sha256"
  18. "encoding/binary"
  19. "encoding/json"
  20. "fmt"
  21. "hash/crc32"
  22. "io"
  23. "math"
  24. "os"
  25. "path/filepath"
  26. "reflect"
  27. "strings"
  28. "github.com/coreos/etcd/etcdserver"
  29. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  30. "github.com/coreos/etcd/etcdserver/membership"
  31. "github.com/coreos/etcd/lease"
  32. "github.com/coreos/etcd/mvcc"
  33. "github.com/coreos/etcd/mvcc/backend"
  34. "github.com/coreos/etcd/pkg/fileutil"
  35. "github.com/coreos/etcd/pkg/types"
  36. "github.com/coreos/etcd/raft"
  37. "github.com/coreos/etcd/raft/raftpb"
  38. "github.com/coreos/etcd/snap"
  39. "github.com/coreos/etcd/store"
  40. "github.com/coreos/etcd/wal"
  41. "github.com/coreos/etcd/wal/walpb"
  42. bolt "github.com/coreos/bbolt"
  43. "github.com/spf13/cobra"
  44. )
  45. const (
  46. defaultName = "default"
  47. defaultInitialAdvertisePeerURLs = "http://localhost:2380"
  48. )
  49. var (
  50. restoreCluster string
  51. restoreClusterToken string
  52. restoreDataDir string
  53. restoreWalDir string
  54. restorePeerURLs string
  55. restoreName string
  56. skipHashCheck bool
  57. )
  58. // NewSnapshotCommand returns the cobra command for "snapshot".
  59. func NewSnapshotCommand() *cobra.Command {
  60. cmd := &cobra.Command{
  61. Use: "snapshot <subcommand>",
  62. Short: "Manages etcd node snapshots",
  63. }
  64. cmd.AddCommand(NewSnapshotSaveCommand())
  65. cmd.AddCommand(NewSnapshotRestoreCommand())
  66. cmd.AddCommand(newSnapshotStatusCommand())
  67. return cmd
  68. }
  69. func NewSnapshotSaveCommand() *cobra.Command {
  70. return &cobra.Command{
  71. Use: "save <filename>",
  72. Short: "Stores an etcd node backend snapshot to a given file",
  73. Run: snapshotSaveCommandFunc,
  74. }
  75. }
  76. func newSnapshotStatusCommand() *cobra.Command {
  77. return &cobra.Command{
  78. Use: "status <filename>",
  79. Short: "Gets backend snapshot status of a given file",
  80. Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
  81. The items in the lists are hash, revision, total keys, total size.
  82. `,
  83. Run: snapshotStatusCommandFunc,
  84. }
  85. }
  86. func NewSnapshotRestoreCommand() *cobra.Command {
  87. cmd := &cobra.Command{
  88. Use: "restore <filename> [options]",
  89. Short: "Restores an etcd member snapshot to an etcd directory",
  90. Run: snapshotRestoreCommandFunc,
  91. }
  92. cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory")
  93. cmd.Flags().StringVar(&restoreWalDir, "wal-dir", "", "Path to the WAL directory (use --data-dir if none given)")
  94. cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap")
  95. cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap")
  96. cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster")
  97. cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member")
  98. cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory)")
  99. return cmd
  100. }
  101. func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
  102. if len(args) != 1 {
  103. err := fmt.Errorf("snapshot save expects one argument")
  104. ExitWithError(ExitBadArgs, err)
  105. }
  106. path := args[0]
  107. partpath := path + ".part"
  108. f, err := os.Create(partpath)
  109. if err != nil {
  110. exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
  111. ExitWithError(ExitBadArgs, exiterr)
  112. }
  113. c := mustClientFromCmd(cmd)
  114. r, serr := c.Snapshot(context.TODO())
  115. if serr != nil {
  116. os.RemoveAll(partpath)
  117. ExitWithError(ExitInterrupted, serr)
  118. }
  119. if _, rerr := io.Copy(f, r); rerr != nil {
  120. os.RemoveAll(partpath)
  121. ExitWithError(ExitInterrupted, rerr)
  122. }
  123. fileutil.Fsync(f)
  124. f.Close()
  125. if rerr := os.Rename(partpath, path); rerr != nil {
  126. exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
  127. ExitWithError(ExitIO, exiterr)
  128. }
  129. fmt.Printf("Snapshot saved at %s\n", path)
  130. }
  131. func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) {
  132. if len(args) != 1 {
  133. err := fmt.Errorf("snapshot status requires exactly one argument")
  134. ExitWithError(ExitBadArgs, err)
  135. }
  136. initDisplayFromCmd(cmd)
  137. ds := dbStatus(args[0])
  138. display.DBStatus(ds)
  139. }
  140. func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
  141. if len(args) != 1 {
  142. err := fmt.Errorf("snapshot restore requires exactly one argument")
  143. ExitWithError(ExitBadArgs, err)
  144. }
  145. urlmap, uerr := types.NewURLsMap(restoreCluster)
  146. if uerr != nil {
  147. ExitWithError(ExitBadArgs, uerr)
  148. }
  149. cfg := etcdserver.ServerConfig{
  150. InitialClusterToken: restoreClusterToken,
  151. InitialPeerURLsMap: urlmap,
  152. PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
  153. Name: restoreName,
  154. }
  155. if err := cfg.VerifyBootstrap(); err != nil {
  156. ExitWithError(ExitBadArgs, err)
  157. }
  158. cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
  159. if cerr != nil {
  160. ExitWithError(ExitBadArgs, cerr)
  161. }
  162. basedir := restoreDataDir
  163. if basedir == "" {
  164. basedir = restoreName + ".etcd"
  165. }
  166. waldir := restoreWalDir
  167. if waldir == "" {
  168. waldir = filepath.Join(basedir, "member", "wal")
  169. }
  170. snapdir := filepath.Join(basedir, "member", "snap")
  171. if _, err := os.Stat(basedir); err == nil {
  172. ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
  173. }
  174. makeDB(snapdir, args[0], len(cl.Members()))
  175. makeWALAndSnap(waldir, snapdir, cl)
  176. }
  177. func initialClusterFromName(name string) string {
  178. n := name
  179. if name == "" {
  180. n = defaultName
  181. }
  182. return fmt.Sprintf("%s=http://localhost:2380", n)
  183. }
  184. // makeWAL creates a WAL for the initial cluster
  185. func makeWALAndSnap(waldir, snapdir string, cl *membership.RaftCluster) {
  186. if err := fileutil.CreateDirAll(waldir); err != nil {
  187. ExitWithError(ExitIO, err)
  188. }
  189. // add members again to persist them to the store we create.
  190. st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
  191. cl.SetStore(st)
  192. for _, m := range cl.Members() {
  193. cl.AddMember(m)
  194. }
  195. m := cl.MemberByName(restoreName)
  196. md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
  197. metadata, merr := md.Marshal()
  198. if merr != nil {
  199. ExitWithError(ExitInvalidInput, merr)
  200. }
  201. w, walerr := wal.Create(waldir, metadata)
  202. if walerr != nil {
  203. ExitWithError(ExitIO, walerr)
  204. }
  205. defer w.Close()
  206. peers := make([]raft.Peer, len(cl.MemberIDs()))
  207. for i, id := range cl.MemberIDs() {
  208. ctx, err := json.Marshal((*cl).Member(id))
  209. if err != nil {
  210. ExitWithError(ExitInvalidInput, err)
  211. }
  212. peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
  213. }
  214. ents := make([]raftpb.Entry, len(peers))
  215. nodeIDs := make([]uint64, len(peers))
  216. for i, p := range peers {
  217. nodeIDs[i] = p.ID
  218. cc := raftpb.ConfChange{
  219. Type: raftpb.ConfChangeAddNode,
  220. NodeID: p.ID,
  221. Context: p.Context}
  222. d, err := cc.Marshal()
  223. if err != nil {
  224. ExitWithError(ExitInvalidInput, err)
  225. }
  226. e := raftpb.Entry{
  227. Type: raftpb.EntryConfChange,
  228. Term: 1,
  229. Index: uint64(i + 1),
  230. Data: d,
  231. }
  232. ents[i] = e
  233. }
  234. commit, term := uint64(len(ents)), uint64(1)
  235. if err := w.Save(raftpb.HardState{
  236. Term: term,
  237. Vote: peers[0].ID,
  238. Commit: commit}, ents); err != nil {
  239. ExitWithError(ExitIO, err)
  240. }
  241. b, berr := st.Save()
  242. if berr != nil {
  243. ExitWithError(ExitError, berr)
  244. }
  245. raftSnap := raftpb.Snapshot{
  246. Data: b,
  247. Metadata: raftpb.SnapshotMetadata{
  248. Index: commit,
  249. Term: term,
  250. ConfState: raftpb.ConfState{
  251. Nodes: nodeIDs,
  252. },
  253. },
  254. }
  255. snapshotter := snap.New(snapdir)
  256. if err := snapshotter.SaveSnap(raftSnap); err != nil {
  257. panic(err)
  258. }
  259. if err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term}); err != nil {
  260. ExitWithError(ExitIO, err)
  261. }
  262. }
  263. // initIndex implements ConsistentIndexGetter so the snapshot won't block
  264. // the new raft instance by waiting for a future raft index.
  265. type initIndex int
  266. func (i *initIndex) ConsistentIndex() uint64 { return uint64(*i) }
  267. // makeDB copies the database snapshot to the snapshot directory
  268. func makeDB(snapdir, dbfile string, commit int) {
  269. f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
  270. if ferr != nil {
  271. ExitWithError(ExitInvalidInput, ferr)
  272. }
  273. defer f.Close()
  274. // get snapshot integrity hash
  275. if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
  276. ExitWithError(ExitIO, err)
  277. }
  278. sha := make([]byte, sha256.Size)
  279. if _, err := f.Read(sha); err != nil {
  280. ExitWithError(ExitIO, err)
  281. }
  282. if _, err := f.Seek(0, io.SeekStart); err != nil {
  283. ExitWithError(ExitIO, err)
  284. }
  285. if err := fileutil.CreateDirAll(snapdir); err != nil {
  286. ExitWithError(ExitIO, err)
  287. }
  288. dbpath := filepath.Join(snapdir, "db")
  289. db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
  290. if dberr != nil {
  291. ExitWithError(ExitIO, dberr)
  292. }
  293. if _, err := io.Copy(db, f); err != nil {
  294. ExitWithError(ExitIO, err)
  295. }
  296. // truncate away integrity hash, if any.
  297. off, serr := db.Seek(0, io.SeekEnd)
  298. if serr != nil {
  299. ExitWithError(ExitIO, serr)
  300. }
  301. hasHash := (off % 512) == sha256.Size
  302. if hasHash {
  303. if err := db.Truncate(off - sha256.Size); err != nil {
  304. ExitWithError(ExitIO, err)
  305. }
  306. }
  307. if !hasHash && !skipHashCheck {
  308. err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
  309. ExitWithError(ExitBadArgs, err)
  310. }
  311. if hasHash && !skipHashCheck {
  312. // check for match
  313. if _, err := db.Seek(0, io.SeekStart); err != nil {
  314. ExitWithError(ExitIO, err)
  315. }
  316. h := sha256.New()
  317. if _, err := io.Copy(h, db); err != nil {
  318. ExitWithError(ExitIO, err)
  319. }
  320. dbsha := h.Sum(nil)
  321. if !reflect.DeepEqual(sha, dbsha) {
  322. err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
  323. ExitWithError(ExitInvalidInput, err)
  324. }
  325. }
  326. // db hash is OK, can now modify DB so it can be part of a new cluster
  327. db.Close()
  328. // update consistentIndex so applies go through on etcdserver despite
  329. // having a new raft instance
  330. be := backend.NewDefaultBackend(dbpath)
  331. // a lessor never timeouts leases
  332. lessor := lease.NewLessor(be, math.MaxInt64)
  333. s := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
  334. txn := s.Write()
  335. btx := be.BatchTx()
  336. del := func(k, v []byte) error {
  337. txn.DeleteRange(k, nil)
  338. return nil
  339. }
  340. // delete stored members from old cluster since using new members
  341. btx.UnsafeForEach([]byte("members"), del)
  342. // todo: add back new members when we start to deprecate old snap file.
  343. btx.UnsafeForEach([]byte("members_removed"), del)
  344. // trigger write-out of new consistent index
  345. txn.End()
  346. s.Commit()
  347. s.Close()
  348. be.Close()
  349. }
  350. type dbstatus struct {
  351. Hash uint32 `json:"hash"`
  352. Revision int64 `json:"revision"`
  353. TotalKey int `json:"totalKey"`
  354. TotalSize int64 `json:"totalSize"`
  355. }
  356. func dbStatus(p string) dbstatus {
  357. if _, err := os.Stat(p); err != nil {
  358. ExitWithError(ExitError, err)
  359. }
  360. ds := dbstatus{}
  361. db, err := bolt.Open(p, 0400, &bolt.Options{ReadOnly: true})
  362. if err != nil {
  363. ExitWithError(ExitError, err)
  364. }
  365. defer db.Close()
  366. h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
  367. err = db.View(func(tx *bolt.Tx) error {
  368. // check snapshot file integrity first
  369. var dbErrStrings []string
  370. for dbErr := range tx.Check() {
  371. dbErrStrings = append(dbErrStrings, dbErr.Error())
  372. }
  373. if len(dbErrStrings) > 0 {
  374. return fmt.Errorf("snapshot file integrity check failed. %d errors found.\n"+strings.Join(dbErrStrings, "\n"), len(dbErrStrings))
  375. }
  376. ds.TotalSize = tx.Size()
  377. c := tx.Cursor()
  378. for next, _ := c.First(); next != nil; next, _ = c.Next() {
  379. b := tx.Bucket(next)
  380. if b == nil {
  381. return fmt.Errorf("cannot get hash of bucket %s", string(next))
  382. }
  383. h.Write(next)
  384. iskeyb := (string(next) == "key")
  385. b.ForEach(func(k, v []byte) error {
  386. h.Write(k)
  387. h.Write(v)
  388. if iskeyb {
  389. rev := bytesToRev(k)
  390. ds.Revision = rev.main
  391. }
  392. ds.TotalKey++
  393. return nil
  394. })
  395. }
  396. return nil
  397. })
  398. if err != nil {
  399. ExitWithError(ExitError, err)
  400. }
  401. ds.Hash = h.Sum32()
  402. return ds
  403. }
  404. type revision struct {
  405. main int64
  406. sub int64
  407. }
  408. func bytesToRev(bytes []byte) revision {
  409. return revision{
  410. main: int64(binary.BigEndian.Uint64(bytes[0:8])),
  411. sub: int64(binary.BigEndian.Uint64(bytes[9:])),
  412. }
  413. }