snapshot_command.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "encoding/binary"
  17. "encoding/json"
  18. "fmt"
  19. "hash/crc32"
  20. "io"
  21. "os"
  22. "path"
  23. "strings"
  24. "github.com/boltdb/bolt"
  25. "github.com/coreos/etcd/etcdserver"
  26. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  27. "github.com/coreos/etcd/etcdserver/membership"
  28. "github.com/coreos/etcd/pkg/fileutil"
  29. "github.com/coreos/etcd/pkg/types"
  30. "github.com/coreos/etcd/raft"
  31. "github.com/coreos/etcd/raft/raftpb"
  32. "github.com/coreos/etcd/storage"
  33. "github.com/coreos/etcd/storage/backend"
  34. "github.com/coreos/etcd/wal"
  35. "github.com/spf13/cobra"
  36. "golang.org/x/net/context"
  37. )
const (
	// defaultName is the member name used when --name is not supplied.
	defaultName = "default"
	// defaultInitialAdvertisePeerURLs lists the default advertised peer URLs,
	// covering both the current (2380) and legacy (7001) etcd peer ports.
	defaultInitialAdvertisePeerURLs = "http://localhost:2380,http://localhost:7001"
)
// Flag values for the "snapshot restore" subcommand.
var (
	restoreCluster      string // --initial-cluster
	restoreClusterToken string // --initial-cluster-token
	restoreDataDir      string // --data-dir
	restorePeerURLs     string // --initial-advertise-peer-urls
	restoreName         string // --name
)
  49. // NewSnapshotCommand returns the cobra command for "snapshot".
  50. func NewSnapshotCommand() *cobra.Command {
  51. cmd := &cobra.Command{
  52. Use: "snapshot",
  53. Short: "snapshot manages etcd node snapshots.",
  54. }
  55. cmd.AddCommand(NewSnapshotSaveCommand())
  56. cmd.AddCommand(NewSnapshotRestoreCommand())
  57. cmd.AddCommand(newSnapshotStatusCommand())
  58. return cmd
  59. }
  60. func NewSnapshotSaveCommand() *cobra.Command {
  61. return &cobra.Command{
  62. Use: "save <filename>",
  63. Short: "save stores an etcd node backend snapshot to a given file.",
  64. Run: snapshotSaveCommandFunc,
  65. }
  66. }
  67. func newSnapshotStatusCommand() *cobra.Command {
  68. return &cobra.Command{
  69. Use: "status <filename>",
  70. Short: "status gets backend snapshot status of a given file.",
  71. Run: snapshotStatusCommandFunc,
  72. }
  73. }
  74. func NewSnapshotRestoreCommand() *cobra.Command {
  75. cmd := &cobra.Command{
  76. Use: "restore <filename>",
  77. Short: "restore an etcd member snapshot to an etcd directory",
  78. Run: snapshotRestoreCommandFunc,
  79. }
  80. cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.")
  81. cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.")
  82. cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.")
  83. cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.")
  84. cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.")
  85. return cmd
  86. }
  87. func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
  88. if len(args) != 1 {
  89. err := fmt.Errorf("snapshot save expects one argument")
  90. ExitWithError(ExitBadArgs, err)
  91. }
  92. path := args[0]
  93. partpath := path + ".part"
  94. f, err := os.Create(partpath)
  95. defer f.Close()
  96. if err != nil {
  97. exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
  98. ExitWithError(ExitBadArgs, exiterr)
  99. }
  100. c := mustClientFromCmd(cmd)
  101. r, serr := c.Snapshot(context.TODO())
  102. if serr != nil {
  103. os.RemoveAll(partpath)
  104. ExitWithError(ExitInterrupted, serr)
  105. }
  106. if _, rerr := io.Copy(f, r); rerr != nil {
  107. os.RemoveAll(partpath)
  108. ExitWithError(ExitInterrupted, rerr)
  109. }
  110. fileutil.Fsync(f)
  111. if rerr := os.Rename(partpath, path); rerr != nil {
  112. exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
  113. ExitWithError(ExitIO, exiterr)
  114. }
  115. fmt.Printf("Snapshot saved at %s\n", path)
  116. }
  117. func snapshotStatusCommandFunc(cmd *cobra.Command, args []string) {
  118. if len(args) != 1 {
  119. err := fmt.Errorf("snapshot status requires exactly one argument")
  120. ExitWithError(ExitBadArgs, err)
  121. }
  122. initDisplayFromCmd(cmd)
  123. ds := dbStatus(args[0])
  124. display.DBStatus(ds)
  125. }
// snapshotRestoreCommandFunc restores the snapshot file named by the single
// argument into a fresh data directory: it validates the bootstrap
// configuration, refuses to overwrite an existing data-dir, then writes the
// backend database and a seed WAL for the new cluster membership.
func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
	if len(args) != 1 {
		err := fmt.Errorf("snapshot restore requires exactly one argument")
		ExitWithError(ExitBadArgs, err)
	}
	// Parse --initial-cluster ("name=url,...") into a name -> URLs map.
	urlmap, uerr := types.NewURLsMap(restoreCluster)
	if uerr != nil {
		ExitWithError(ExitBadArgs, uerr)
	}
	// Build just enough server config to reuse etcdserver's bootstrap checks
	// (e.g. that --name and its peer URLs appear in --initial-cluster).
	cfg := etcdserver.ServerConfig{
		InitialClusterToken: restoreClusterToken,
		InitialPeerURLsMap:  urlmap,
		PeerURLs:            types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
		Name:                restoreName,
	}
	if err := cfg.VerifyBootstrap(); err != nil {
		ExitWithError(ExitBadArgs, err)
	}
	cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
	if cerr != nil {
		ExitWithError(ExitBadArgs, cerr)
	}
	// Default the data dir to "<name>.etcd", matching etcd's own convention.
	basedir := restoreDataDir
	if basedir == "" {
		basedir = restoreName + ".etcd"
	}
	// NOTE(review): "path" (slash paths) is used for filesystem paths here;
	// filepath.Join would be the portable choice — confirm Windows support
	// is out of scope before changing.
	waldir := path.Join(basedir, "member", "wal")
	snapdir := path.Join(basedir, "member", "snap")
	// Refuse to clobber an existing data directory.
	if _, err := os.Stat(basedir); err == nil {
		ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
	}
	// Write the backend database first, then the WAL seeding the membership.
	makeDB(snapdir, args[0])
	makeWAL(waldir, cl)
}
  160. func initialClusterFromName(name string) string {
  161. n := name
  162. if name == "" {
  163. n = defaultName
  164. }
  165. return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
  166. }
  167. // makeWAL creates a WAL for the initial cluster
  168. func makeWAL(waldir string, cl *membership.RaftCluster) {
  169. if err := os.MkdirAll(waldir, 0755); err != nil {
  170. ExitWithError(ExitIO, err)
  171. }
  172. m := cl.MemberByName(restoreName)
  173. md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
  174. metadata, merr := md.Marshal()
  175. if merr != nil {
  176. ExitWithError(ExitInvalidInput, merr)
  177. }
  178. w, walerr := wal.Create(waldir, metadata)
  179. if walerr != nil {
  180. ExitWithError(ExitIO, walerr)
  181. }
  182. defer w.Close()
  183. peers := make([]raft.Peer, len(cl.MemberIDs()))
  184. for i, id := range cl.MemberIDs() {
  185. ctx, err := json.Marshal((*cl).Member(id))
  186. if err != nil {
  187. ExitWithError(ExitInvalidInput, err)
  188. }
  189. peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
  190. }
  191. ents := make([]raftpb.Entry, len(peers))
  192. for i, p := range peers {
  193. cc := raftpb.ConfChange{
  194. Type: raftpb.ConfChangeAddNode,
  195. NodeID: p.ID,
  196. Context: p.Context}
  197. d, err := cc.Marshal()
  198. if err != nil {
  199. ExitWithError(ExitInvalidInput, err)
  200. }
  201. e := raftpb.Entry{
  202. Type: raftpb.EntryConfChange,
  203. Term: 1,
  204. Index: uint64(i + 1),
  205. Data: d,
  206. }
  207. ents[i] = e
  208. }
  209. w.Save(raftpb.HardState{
  210. Term: 1,
  211. Vote: peers[0].ID,
  212. Commit: uint64(len(ents))}, ents)
  213. }
  214. // initIndex implements ConsistentIndexGetter so the snapshot won't block
  215. // the new raft instance by waiting for a future raft index.
  216. type initIndex struct{}
  217. func (*initIndex) ConsistentIndex() uint64 { return 1 }
// makeDB copies the database snapshot to the snapshot directory
// and rewrites its consistent index and membership buckets so the restored
// backend can be driven by a brand-new raft instance.
func makeDB(snapdir, dbfile string) {
	f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
	if ferr != nil {
		ExitWithError(ExitInvalidInput, ferr)
	}
	defer f.Close()
	if err := os.MkdirAll(snapdir, 0755); err != nil {
		ExitWithError(ExitIO, err)
	}
	// Copy the snapshot into <snapdir>/db, the location the backend opens.
	dbpath := path.Join(snapdir, "db")
	db, dberr := os.OpenFile(dbpath, os.O_WRONLY|os.O_CREATE, 0600)
	if dberr != nil {
		ExitWithError(ExitIO, dberr)
	}
	if _, err := io.Copy(db, f); err != nil {
		ExitWithError(ExitIO, err)
	}
	// NOTE(review): Close/fsync errors are ignored here — confirm a failed
	// flush is acceptable before the backend reopens the file below.
	db.Close()
	// update consistentIndex so applies go through on etcdserver despite
	// having a new raft instance
	be := backend.NewDefaultBackend(dbpath)
	// initIndex pins the consistent index to 1 so the store does not wait
	// for a future raft index before committing.
	s := storage.NewStore(be, nil, &initIndex{})
	id := s.TxnBegin()
	btx := be.BatchTx()
	// del removes each visited key through the store transaction (the k/v
	// callback signature is dictated by UnsafeForEach; v is unused).
	del := func(k, v []byte) error {
		_, _, err := s.TxnDeleteRange(id, k, nil)
		return err
	}
	// delete stored members from old cluster since using new members
	btx.UnsafeForEach([]byte("members"), del)
	btx.UnsafeForEach([]byte("members_removed"), del)
	// trigger write-out of new consistent index
	s.TxnEnd(id)
	s.Commit()
	s.Close()
}
// dbstatus summarizes a backend snapshot file for "snapshot status".
type dbstatus struct {
	Hash      uint32 `json:"hash"`      // crc32c over all bucket names, keys and values
	Revision  int64  `json:"revision"`  // latest main revision seen in the "key" bucket
	TotalKey  int    `json:"totalKey"`  // total number of key/value pairs across buckets
	TotalSize int64  `json:"totalSize"` // database file size in bytes
}
  261. func dbStatus(p string) dbstatus {
  262. if _, err := os.Stat(p); err != nil {
  263. ExitWithError(ExitError, err)
  264. }
  265. ds := dbstatus{}
  266. db, err := bolt.Open(p, 0600, nil)
  267. if err != nil {
  268. ExitWithError(ExitError, err)
  269. }
  270. h := crc32.New(crc32.MakeTable(crc32.Castagnoli))
  271. err = db.View(func(tx *bolt.Tx) error {
  272. ds.TotalSize = tx.Size()
  273. c := tx.Cursor()
  274. for next, _ := c.First(); next != nil; next, _ = c.Next() {
  275. b := tx.Bucket(next)
  276. if b == nil {
  277. return fmt.Errorf("cannot get hash of bucket %s", string(next))
  278. }
  279. h.Write(next)
  280. iskeyb := (string(next) == "key")
  281. b.ForEach(func(k, v []byte) error {
  282. h.Write(k)
  283. h.Write(v)
  284. if iskeyb {
  285. rev := bytesToRev(k)
  286. ds.Revision = rev.main
  287. }
  288. ds.TotalKey++
  289. return nil
  290. })
  291. }
  292. return nil
  293. })
  294. if err != nil {
  295. ExitWithError(ExitError, err)
  296. }
  297. ds.Hash = h.Sum32()
  298. return ds
  299. }
  300. type revision struct {
  301. main int64
  302. sub int64
  303. }
  304. func bytesToRev(bytes []byte) revision {
  305. return revision{
  306. main: int64(binary.BigEndian.Uint64(bytes[0:8])),
  307. sub: int64(binary.BigEndian.Uint64(bytes[9:])),
  308. }
  309. }