storage.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. package etcdserver
  2. import (
  3. "log"
  4. pb "github.com/coreos/etcd/etcdserver/etcdserverpb"
  5. "github.com/coreos/etcd/migrate"
  6. "github.com/coreos/etcd/pkg/pbutil"
  7. "github.com/coreos/etcd/pkg/types"
  8. "github.com/coreos/etcd/raft/raftpb"
  9. "github.com/coreos/etcd/snap"
  10. "github.com/coreos/etcd/wal"
  11. )
// Storage is the stable-storage abstraction declared by this package. It
// persists raft hard state, log entries and snapshots.
type Storage interface {
	// Save saves ents and state to the underlying stable storage.
	// Save MUST block until st and ents are on stable storage.
	Save(st raftpb.HardState, ents []raftpb.Entry) error
	// SaveSnap saves snapshot to the underlying stable storage.
	SaveSnap(snap raftpb.Snapshot) error
	// Cut cuts out a new wal file for saving new state and entries.
	//
	// TODO: WAL should be able to control cut itself. After implementing
	// self-controlled cut, remove Cut from this interface.
	Cut() error
	// Close closes the Storage and performs finalization.
	Close() error
}
// storage is the Storage implementation returned by NewStorage. It embeds a
// *wal.WAL and a *snap.Snapshotter, so the methods of both are promoted onto
// storage; SaveSnap is additionally defined on storage itself.
//
// NOTE(review): Save, Cut and Close are not defined in this file, so they are
// presumably supplied by the embedded *wal.WAL — confirm against the wal package.
type storage struct {
	*wal.WAL
	*snap.Snapshotter
}
  29. func NewStorage(w *wal.WAL, s *snap.Snapshotter) Storage {
  30. return &storage{w, s}
  31. }
  32. // SaveSnap saves the snapshot to disk and release the locked
  33. // wal files since they will not be used.
  34. func (st *storage) SaveSnap(snap raftpb.Snapshot) error {
  35. err := st.Snapshotter.SaveSnap(snap)
  36. if err != nil {
  37. return err
  38. }
  39. err = st.WAL.ReleaseLockTo(snap.Metadata.Index)
  40. if err != nil {
  41. return err
  42. }
  43. return nil
  44. }
  45. func readWAL(waldir string, index uint64) (w *wal.WAL, id, cid types.ID, st raftpb.HardState, ents []raftpb.Entry) {
  46. var err error
  47. if w, err = wal.Open(waldir, index); err != nil {
  48. log.Fatalf("etcdserver: open wal error: %v", err)
  49. }
  50. var wmetadata []byte
  51. if wmetadata, st, ents, err = w.ReadAll(); err != nil {
  52. log.Fatalf("etcdserver: read wal error: %v", err)
  53. }
  54. var metadata pb.Metadata
  55. pbutil.MustUnmarshal(&metadata, wmetadata)
  56. id = types.ID(metadata.NodeID)
  57. cid = types.ID(metadata.ClusterID)
  58. return
  59. }
  60. // upgradeWAL converts an older version of the etcdServer data to the newest version.
  61. // It must ensure that, after upgrading, the most recent version is present.
  62. func upgradeWAL(cfg *ServerConfig, ver wal.WalVersion) error {
  63. if ver == wal.WALv0_4 {
  64. log.Print("etcdserver: converting v0.4 log to v2.0")
  65. err := migrate.Migrate4To2(cfg.DataDir, cfg.Name)
  66. if err != nil {
  67. log.Fatalf("etcdserver: failed migrating data-dir: %v", err)
  68. return err
  69. }
  70. }
  71. return nil
  72. }