|
|
@@ -69,7 +69,11 @@ var (
|
|
|
// A just opened WAL is in read mode, and ready for reading records.
|
|
|
// The WAL will be ready for appending after reading out all the previous records.
|
|
|
type WAL struct {
|
|
|
- dir string // the living directory of the underlay files
|
|
|
+ dir string // the living directory of the underlay files
|
|
|
+
|
|
|
+ // dirFile is a fd for the wal directory for syncing on Rename
|
|
|
+ dirFile *os.File
|
|
|
+
|
|
|
metadata []byte // metadata recorded at the head of each WAL
|
|
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
|
|
|
|
|
@@ -108,10 +112,10 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- if _, err := f.Seek(0, os.SEEK_END); err != nil {
|
|
|
+ if _, err = f.Seek(0, os.SEEK_END); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- if err := fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
|
|
|
+ if err = fileutil.Preallocate(f.File, SegmentSizeBytes, true); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
@@ -121,17 +125,33 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|
|
encoder: newEncoder(f, 0),
|
|
|
}
|
|
|
w.locks = append(w.locks, f)
|
|
|
- if err := w.saveCrc(0); err != nil {
|
|
|
+ if err = w.saveCrc(0); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- if err := w.encoder.encode(&walpb.Record{Type: metadataType, Data: metadata}); err != nil {
|
|
|
+ if err = w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- if err := w.SaveSnapshot(walpb.Snapshot{}); err != nil {
|
|
|
+
|
|
|
+ if w, err = w.renameWal(tmpdirpath); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- return w.renameWal(tmpdirpath)
|
|
|
+ // directory was renamed; sync parent dir to persist rename
|
|
|
+ pdir, perr := fileutil.OpenDir(path.Dir(w.dir))
|
|
|
+ if perr != nil {
|
|
|
+ return nil, perr
|
|
|
+ }
|
|
|
+ if perr = fileutil.Fsync(pdir); perr != nil {
|
|
|
+ return nil, perr
|
|
|
+ }
|
|
|
+ if perr = pdir.Close(); err != nil {
|
|
|
+ return nil, perr
|
|
|
+ }
|
|
|
+
|
|
|
+ return w, nil
|
|
|
}
|
|
|
|
|
|
// Open opens the WAL at the given snap.
|
|
|
@@ -141,7 +161,14 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|
|
// the given snap. The WAL cannot be appended to before reading out all of its
|
|
|
// previous records.
|
|
|
func Open(dirpath string, snap walpb.Snapshot) (*WAL, error) {
|
|
|
- return openAtIndex(dirpath, snap, true)
|
|
|
+ w, err := openAtIndex(dirpath, snap, true)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ if w.dirFile, err = fileutil.OpenDir(w.dir); err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ return w, nil
|
|
|
}
|
|
|
|
|
|
// OpenForRead only opens the wal files for read.
|
|
|
@@ -373,6 +400,10 @@ func (w *WAL) cut() error {
|
|
|
if err = os.Rename(newTail.Name(), fpath); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ if err = fileutil.Fsync(w.dirFile); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
newTail.Close()
|
|
|
|
|
|
if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
|
|
|
@@ -475,6 +506,11 @@ func (w *WAL) Close() error {
|
|
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ if err := w.dirFile.Close(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
return nil
|
|
|
}
|
|
|
|