snapshot_command.go 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. // Copyright 2016 The etcd Authors
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "crypto/sha256"
  17. "encoding/binary"
  18. "encoding/json"
  19. "fmt"
  20. "hash/crc32"
  21. "io"
  22. "os"
  23. "path"
  24. "reflect"
  25. "strings"
  26. "github.com/boltdb/bolt"
  27. "github.com/coreos/etcd/etcdserver"
  28. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  29. "github.com/coreos/etcd/etcdserver/membership"
  30. "github.com/coreos/etcd/mvcc"
  31. "github.com/coreos/etcd/mvcc/backend"
  32. "github.com/coreos/etcd/pkg/fileutil"
  33. "github.com/coreos/etcd/pkg/types"
  34. "github.com/coreos/etcd/raft"
  35. "github.com/coreos/etcd/raft/raftpb"
  36. "github.com/coreos/etcd/wal"
  37. "github.com/spf13/cobra"
  38. "golang.org/x/net/context"
  39. )
const (
	// defaultName is the default value of the restore command's --name flag
	// and the fallback member name used by initialClusterFromName.
	defaultName = "default"
	// defaultInitialAdvertisePeerURLs is the default value of the restore
	// command's --initial-advertise-peer-urls flag.
	defaultInitialAdvertisePeerURLs = "http://localhost:2380"
)
// Flag values for "snapshot restore"; they are bound to their flags in
// NewSnapshotRestoreCommand.
var (
	restoreCluster      string // --initial-cluster
	restoreClusterToken string // --initial-cluster-token
	restoreDataDir      string // --data-dir
	restorePeerURLs     string // --initial-advertise-peer-urls
	restoreName         string // --name
	skipHashCheck       bool   // --skip-hash-check
)
  52. // NewSnapshotCommand returns the cobra command for "snapshot".
  53. func NewSnapshotCommand() *cobra.Command {
  54. cmd := &cobra.Command{
  55. Use: "snapshot",
  56. Short: "snapshot manages etcd node snapshots.",
  57. }
  58. cmd.AddCommand(NewSnapshotSaveCommand())
  59. cmd.AddCommand(NewSnapshotRestoreCommand())
  60. cmd.AddCommand(newSnapshotStatusCommand())
  61. return cmd
  62. }
  63. func NewSnapshotSaveCommand() *cobra.Command {
  64. return &cobra.Command{
  65. Use: "save <filename>",
  66. Short: "save stores an etcd node backend snapshot to a given file.",
  67. Run: snapshotSaveCommandFunc,
  68. }
  69. }
  70. func newSnapshotStatusCommand() *cobra.Command {
  71. return &cobra.Command{
  72. Use: "status <filename>",
  73. Short: "status gets backend snapshot status of a given file.",
  74. Long: `When --write-out is set to simple, this command prints out comma-separated status lists for each endpoint.
  75. The items in the lists are hash, revision, total keys, total size.
  76. `,
  77. Run: snapshotStatusCommandFunc,
  78. }
  79. }
  80. func NewSnapshotRestoreCommand() *cobra.Command {
  81. cmd := &cobra.Command{
  82. Use: "restore <filename>",
  83. Short: "restore an etcd member snapshot to an etcd directory",
  84. Run: snapshotRestoreCommandFunc,
  85. }
  86. cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.")
  87. cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.")
  88. cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.")
  89. cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.")
  90. cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.")
  91. cmd.Flags().BoolVar(&skipHashCheck, "skip-hash-check", false, "Ignore snapshot integrity hash value (required if copied from data directory).")
  92. return cmd
  93. }
  94. func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
  95. if len(args) != 1 {
  96. err := fmt.Errorf("snapshot save expects one argument")
  97. ExitWithError(ExitBadArgs, err)
  98. }
  99. path := args[0]
  100. partpath := path + ".part"
  101. f, err := os.Create(partpath)
  102. defer f.Close()
  103. if err != nil {
  104. exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
  105. ExitWithError(ExitBadArgs, exiterr)
  106. }
  107. c := mustClientFromCmd(cmd)
  108. r, serr := c.Snapshot(context.TODO())
  109. if serr != nil {
  110. os.RemoveAll(partpath)
  111. ExitWithError(ExitInterrupted, serr)
  112. }
  113. if _, rerr := io.Copy(f, r); rerr != nil {
  114. os.RemoveAll(partpath)
  115. ExitWithError(ExitInterrupted, rerr)
  116. }
  117. fileutil.Fsync(f)
  118. if rerr := os.Rename(partpath, path); rerr != nil {
  119. exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
  120. ExitWithError(ExitIO, exiterr)
  121. }
  122. fmt.Printf("Snapshot saved at %s\n", path)
  123. }
  124. func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) {
  125. if len(args) != 1 {
  126. err := fmt.Errorf("snapshot status requires exactly one argument")
  127. ExitWithError(ExitBadArgs, err)
  128. }
  129. initDisplayFromCmd(cmd)
  130. ds := dbStatus(args[0])
  131. display.DBStatus(ds)
  132. }
  133. func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
  134. if len(args) != 1 {
  135. err := fmt.Errorf("snapshot restore requires exactly one argument")
  136. ExitWithError(ExitBadArgs, err)
  137. }
  138. urlmap, uerr := types.NewURLsMap(restoreCluster)
  139. if uerr != nil {
  140. ExitWithError(ExitBadArgs, uerr)
  141. }
  142. cfg := etcdserver.ServerConfig{
  143. InitialClusterToken: restoreClusterToken,
  144. InitialPeerURLsMap: urlmap,
  145. PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
  146. Name: restoreName,
  147. }
  148. if err := cfg.VerifyBootstrap(); err != nil {
  149. ExitWithError(ExitBadArgs, err)
  150. }
  151. cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
  152. if cerr != nil {
  153. ExitWithError(ExitBadArgs, cerr)
  154. }
  155. basedir := restoreDataDir
  156. if basedir == "" {
  157. basedir = restoreName + ".etcd"
  158. }
  159. waldir := path.Join(basedir, "member", "wal")
  160. snapdir := path.Join(basedir, "member", "snap")
  161. if _, err := os.Stat(basedir); err == nil {
  162. ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
  163. }
  164. makeDB(snapdir, args[0])
  165. makeWAL(waldir, cl)
  166. }
  167. func initialClusterFromName(name string) string {
  168. n := name
  169. if name == "" {
  170. n = defaultName
  171. }
  172. return fmt.Sprintf("%s=http://localhost:2380", n)
  173. }
  174. // makeWAL creates a WAL for the initial cluster
  175. func makeWAL(waldir string, cl *membership.RaftCluster) {
  176. if err := os.MkdirAll(waldir, 0755); err != nil {
  177. ExitWithError(ExitIO, err)
  178. }
  179. m := cl.MemberByName(restoreName)
  180. md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
  181. metadata, merr := md.Marshal()
  182. if merr != nil {
  183. ExitWithError(ExitInvalidInput, merr)
  184. }
  185. w, walerr := wal.Create(waldir, metadata)
  186. if walerr != nil {
  187. ExitWithError(ExitIO, walerr)
  188. }
  189. defer w.Close()
  190. peers := make([]raft.Peer, len(cl.MemberIDs()))
  191. for i, id := range cl.MemberIDs() {
  192. ctx, err := json.Marshal((*cl).Member(id))
  193. if err != nil {
  194. ExitWithError(ExitInvalidInput, err)
  195. }
  196. peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
  197. }
  198. ents := make([]raftpb.Entry, len(peers))
  199. for i, p := range peers {
  200. cc := raftpb.ConfChange{
  201. Type: raftpb.ConfChangeAddNode,
  202. NodeID: p.ID,
  203. Context: p.Context}
  204. d, err := cc.Marshal()
  205. if err != nil {
  206. ExitWithError(ExitInvalidInput, err)
  207. }
  208. e := raftpb.Entry{
  209. Type: raftpb.EntryConfChange,
  210. Term: 1,
  211. Index: uint64(i + 1),
  212. Data: d,
  213. }
  214. ents[i] = e
  215. }
  216. w.Save(raftpb.HardState{
  217. Term: 1,
  218. Vote: peers[0].ID,
  219. Commit: uint64(len(ents))}, ents)
  220. }
  221. // initIndex implements ConsistentIndexGetter so the snapshot won't block
  222. // the new raft instance by waiting for a future raft index.
  223. type initIndex struct{}
  224. func (*initIndex) ConsistentIndex() uint64 { return 1 }
  225. // makeDB copies the database snapshot to the snapshot directory
  226. func makeDB(snapdir, dbfile string) {
  227. f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
  228. if ferr != nil {
  229. ExitWithError(ExitInvalidInput, ferr)
  230. }
  231. defer f.Close()
  232. // get snapshot integrity hash
  233. if _, err := f.Seek(-sha256.Size, os.SEEK_END); err != nil {
  234. ExitWithError(ExitIO, err)
  235. }
  236. sha := make([]byte, sha256.Size)
  237. if _, err := f.Read(sha); err != nil {
  238. ExitWithError(ExitIO, err)
  239. }
  240. if _, err := f.Seek(0, os.SEEK_SET); err != nil {
  241. ExitWithError(ExitIO, err)
  242. }
  243. if err := os.MkdirAll(snapdir, 0755); err != nil {
  244. ExitWithError(ExitIO, err)
  245. }
  246. dbpath := path.Join(snapdir, "db")
  247. db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
  248. if dberr != nil {
  249. ExitWithError(ExitIO, dberr)
  250. }
  251. if _, err := io.Copy(db, f); err != nil {
  252. ExitWithError(ExitIO, err)
  253. }
  254. // truncate away integrity hash, if any.
  255. off, serr := db.Seek(0, os.SEEK_END)
  256. if serr != nil {
  257. ExitWithError(ExitIO, serr)
  258. }
  259. hasHash := (off % 512) == sha256.Size
  260. if hasHash {
  261. if err := db.Truncate(off - sha256.Size); err != nil {
  262. ExitWithError(ExitIO, err)
  263. }
  264. }
  265. if !hasHash && !skipHashCheck {
  266. err := fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
  267. ExitWithError(ExitBadArgs, err)
  268. }
  269. if hasHash && !skipHashCheck {
  270. // check for match
  271. if _, err := db.Seek(0, os.SEEK_SET); err != nil {
  272. ExitWithError(ExitIO, err)
  273. }
  274. h := sha256.New()
  275. if _, err := io.Copy(h, db); err != nil {
  276. ExitWithError(ExitIO, err)
  277. }
  278. dbsha := h.Sum(nil)
  279. if !reflect.DeepEqual(sha, dbsha) {
  280. err := fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
  281. ExitWithError(ExitInvalidInput, err)
  282. }
  283. }
  284. // db hash is OK, can now modify DB so it can be part of a new cluster
  285. db.Close()
  286. // update consistentIndex so applies go through on etcdserver despite
  287. // having a new raft instance
  288. be := backend.NewDefaultBackend(dbpath)
  289. s := mvcc.NewStore(be, nil, &initIndex{})
  290. id := s.TxnBegin()
  291. btx := be.BatchTx()
  292. del := func(k, v []byte) error {
  293. _, _, err := s.TxnDeleteRange(id, k, nil)
  294. return err
  295. }
  296. // delete stored members from old cluster since using new members
  297. btx.UnsafeForEach([]byte("members"), del)
  298. btx.UnsafeForEach([]byte("members_removed"), del)
  299. // trigger write-out of new consistent index
  300. s.TxnEnd(id)
  301. s.Commit()
  302. s.Close()
  303. }
// dbstatus summarizes a backend snapshot file, as computed by dbStatus.
type dbstatus struct {
	// Hash is the CRC-32 (Castagnoli) checksum over every bucket name, key,
	// and value visited.
	Hash uint32 `json:"hash"`
	// Revision is the main revision decoded from the last key visited in the
	// "key" bucket.
	Revision int64 `json:"revision"`
	// TotalKey counts the key/value pairs visited across all buckets.
	TotalKey int `json:"totalKey"`
	// TotalSize is the size reported by the bolt read transaction (tx.Size()).
	TotalSize int64 `json:"totalSize"`
}
  310. func dbStatus(p string) dbstatus {
  311. if _, err := os.Stat(p); err != nil {
  312. ExitWithError(ExitError, err)
  313. }
  314. ds := dbstatus{}
  315. db, err := bolt.Open(p, 0400, nil)
  316. if err != nil {
  317. ExitWithError(ExitError, err)
  318. }
  319. defer db.Close()
  320. h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
  321. err = db.View(func(tx *bolt.Tx) error {
  322. ds.TotalSize = tx.Size()
  323. c := tx.Cursor()
  324. for next, _ := c.First(); next != nil; next, _ = c.Next() {
  325. b := tx.Bucket(next)
  326. if b == nil {
  327. return fmt.Errorf("cannot get hash of bucket %s", string(next))
  328. }
  329. h.Write(next)
  330. iskeyb := (string(next) == "key")
  331. b.ForEach(func(k, v []byte) error {
  332. h.Write(k)
  333. h.Write(v)
  334. if iskeyb {
  335. rev := bytesToRev(k)
  336. ds.Revision = rev.main
  337. }
  338. ds.TotalKey++
  339. return nil
  340. })
  341. }
  342. return nil
  343. })
  344. if err != nil {
  345. ExitWithError(ExitError, err)
  346. }
  347. ds.Hash = h.Sum32()
  348. return ds
  349. }
// revision is the (main, sub) pair that bytesToRev decodes from a backend key.
type revision struct {
	// main is decoded from the first 8 bytes of the key.
	main int64
	// sub is decoded from the 8 bytes starting at offset 9 of the key.
	sub int64
}
  354. func bytesToRev(bytes []byte) revision {
  355. return revision{
  356. main: int64(binary.BigEndian.Uint64(bytes[0:8])),
  357. sub: int64(binary.BigEndian.Uint64(bytes[9:])),
  358. }
  359. }