v3_snapshot.go

// Copyright 2018 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package snapshot

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"hash/crc32"
	"io"
	"math"
	"os"
	"path/filepath"
	"reflect"

	"github.com/coreos/etcd/clientv3"
	"github.com/coreos/etcd/etcdserver"
	"github.com/coreos/etcd/etcdserver/etcdserverpb"
	"github.com/coreos/etcd/etcdserver/membership"
	"github.com/coreos/etcd/lease"
	"github.com/coreos/etcd/mvcc"
	"github.com/coreos/etcd/mvcc/backend"
	"github.com/coreos/etcd/pkg/fileutil"
	"github.com/coreos/etcd/pkg/logger"
	"github.com/coreos/etcd/pkg/types"
	"github.com/coreos/etcd/raft"
	"github.com/coreos/etcd/raft/raftpb"
	"github.com/coreos/etcd/snap"
	"github.com/coreos/etcd/store"
	"github.com/coreos/etcd/wal"
	"github.com/coreos/etcd/wal/walpb"

	bolt "github.com/coreos/bbolt"
)

// Manager defines snapshot methods.
type Manager interface {
	// Save fetches a snapshot from the specified client's endpoints and saves it to the target path.
	// If the context "ctx" is canceled or times out, the snapshot save stream will error out
	// (e.g. context.Canceled, context.DeadlineExceeded).
	Save(ctx context.Context, dbPath string) error

	// Status returns the snapshot file information.
	Status(dbPath string) (Status, error)

	// Restore restores a new etcd data directory from the given snapshot file.
	Restore(dbPath string, cfg RestoreConfig) error
}
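
// Illustrative sketch (not part of the original source): saving a snapshot
// through a Manager with a bounded context. The endpoint, file path, and
// timeout below are placeholder values.
//
//	cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
//	if err != nil {
//		// handle error
//	}
//	defer cli.Close()
//
//	sp := NewV3(cli, nil) // nil logger falls back to a discard logger
//	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
//	defer cancel()
//	if err := sp.Save(ctx, "snapshot.db"); err != nil {
//		// context.Canceled / context.DeadlineExceeded indicate an interrupted stream
//	}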

// Status is the snapshot file status.
type Status struct {
	Hash      uint32 `json:"hash"`
	Revision  int64  `json:"revision"`
	TotalKey  int    `json:"totalKey"`
	TotalSize int64  `json:"totalSize"`
}

// RestoreConfig configures the snapshot restore operation.
type RestoreConfig struct {
	// Name is the human-readable name of this member.
	Name string

	// OutputDataDir is the target data directory to save restored data.
	// Defaults to "[Name].etcd" if not given.
	OutputDataDir string
	// OutputWALDir is the target WAL data directory.
	// Defaults to "[OutputDataDir]/member/wal" if not given.
	OutputWALDir string

	// InitialCluster is the initial cluster configuration for restore bootstrap.
	InitialCluster types.URLsMap
	// InitialClusterToken is the initial cluster token for the etcd cluster during restore bootstrap.
	InitialClusterToken string
	// PeerURLs is a list of this member's peer URLs to advertise to the rest of the cluster.
	PeerURLs types.URLs

	// SkipHashCheck is "true" to ignore the snapshot integrity hash value
	// (required if the snapshot was copied from a data directory).
	SkipHashCheck bool
}
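
// Illustrative sketch (not part of the original source): restoring a snapshot
// into a fresh data directory. The member name, URLs, token, and paths are
// placeholder values; types.NewURLsMap and types.NewURLs parse the flag-style
// strings used elsewhere in etcd.
//
//	icluster, err := types.NewURLsMap("m1=http://127.0.0.1:2380")
//	if err != nil {
//		// handle error
//	}
//	purls, err := types.NewURLs([]string{"http://127.0.0.1:2380"})
//	if err != nil {
//		// handle error
//	}
//	sp := NewV3(nil, nil) // no client is needed for Restore
//	err = sp.Restore("snapshot.db", RestoreConfig{
//		Name:                "m1",
//		OutputDataDir:       "m1.etcd",
//		InitialCluster:      icluster,
//		InitialClusterToken: "etcd-cluster",
//		PeerURLs:            purls,
//	})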

// NewV3 returns a new snapshot Manager for v3.x snapshots.
// "*clientv3.Client" is only used for the "Save" method; otherwise, pass "nil".
func NewV3(cli *clientv3.Client, lg logger.Logger) Manager {
	if lg == nil {
		lg = logger.NewDiscardLogger()
	}
	return &v3Manager{cli: cli, logger: lg}
}
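
// Illustrative sketch (not part of the original source): inspecting a
// previously saved snapshot file. The file path is a placeholder; no client
// is required for Status.
//
//	sp := NewV3(nil, nil)
//	st, err := sp.Status("snapshot.db")
//	if err != nil {
//		// handle error
//	}
//	fmt.Printf("hash=%x revision=%d keys=%d size=%d\n",
//		st.Hash, st.Revision, st.TotalKey, st.TotalSize)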

// v3Manager is an implementation of Manager for v3.x snapshots.
type v3Manager struct {
	cli *clientv3.Client

	name    string
	dbPath  string
	walDir  string
	snapDir string
	cl      *membership.RaftCluster

	skipHashCheck bool
	logger        logger.Logger
}

// Save streams a snapshot from the client's endpoints into a temporary
// ".part" file, fsyncs it, and atomically renames it to dbPath.
func (s *v3Manager) Save(ctx context.Context, dbPath string) error {
	partpath := dbPath + ".part"
	f, err := os.Create(partpath)
	if err != nil {
		os.RemoveAll(partpath)
		return fmt.Errorf("could not open %s (%v)", partpath, err)
	}
	s.logger.Infof("created temporary db file %q", partpath)

	var rd io.ReadCloser
	rd, err = s.cli.Snapshot(ctx)
	if err != nil {
		os.RemoveAll(partpath)
		return err
	}

	s.logger.Infof("copying from snapshot stream")
	if _, err = io.Copy(f, rd); err != nil {
		os.RemoveAll(partpath)
		return err
	}
	if err = fileutil.Fsync(f); err != nil {
		os.RemoveAll(partpath)
		return err
	}
	if err = f.Close(); err != nil {
		os.RemoveAll(partpath)
		return err
	}

	s.logger.Infof("renaming from %q to %q", partpath, dbPath)
	if err = os.Rename(partpath, dbPath); err != nil {
		os.RemoveAll(partpath)
		return fmt.Errorf("could not rename %s to %s (%v)", partpath, dbPath, err)
	}
	return nil
}

func (s *v3Manager) Status(dbPath string) (ds Status, err error) {
	if _, err = os.Stat(dbPath); err != nil {
		return ds, err
	}

	db, err := bolt.Open(dbPath, 0400, &bolt.Options{ReadOnly: true})
	if err != nil {
		return ds, err
	}
	defer db.Close()

	h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

	if err = db.View(func(tx *bolt.Tx) error {
		ds.TotalSize = tx.Size()
		c := tx.Cursor()
		for next, _ := c.First(); next != nil; next, _ = c.Next() {
			b := tx.Bucket(next)
			if b == nil {
				return fmt.Errorf("cannot get hash of bucket %s", string(next))
			}
			h.Write(next)
			iskeyb := (string(next) == "key")
			b.ForEach(func(k, v []byte) error {
				h.Write(k)
				h.Write(v)
				if iskeyb {
					rev := bytesToRev(k)
					ds.Revision = rev.main
				}
				ds.TotalKey++
				return nil
			})
		}
		return nil
	}); err != nil {
		return ds, err
	}

	ds.Hash = h.Sum32()
	return ds, nil
}

func (s *v3Manager) Restore(dbPath string, cfg RestoreConfig) error {
	srv := etcdserver.ServerConfig{
		Name:                cfg.Name,
		InitialClusterToken: cfg.InitialClusterToken,
		InitialPeerURLsMap:  cfg.InitialCluster,
		PeerURLs:            cfg.PeerURLs,
	}
	if err := srv.VerifyBootstrap(); err != nil {
		return err
	}

	var err error
	s.cl, err = membership.NewClusterFromURLsMap(cfg.InitialClusterToken, cfg.InitialCluster)
	if err != nil {
		return err
	}

	dataDir := cfg.OutputDataDir
	if dataDir == "" {
		dataDir = cfg.Name + ".etcd"
	}
	if _, err = os.Stat(dataDir); err == nil {
		return fmt.Errorf("data-dir %q exists", dataDir)
	}
	walDir := cfg.OutputWALDir
	if walDir == "" {
		walDir = filepath.Join(dataDir, "member", "wal")
	} else if _, err = os.Stat(walDir); err == nil {
		return fmt.Errorf("wal-dir %q exists", walDir)
	}
	s.logger.Infof("restoring snapshot file %q to data-dir %q, wal-dir %q", dbPath, dataDir, walDir)

	s.name = cfg.Name
	s.dbPath = dbPath
	s.walDir = walDir
	s.snapDir = filepath.Join(dataDir, "member", "snap")
	s.skipHashCheck = cfg.SkipHashCheck

	s.logger.Infof("writing snapshot directory %q", s.snapDir)
	if err = s.saveDB(); err != nil {
		return err
	}

	s.logger.Infof("writing WAL directory %q and raft snapshot to %q", s.walDir, s.snapDir)
	err = s.saveWALAndSnap()
	if err == nil {
		s.logger.Infof("finished restore %q to data directory %q, wal directory %q", dbPath, dataDir, walDir)
	}
	return err
}

// saveDB copies the database snapshot to the snapshot directory.
func (s *v3Manager) saveDB() error {
	f, ferr := os.OpenFile(s.dbPath, os.O_RDONLY, 0600)
	if ferr != nil {
		return ferr
	}
	defer f.Close()

	// get snapshot integrity hash
	if _, err := f.Seek(-sha256.Size, io.SeekEnd); err != nil {
		return err
	}
	sha := make([]byte, sha256.Size)
	if _, err := f.Read(sha); err != nil {
		return err
	}
	if _, err := f.Seek(0, io.SeekStart); err != nil {
		return err
	}

	if err := fileutil.CreateDirAll(s.snapDir); err != nil {
		return err
	}

	dbpath := filepath.Join(s.snapDir, "db")
	db, dberr := os.OpenFile(dbpath, os.O_RDWR|os.O_CREATE, 0600)
	if dberr != nil {
		return dberr
	}
	if _, err := io.Copy(db, f); err != nil {
		return err
	}

	// truncate away the integrity hash, if any
	off, serr := db.Seek(0, io.SeekEnd)
	if serr != nil {
		return serr
	}
	// a snapshot fetched through the snapshot API carries a trailing sha256
	// hash appended to the 512-byte-aligned bolt database; a db file copied
	// straight from a data directory has no trailing hash
	hasHash := (off % 512) == sha256.Size
	if hasHash {
		if err := db.Truncate(off - sha256.Size); err != nil {
			return err
		}
	}
	if !hasHash && !s.skipHashCheck {
		return fmt.Errorf("snapshot missing hash but --skip-hash-check=false")
	}

	if hasHash && !s.skipHashCheck {
		// check that the appended hash matches the database contents
		if _, err := db.Seek(0, io.SeekStart); err != nil {
			return err
		}
		h := sha256.New()
		if _, err := io.Copy(h, db); err != nil {
			return err
		}
		dbsha := h.Sum(nil)
		if !reflect.DeepEqual(sha, dbsha) {
			return fmt.Errorf("expected sha256 %v, got %v", sha, dbsha)
		}
	}

	// db hash is OK, can now modify DB so it can be part of a new cluster
	db.Close()

	commit := len(s.cl.Members())

	// update consistentIndex so applies go through on etcdserver despite
	// having a new raft instance
	be := backend.NewDefaultBackend(dbpath)

	// a lessor that never times out leases
	lessor := lease.NewLessor(be, math.MaxInt64)

	mvs := mvcc.NewStore(be, lessor, (*initIndex)(&commit))
	txn := mvs.Write()
	btx := be.BatchTx()
	del := func(k, v []byte) error {
		txn.DeleteRange(k, nil)
		return nil
	}

	// delete stored members from the old cluster since we are using new members
	btx.UnsafeForEach([]byte("members"), del)
	// TODO: add back new members when we start to deprecate the old snap file.
	btx.UnsafeForEach([]byte("members_removed"), del)

	// trigger write-out of the new consistent index
	txn.End()

	mvs.Commit()
	mvs.Close()
	be.Close()

	return nil
}

// saveWALAndSnap creates a WAL for the initial cluster.
func (s *v3Manager) saveWALAndSnap() error {
	if err := fileutil.CreateDirAll(s.walDir); err != nil {
		return err
	}

	// add members again to persist them to the store we create.
	st := store.New(etcdserver.StoreClusterPrefix, etcdserver.StoreKeysPrefix)
	s.cl.SetStore(st)
	for _, m := range s.cl.Members() {
		s.cl.AddMember(m)
	}

	m := s.cl.MemberByName(s.name)
	md := &etcdserverpb.Metadata{NodeID: uint64(m.ID), ClusterID: uint64(s.cl.ID())}
	metadata, merr := md.Marshal()
	if merr != nil {
		return merr
	}

	w, walerr := wal.Create(s.walDir, metadata)
	if walerr != nil {
		return walerr
	}
	defer w.Close()

	peers := make([]raft.Peer, len(s.cl.MemberIDs()))
	for i, id := range s.cl.MemberIDs() {
		ctx, err := json.Marshal((*s.cl).Member(id))
		if err != nil {
			return err
		}
		peers[i] = raft.Peer{ID: uint64(id), Context: ctx}
	}

	// synthesize one committed ConfChangeAddNode entry per member so the
	// restored member boots with the full initial cluster membership
	ents := make([]raftpb.Entry, len(peers))
	nodeIDs := make([]uint64, len(peers))
	for i, p := range peers {
		nodeIDs[i] = p.ID
		cc := raftpb.ConfChange{
			Type:    raftpb.ConfChangeAddNode,
			NodeID:  p.ID,
			Context: p.Context,
		}
		d, err := cc.Marshal()
		if err != nil {
			return err
		}
		ents[i] = raftpb.Entry{
			Type:  raftpb.EntryConfChange,
			Term:  1,
			Index: uint64(i + 1),
			Data:  d,
		}
	}

	commit, term := uint64(len(ents)), uint64(1)
	if err := w.Save(raftpb.HardState{
		Term:   term,
		Vote:   peers[0].ID,
		Commit: commit,
	}, ents); err != nil {
		return err
	}

	b, berr := st.Save()
	if berr != nil {
		return berr
	}
	raftSnap := raftpb.Snapshot{
		Data: b,
		Metadata: raftpb.SnapshotMetadata{
			Index: commit,
			Term:  term,
			ConfState: raftpb.ConfState{
				Nodes: nodeIDs,
			},
		},
	}
	sn := snap.New(s.snapDir)
	if err := sn.SaveSnap(raftSnap); err != nil {
		return err
	}

	err := w.SaveSnapshot(walpb.Snapshot{Index: commit, Term: term})
	if err == nil {
		s.logger.Infof("wrote WAL snapshot to %q", s.walDir)
	}
	return err
}