snapshot_command.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268
  1. // Copyright 2016 CoreOS, Inc.
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package command
  15. import (
  16. "encoding/json"
  17. "fmt"
  18. "io"
  19. "os"
  20. "path"
  21. "strings"
  22. "github.com/coreos/etcd/etcdserver"
  23. "github.com/coreos/etcd/etcdserver/etcdserverpb"
  24. "github.com/coreos/etcd/etcdserver/membership"
  25. "github.com/coreos/etcd/pkg/types"
  26. "github.com/coreos/etcd/raft"
  27. "github.com/coreos/etcd/raft/raftpb"
  28. "github.com/coreos/etcd/storage"
  29. "github.com/coreos/etcd/storage/backend"
  30. "github.com/coreos/etcd/wal"
  31. "github.com/spf13/cobra"
  32. "golang.org/x/net/context"
  33. )
  // Default values for the "snapshot restore" flags.
  const (
  	// defaultName is the default member name; it is also used to build the
  	// default --initial-cluster value.
  	defaultName = "default"
  	// defaultInitialAdvertisePeerURLs is the default, comma-separated value
  	// of --initial-advertise-peer-urls.
  	defaultInitialAdvertisePeerURLs = "http://localhost:2380,http://localhost:7001"
  )

  // Flag storage for "snapshot restore"; each variable is bound to its flag
  // in NewSnapshotRestoreCommand.
  var (
  	restoreName         string // --name
  	restoreDataDir      string // --data-dir
  	restoreCluster      string // --initial-cluster
  	restoreClusterToken string // --initial-cluster-token
  	restorePeerURLs     string // --initial-advertise-peer-urls
  )
  45. // NewSnapshotCommand returns the cobra command for "snapshot".
  46. func NewSnapshotCommand() *cobra.Command {
  47. cmd := &cobra.Command{
  48. Use: "snapshot",
  49. Short: "snapshot manages etcd node snapshots.",
  50. }
  51. cmd.AddCommand(NewSnapshotSaveCommand())
  52. cmd.AddCommand(NewSnapshotRestoreCommand())
  53. return cmd
  54. }
  55. func NewSnapshotSaveCommand() *cobra.Command {
  56. return &cobra.Command{
  57. Use: "save <filename>",
  58. Short: "save stores an etcd node backend snapshot to a given file.",
  59. Run: snapshotSaveCommandFunc,
  60. }
  61. }
  62. func NewSnapshotRestoreCommand() *cobra.Command {
  63. cmd := &cobra.Command{
  64. Use: "restore <filename>",
  65. Short: "restore an etcd node snapshot to an etcd directory",
  66. Run: snapshotRestoreCommandFunc,
  67. }
  68. cmd.Flags().StringVar(&restoreDataDir, "data-dir", "", "Path to the data directory.")
  69. cmd.Flags().StringVar(&restoreCluster, "initial-cluster", initialClusterFromName(defaultName), "Initial cluster configuration for restore bootstrap.")
  70. cmd.Flags().StringVar(&restoreClusterToken, "initial-cluster-token", "etcd-cluster", "Initial cluster token for the etcd cluster during restore bootstrap.")
  71. cmd.Flags().StringVar(&restorePeerURLs, "initial-advertise-peer-urls", defaultInitialAdvertisePeerURLs, "List of this member's peer URLs to advertise to the rest of the cluster.")
  72. cmd.Flags().StringVar(&restoreName, "name", defaultName, "Human-readable name for this member.")
  73. return cmd
  74. }
  75. func snapshotSaveCommandFunc(cmd *cobra.Command, args []string) {
  76. if len(args) != 1 {
  77. err := fmt.Errorf("snapshot save expects one argument")
  78. ExitWithError(ExitBadArgs, err)
  79. }
  80. path := args[0]
  81. partpath := path + ".part"
  82. f, err := os.Create(partpath)
  83. defer f.Close()
  84. if err != nil {
  85. exiterr := fmt.Errorf("could not open %s (%v)", partpath, err)
  86. ExitWithError(ExitBadArgs, exiterr)
  87. }
  88. c := mustClientFromCmd(cmd)
  89. r, serr := c.Snapshot(context.TODO())
  90. if serr != nil {
  91. os.RemoveAll(partpath)
  92. ExitWithError(ExitInterrupted, serr)
  93. }
  94. if _, rerr := io.Copy(f, r); rerr != nil {
  95. os.RemoveAll(partpath)
  96. ExitWithError(ExitInterrupted, rerr)
  97. }
  98. f.Sync()
  99. if rerr := os.Rename(partpath, path); rerr != nil {
  100. exiterr := fmt.Errorf("could not rename %s to %s (%v)", partpath, path, rerr)
  101. ExitWithError(ExitIO, exiterr)
  102. }
  103. }
  104. func snapshotRestoreCommandFunc(cmd *cobra.Command, args []string) {
  105. if len(args) != 1 {
  106. err := fmt.Errorf("snapshot restore exactly one argument")
  107. ExitWithError(ExitBadArgs, err)
  108. }
  109. urlmap, uerr := types.NewURLsMap(restoreCluster)
  110. if uerr != nil {
  111. ExitWithError(ExitBadArgs, uerr)
  112. }
  113. cfg := etcdserver.ServerConfig{
  114. InitialClusterToken: restoreClusterToken,
  115. InitialPeerURLsMap: urlmap,
  116. PeerURLs: types.MustNewURLs(strings.Split(restorePeerURLs, ",")),
  117. Name: restoreName,
  118. }
  119. if err := cfg.VerifyBootstrap(); err != nil {
  120. ExitWithError(ExitBadArgs, err)
  121. }
  122. cl, cerr := membership.NewClusterFromURLsMap(restoreClusterToken, urlmap)
  123. if cerr != nil {
  124. ExitWithError(ExitBadArgs, cerr)
  125. }
  126. basedir := restoreDataDir
  127. if basedir == "" {
  128. basedir = restoreName + ".etcd"
  129. }
  130. waldir := path.Join(basedir, "member", "wal")
  131. snapdir := path.Join(basedir, "member", "snap")
  132. if _, err := os.Stat(basedir); err == nil {
  133. ExitWithError(ExitInvalidInput, fmt.Errorf("data-dir %q exists", basedir))
  134. }
  135. makeDB(snapdir, args[0])
  136. makeWAL(waldir, cl)
  137. }
  138. func initialClusterFromName(name string) string {
  139. n := name
  140. if name == "" {
  141. n = defaultName
  142. }
  143. return fmt.Sprintf("%s=http://localhost:2380,%s=http://localhost:7001", n, n)
  144. }
  145. // makeWAL creates a WAL for the initial cluster
  146. func makeWAL(waldir string, cl *membership.RaftCluster) {
  147. if err := os.MkdirAll(waldir, 0755); err != nil {
  148. ExitWithError(ExitIO, err)
  149. }
  150. m := cl.MemberByName(restoreName)
  151. md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(cl.ID())}
  152. metadata, merr := md.Marshal()
  153. if merr != nil {
  154. ExitWithError(ExitInvalidInput, merr)
  155. }
  156. w, walerr := wal.Create(waldir, metadata)
  157. if walerr != nil {
  158. ExitWithError(ExitIO, walerr)
  159. }
  160. defer w.Close()
  161. peers := make([]raft.Peer, len(cl.MemberIDs()))
  162. for i, id := range cl.MemberIDs() {
  163. ctx, err := json.Marshal((*cl).Member(id))
  164. if err != nil {
  165. ExitWithError(ExitInvalidInput, err)
  166. }
  167. peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
  168. }
  169. ents := make([]raftpb.Entry, len(peers))
  170. for i, p := range peers {
  171. cc := raftpb.ConfChange{
  172. Type: raftpb.ConfChangeAddNode,
  173. NodeID: p.ID,
  174. Context: p.Context}
  175. d, err := cc.Marshal()
  176. if err != nil {
  177. ExitWithError(ExitInvalidInput, err)
  178. }
  179. e := raftpb.Entry{
  180. Type: raftpb.EntryConfChange,
  181. Term: 1,
  182. Index: uint64(i + 1),
  183. Data: d,
  184. }
  185. ents[i] = e
  186. }
  187. w.Save(raftpb.HardState{
  188. Term: 1,
  189. Vote: peers[0].ID,
  190. Commit: uint64(len(ents))}, ents)
  191. }
  // initIndex is a fixed-value ConsistentIndexGetter. It keeps the restored
  // snapshot from blocking the new raft instance by waiting for a future raft
  // index.
  type initIndex struct{}

  // ConsistentIndex always reports index 1.
  func (ii *initIndex) ConsistentIndex() uint64 {
  	return uint64(1)
  }
  196. // makeDB copies the database snapshot to the snapshot directory
  197. func makeDB(snapdir, dbfile string) {
  198. f, ferr := os.OpenFile(dbfile, os.O_RDONLY, 0600)
  199. if ferr != nil {
  200. ExitWithError(ExitInvalidInput, ferr)
  201. }
  202. defer f.Close()
  203. if err := os.MkdirAll(snapdir, 0755); err != nil {
  204. ExitWithError(ExitIO, err)
  205. }
  206. dbpath := path.Join(snapdir, "db")
  207. db, dberr := os.OpenFile(dbpath, os.O_WRONLY|os.O_CREATE, 0600)
  208. if dberr != nil {
  209. ExitWithError(ExitIO, dberr)
  210. }
  211. if _, err := io.Copy(db, f); err != nil {
  212. ExitWithError(ExitIO, err)
  213. }
  214. db.Close()
  215. // update consistentIndex so applies go through on etcdserver despite
  216. // having a new raft instance
  217. be := backend.NewDefaultBackend(dbpath)
  218. s := storage.NewStore(be, nil, &initIndex{})
  219. id := s.TxnBegin()
  220. btx := be.BatchTx()
  221. del := func(k, v []byte) error {
  222. _, _, err := s.TxnDeleteRange(id, k, nil)
  223. return err
  224. }
  225. // delete stored members from old cluster since using new members
  226. btx.UnsafeForEach([]byte("members"), del)
  227. btx.UnsafeForEach([]byte("members_removed"), del)
  228. // trigger write-out of new consistent index
  229. s.TxnEnd(id)
  230. s.Commit()
  231. s.Close()
  232. }