|
|
@@ -284,31 +284,28 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|
|
}
|
|
|
|
|
|
// cut closes current file written and creates a new one ready to append.
|
|
|
+// cut first creates a temp wal file and writes necessary headers into it.
|
|
|
+// Then cut atomtically rename temp wal file to a wal file.
|
|
|
func (w *WAL) cut() error {
|
|
|
- // create a new wal file with name sequence + 1
|
|
|
- fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
|
|
- f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
|
|
- if err != nil {
|
|
|
+ // close old wal file
|
|
|
+ if err := w.sync(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- log.Printf("wal: segmented wal file %v is created", fpath)
|
|
|
- l, err := fileutil.NewLock(f.Name())
|
|
|
- if err != nil {
|
|
|
+ if err := w.f.Close(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- err = l.Lock()
|
|
|
+
|
|
|
+ fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
|
|
+ ftpath := fpath + ".tmp"
|
|
|
+
|
|
|
+ // create a temp wal file with name sequence + 1, or tuncate the existing one
|
|
|
+ ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- w.locks = append(w.locks, l)
|
|
|
- if err = w.sync(); err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- w.f.Close()
|
|
|
|
|
|
// update writer and save the previous crc
|
|
|
- w.f = f
|
|
|
- w.seq++
|
|
|
+ w.f = ft
|
|
|
prevCrc := w.encoder.crc.Sum32()
|
|
|
w.encoder = newEncoder(w.f, prevCrc)
|
|
|
if err := w.saveCrc(prevCrc); err != nil {
|
|
|
@@ -320,7 +317,45 @@ func (w *WAL) cut() error {
|
|
|
if err := w.saveState(&w.state); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- return w.sync()
|
|
|
+ // close temp wal file
|
|
|
+ if err := w.sync(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ if err := w.f.Close(); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // atomically move temp wal file to wal file
|
|
|
+ if err := os.Rename(ftpath, fpath); err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+
|
|
|
+ // open the wal file and update writer again
|
|
|
+ f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ w.f = f
|
|
|
+ prevCrc = w.encoder.crc.Sum32()
|
|
|
+ w.encoder = newEncoder(w.f, prevCrc)
|
|
|
+
|
|
|
+ // lock the new wal file
|
|
|
+ l, err := fileutil.NewLock(f.Name())
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ err = l.Lock()
|
|
|
+ if err != nil {
|
|
|
+ return err
|
|
|
+ }
|
|
|
+ w.locks = append(w.locks, l)
|
|
|
+
|
|
|
+ // increase the wal seq
|
|
|
+ w.seq++
|
|
|
+
|
|
|
+ log.Printf("wal: segmented wal file %v is created", fpath)
|
|
|
+
|
|
|
+ return nil
|
|
|
}
|
|
|
|
|
|
func (w *WAL) sync() error {
|