|
|
@@ -172,7 +172,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- if w, err = w.renameWal(tmpdirpath); err != nil {
|
|
|
+ if w, err = w.renameWAL(tmpdirpath); err != nil {
|
|
|
if lg != nil {
|
|
|
lg.Warn(
|
|
|
"failed to rename the temporary WAL directory",
|
|
|
@@ -223,7 +223,7 @@ func Create(lg *zap.Logger, dirpath string, metadata []byte) (*WAL, error) {
|
|
|
return w, nil
|
|
|
}
|
|
|
|
|
|
-func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
|
|
+func (w *WAL) renameWAL(tmpdirpath string) (*WAL, error) {
|
|
|
if err := os.RemoveAll(w.dir); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -235,7 +235,7 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
|
|
// process holds the lock.
|
|
|
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
|
|
if _, ok := err.(*os.LinkError); ok {
|
|
|
- return w.renameWalUnlock(tmpdirpath)
|
|
|
+ return w.renameWALUnlock(tmpdirpath)
|
|
|
}
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -245,23 +245,24 @@ func (w *WAL) renameWal(tmpdirpath string) (*WAL, error) {
|
|
|
return w, err
|
|
|
}
|
|
|
|
|
|
-func (w *WAL) renameWalUnlock(tmpdirpath string) (*WAL, error) {
|
|
|
+func (w *WAL) renameWALUnlock(tmpdirpath string) (*WAL, error) {
|
|
|
// rename of directory with locked files doesn't work on windows/cifs;
|
|
|
// close the WAL to release the locks so the directory can be renamed.
|
|
|
if w.lg != nil {
|
|
|
w.lg.Info(
|
|
|
- "releasing flock to rename",
|
|
|
+ "closing WAL to release flock and retry directory renaming",
|
|
|
zap.String("from", tmpdirpath),
|
|
|
zap.String("to", w.dir),
|
|
|
)
|
|
|
} else {
|
|
|
plog.Infof("releasing file lock to rename %q to %q", tmpdirpath, w.dir)
|
|
|
}
|
|
|
-
|
|
|
w.Close()
|
|
|
+
|
|
|
if err := os.Rename(tmpdirpath, w.dir); err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
+
|
|
|
// reopen and relock
|
|
|
newWAL, oerr := Open(w.lg, w.dir, walpb.Snapshot{})
|
|
|
if oerr != nil {
|
|
|
@@ -298,13 +299,13 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
|
|
|
}
|
|
|
|
|
|
func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
|
|
|
- names, err := readWalNames(lg, dirpath)
|
|
|
+ names, err := readWALNames(lg, dirpath)
|
|
|
if err != nil {
|
|
|
return nil, err
|
|
|
}
|
|
|
|
|
|
- nameIndex, ok := searchIndex(names, snap.Index)
|
|
|
- if !ok || !isValidSeq(names[nameIndex:]) {
|
|
|
+ nameIndex, ok := searchIndex(lg, names, snap.Index)
|
|
|
+ if !ok || !isValidSeq(lg, names[nameIndex:]) {
|
|
|
return nil, ErrFileNotFound
|
|
|
}
|
|
|
|
|
|
@@ -350,7 +351,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
|
|
|
// write reuses the file descriptors from read; don't close so
|
|
|
// WAL can append without dropping the file lock
|
|
|
w.readClose = nil
|
|
|
- if _, _, err := parseWalName(filepath.Base(w.tail().Name())); err != nil {
|
|
|
+ if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
|
|
|
closer()
|
|
|
return nil, err
|
|
|
}
|
|
|
@@ -386,14 +387,17 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|
|
ents = append(ents[:e.Index-w.start.Index-1], e)
|
|
|
}
|
|
|
w.enti = e.Index
|
|
|
+
|
|
|
case stateType:
|
|
|
state = mustUnmarshalState(rec.Data)
|
|
|
+
|
|
|
case metadataType:
|
|
|
if metadata != nil && !bytes.Equal(metadata, rec.Data) {
|
|
|
state.Reset()
|
|
|
return nil, state, nil, ErrMetadataConflict
|
|
|
}
|
|
|
metadata = rec.Data
|
|
|
+
|
|
|
case crcType:
|
|
|
crc := decoder.crc.Sum32()
|
|
|
// current crc of decoder must match the crc of the record.
|
|
|
@@ -403,6 +407,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|
|
return nil, state, nil, ErrCRCMismatch
|
|
|
}
|
|
|
decoder.updateCRC(rec.Crc)
|
|
|
+
|
|
|
case snapshotType:
|
|
|
var snap walpb.Snapshot
|
|
|
pbutil.MustUnmarshal(&snap, rec.Data)
|
|
|
@@ -413,6 +418,7 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
|
|
|
}
|
|
|
match = true
|
|
|
}
|
|
|
+
|
|
|
default:
|
|
|
state.Reset()
|
|
|
return nil, state, nil, fmt.Errorf("unexpected block type %d", rec.Type)
|
|
|
@@ -483,9 +489,11 @@ func (w *WAL) cut() error {
|
|
|
if serr != nil {
|
|
|
return serr
|
|
|
}
|
|
|
+
|
|
|
if err := w.tail().Truncate(off); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
if err := w.sync(); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -505,15 +513,19 @@ func (w *WAL) cut() error {
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
if err = w.saveCrc(prevCrc); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
if err = w.encoder.encode(&walpb.Record{Type: metadataType, Data: w.metadata}); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
if err = w.saveState(&w.state); err != nil {
|
|
|
return err
|
|
|
}
|
|
|
+
|
|
|
// atomically move temp wal file to wal file
|
|
|
if err = w.sync(); err != nil {
|
|
|
return err
|
|
|
@@ -550,7 +562,7 @@ func (w *WAL) cut() error {
|
|
|
}
|
|
|
|
|
|
if w.lg != nil {
|
|
|
-
|
|
|
+ w.lg.Info("created a new WAL segment", zap.String("path", fpath))
|
|
|
} else {
|
|
|
plog.Infof("segmented wal file %v is created", fpath)
|
|
|
}
|
|
|
@@ -578,8 +590,8 @@ func (w *WAL) sync() error {
|
|
|
plog.Warningf("sync duration of %v, expected less than %v", took, warnSyncDuration)
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
syncDurations.Observe(took.Seconds())
|
|
|
+
|
|
|
return err
|
|
|
}
|
|
|
|
|
|
@@ -597,9 +609,8 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
|
|
|
|
|
|
var smaller int
|
|
|
found := false
|
|
|
-
|
|
|
for i, l := range w.locks {
|
|
|
- _, lockIndex, err := parseWalName(filepath.Base(l.Name()))
|
|
|
+ _, lockIndex, err := parseWALName(filepath.Base(l.Name()))
|
|
|
if err != nil {
|
|
|
return err
|
|
|
}
|
|
|
@@ -631,6 +642,7 @@ func (w *WAL) ReleaseLockTo(index uint64) error {
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
+// Close closes the current WAL file and directory.
|
|
|
func (w *WAL) Close() error {
|
|
|
w.mu.Lock()
|
|
|
defer w.mu.Unlock()
|
|
|
@@ -651,7 +663,7 @@ func (w *WAL) Close() error {
|
|
|
}
|
|
|
if err := l.Close(); err != nil {
|
|
|
if w.lg != nil {
|
|
|
- w.lg.Error("failed to close WAL", zap.Error(err))
|
|
|
+ w.lg.Warn("failed to close WAL", zap.Error(err))
|
|
|
} else {
|
|
|
plog.Errorf("failed to unlock during closing wal: %s", err)
|
|
|
}
|
|
|
@@ -750,7 +762,7 @@ func (w *WAL) seq() uint64 {
|
|
|
if t == nil {
|
|
|
return 0
|
|
|
}
|
|
|
- seq, _, err := parseWalName(filepath.Base(t.Name()))
|
|
|
+ seq, _, err := parseWALName(filepath.Base(t.Name()))
|
|
|
if err != nil {
|
|
|
if w.lg != nil {
|
|
|
w.lg.Fatal("failed to parse WAL name", zap.String("name", t.Name()), zap.Error(err))
|