|
|
@@ -70,16 +70,15 @@ type WAL struct {
|
|
|
metadata []byte // metadata recorded at the head of each WAL
|
|
|
state raftpb.HardState // hardstate recorded at the head of WAL
|
|
|
|
|
|
- start walpb.Snapshot // snapshot to start reading
|
|
|
- decoder *decoder // decoder to decode records
|
|
|
+ start walpb.Snapshot // snapshot to start reading
|
|
|
+ decoder *decoder // decoder to decode records
|
|
|
+ readClose io.Closer // closer for decode reader
|
|
|
|
|
|
mu sync.Mutex
|
|
|
- f *os.File // underlay file opened for appending, sync
|
|
|
- seq uint64 // sequence of the wal file currently used for writes
|
|
|
enti uint64 // index of the last entry saved to the wal
|
|
|
encoder *encoder // encoder to encode records
|
|
|
|
|
|
- locks []fileutil.Lock // the file locks the WAL is holding (the name is increasing)
|
|
|
+ locks []*fileutil.LockedFile // the locked files the WAL holds (the name is increasing)
|
|
|
}
|
|
|
|
|
|
// Create creates a WAL ready for appending records. The given metadata is
|
|
|
@@ -94,26 +93,17 @@ func Create(dirpath string, metadata []byte) (*WAL, error) {
|
|
|
}
|
|
|
|
|
|
p := path.Join(dirpath, walName(0, 0))
|
|
|
- f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
|
|
+ f, err := fileutil.LockFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
- l, err := fileutil.NewLock(f.Name())
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- if err = l.Lock(); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
|
|
|
w := &WAL{
|
|
|
dir: dirpath,
|
|
|
metadata: metadata,
|
|
|
- seq: 0,
|
|
|
- f: f,
|
|
|
encoder: newEncoder(f, 0),
|
|
|
}
|
|
|
- w.locks = append(w.locks, l)
|
|
|
+ w.locks = append(w.locks, f)
|
|
|
if err := w.saveCrc(0); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -157,60 +147,56 @@ func openAtIndex(dirpath string, snap walpb.Snapshot, write bool) (*WAL, error)
|
|
|
return nil, ErrFileNotFound
|
|
|
}
|
|
|
|
|
|
- // open the wal files for reading
|
|
|
+ // open the wal files
|
|
|
rcs := make([]io.ReadCloser, 0)
|
|
|
- ls := make([]fileutil.Lock, 0)
|
|
|
+ ls := make([]*fileutil.LockedFile, 0)
|
|
|
for _, name := range names[nameIndex:] {
|
|
|
- f, err := os.Open(path.Join(dirpath, name))
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- l, err := fileutil.NewLock(f.Name())
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- err = l.TryLock()
|
|
|
- if err != nil {
|
|
|
- if write {
|
|
|
+ p := path.Join(dirpath, name)
|
|
|
+ if write {
|
|
|
+ l, err := fileutil.TryLockFile(p, os.O_RDWR, 0600)
|
|
|
+ if err != nil {
|
|
|
+ MultiReadCloser(rcs...).Close()
|
|
|
return nil, err
|
|
|
}
|
|
|
+ ls = append(ls, l)
|
|
|
+ rcs = append(rcs, l)
|
|
|
+ } else {
|
|
|
+ rf, err := os.OpenFile(p, os.O_RDONLY, 0600)
|
|
|
+ if err != nil {
|
|
|
+ return nil, err
|
|
|
+ }
|
|
|
+ ls = append(ls, nil)
|
|
|
+ rcs = append(rcs, rf)
|
|
|
}
|
|
|
- rcs = append(rcs, f)
|
|
|
- ls = append(ls, l)
|
|
|
}
|
|
|
+
|
|
|
rc := MultiReadCloser(rcs...)
|
|
|
+ c := rc
|
|
|
+ if write {
|
|
|
+ // write reuses the file descriptors from read; don't close so
|
|
|
+ // WAL can append without dropping the file lock
|
|
|
+ c = nil
|
|
|
+ }
|
|
|
|
|
|
// create a WAL ready for reading
|
|
|
w := &WAL{
|
|
|
- dir: dirpath,
|
|
|
- start: snap,
|
|
|
- decoder: newDecoder(rc),
|
|
|
- locks: ls,
|
|
|
+ dir: dirpath,
|
|
|
+ start: snap,
|
|
|
+ decoder: newDecoder(rc),
|
|
|
+ readClose: c,
|
|
|
+ locks: ls,
|
|
|
}
|
|
|
|
|
|
if write {
|
|
|
- // open the last wal file for appending
|
|
|
- seq, _, err := parseWalName(names[len(names)-1])
|
|
|
- if err != nil {
|
|
|
+ if _, _, err := parseWalName(path.Base(w.tail().Name())); err != nil {
|
|
|
rc.Close()
|
|
|
return nil, err
|
|
|
}
|
|
|
- last := path.Join(dirpath, names[len(names)-1])
|
|
|
-
|
|
|
- f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
|
|
|
- if err != nil {
|
|
|
- rc.Close()
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- err = fileutil.Preallocate(f, segmentSizeBytes)
|
|
|
- if err != nil {
|
|
|
+ if err := fileutil.Preallocate(w.tail().File, segmentSizeBytes); err != nil {
|
|
|
rc.Close()
|
|
|
plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
|
|
return nil, err
|
|
|
}
|
|
|
-
|
|
|
- w.f = f
|
|
|
- w.seq = seq
|
|
|
}
|
|
|
|
|
|
return w, nil
|
|
|
@@ -275,7 +261,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- switch w.f {
|
|
|
+ switch w.tail() {
|
|
|
case nil:
|
|
|
// We do not have to read out all entries in read mode.
|
|
|
// The last record may be a partially written one, so
|
|
|
@@ -298,17 +284,20 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|
|
}
|
|
|
|
|
|
// close decoder, disable reading
|
|
|
- w.decoder.close()
|
|
|
+ if w.readClose != nil {
|
|
|
+ w.readClose.Close()
|
|
|
+ w.readClose = nil
|
|
|
+ }
|
|
|
w.start = walpb.Snapshot{}
|
|
|
|
|
|
w.metadata = metadata
|
|
|
|
|
|
- if w.f != nil {
|
|
|
+ if w.tail() != nil {
|
|
|
// create encoder (chain crc with the decoder), enable appending
|
|
|
- w.encoder = newEncoder(w.f, w.decoder.lastCRC())
|
|
|
- w.decoder = nil
|
|
|
+ w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
|
|
|
lastIndexSaved.Set(float64(w.enti))
|
|
|
}
|
|
|
+ w.decoder = nil
|
|
|
|
|
|
return metadata, state, ents, err
|
|
|
}
|
|
|
@@ -321,23 +310,20 @@ func (w *WAL) cut() error {
|
|
|
if err := w.sync(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- if err := w.f.Close(); err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
|
|
|
- fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
|
|
|
+ fpath := path.Join(w.dir, walName(w.seq()+1, w.enti+1))
|
|
|
ftpath := fpath + ".tmp"
|
|
|
|
|
|
// create a temp wal file with name sequence + 1, or truncate the existing one
|
|
|
- ft, err := os.OpenFile(ftpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE|os.O_TRUNC, 0600)
|
|
|
+ newTail, err := fileutil.LockFile(ftpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
// update writer and save the previous crc
|
|
|
- w.f = ft
|
|
|
+ w.locks = append(w.locks, newTail)
|
|
|
prevCrc := w.encoder.crc.Sum32()
|
|
|
- w.encoder = newEncoder(w.f, prevCrc)
|
|
|
+ w.encoder = newEncoder(w.tail(), prevCrc)
|
|
|
if err = w.saveCrc(prevCrc); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -347,46 +333,27 @@ func (w *WAL) cut() error {
|
|
|
if err = w.saveState(&w.state); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- // close temp wal file
|
|
|
+ // atomically move temp wal file to wal file
|
|
|
if err = w.sync(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- if err = w.f.Close(); err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- // atomically move temp wal file to wal file
|
|
|
if err = os.Rename(ftpath, fpath); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ newTail.Close()
|
|
|
|
|
|
- // open the wal file and update writer again
|
|
|
- f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND, 0600)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- if err = fileutil.Preallocate(f, segmentSizeBytes); err != nil {
|
|
|
- plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
|
|
+ if newTail, err = fileutil.LockFile(fpath, os.O_WRONLY|os.O_APPEND, 0600); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+ w.locks[len(w.locks)-1] = newTail
|
|
|
|
|
|
- w.f = f
|
|
|
prevCrc = w.encoder.crc.Sum32()
|
|
|
- w.encoder = newEncoder(w.f, prevCrc)
|
|
|
+ w.encoder = newEncoder(w.tail(), prevCrc)
|
|
|
|
|
|
- // lock the new wal file
|
|
|
- l, err := fileutil.NewLock(f.Name())
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
-
|
|
|
- if err := l.Lock(); err != nil {
|
|
|
+ if err = fileutil.Preallocate(w.tail().File, segmentSizeBytes); err != nil {
|
|
|
+ plog.Errorf("failed to allocate space when creating new wal file (%v)", err)
|
|
|
return err
|
|
|
}
|
|
|
- w.locks = append(w.locks, l)
|
|
|
-
|
|
|
- // increase the wal seq
|
|
|
- w.seq++
|
|
|
|
|
|
plog.Infof("segmented wal file %v is created", fpath)
|
|
|
return nil
|
|
|
@@ -399,7 +366,7 @@ func (w *WAL) sync() error {
|
|
|
}
|
|
|
}
|
|
|
start := time.Now()
|
|
|
- err := fileutil.Fdatasync(w.f)
|
|
|
+ err := fileutil.Fdatasync(w.tail().File)
|
|
|
syncDurations.Observe(float64(time.Since(start)) / float64(time.Second))
|
|
|
return err
|
|
|
}
|
|
|
@@ -438,8 +405,10 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
|
|
|
}
|
|
|
|
|
|
for i := 0; i < smaller; i++ {
|
|
|
- w.locks[i].Unlock()
|
|
|
- w.locks[i].Destroy()
|
|
|
+ if w.locks[i] == nil {
|
|
|
+ continue
|
|
|
+ }
|
|
|
+ w.locks[i].Close()
|
|
|
}
|
|
|
w.locks = w.locks[smaller:]
|
|
|
|
|
|
@@ -450,22 +419,17 @@ func (w *WAL) Close() error {
|
|
|
w.mu.Lock()
|
|
|
defer w.mu.Unlock()
|
|
|
|
|
|
- if w.f != nil {
|
|
|
+ if w.tail() != nil {
|
|
|
if err := w.sync(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
- if err := w.f.Close(); err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
}
|
|
|
for _, l := range w.locks {
|
|
|
- err := l.Unlock()
|
|
|
- if err != nil {
|
|
|
- plog.Errorf("failed to unlock during closing wal: %s", err)
|
|
|
+ if l == nil {
|
|
|
+ continue
|
|
|
}
|
|
|
- err = l.Destroy()
|
|
|
- if err != nil {
|
|
|
- plog.Errorf("failed to destroy lock during closing wal: %s", err)
|
|
|
+ if err := l.Close(); err != nil {
|
|
|
+ plog.Errorf("failed to unlock during closing wal: %s", err)
|
|
|
}
|
|
|
}
|
|
|
return nil
|
|
|
@@ -514,7 +478,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
- fstat, err := w.f.Stat()
|
|
|
+ fstat, err := w.tail().Stat()
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -524,6 +488,7 @@ func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
|
|
|
}
|
|
|
return nil
|
|
|
}
|
|
|
+
|
|
|
// TODO: add a test for this code path when refactoring the tests
|
|
|
return w.cut()
|
|
|
}
|
|
|
@@ -549,6 +514,25 @@ func (w *WAL) saveCrc(prevCrc uint32) error {
|
|
|
return w.encoder.encode(&walpb.Record{Type: crcType, Crc: prevCrc})
|
|
|
}
|
|
|
|
|
|
+func (w *WAL) tail() *fileutil.LockedFile {
|
|
|
+ if len(w.locks) > 0 {
|
|
|
+ return w.locks[len(w.locks)-1]
|
|
|
+ }
|
|
|
+ return nil
|
|
|
+}
|
|
|
+
|
|
|
+func (w *WAL) seq() uint64 {
|
|
|
+ t := w.tail()
|
|
|
+ if t == nil {
|
|
|
+ return 0
|
|
|
+ }
|
|
|
+ seq, _, err := parseWalName(path.Base(t.Name()))
|
|
|
+ if err != nil {
|
|
|
+ plog.Fatalf("bad wal name %s (%v)", t.Name(), err)
|
|
|
+ }
|
|
|
+ return seq
|
|
|
+}
|
|
|
+
|
|
|
func mustSync(st, prevst raftpb.HardState, entsnum int) bool {
|
|
|
// Persistent state on all servers:
|
|
|
// (Updated on stable storage before responding to RPCs)
|