Browse Source

wal: add Verify function to perform corruption check on wal contents

Signed-off-by: Shreyas Rao <shreyas.sriganesh.rao@sap.com>
shreyas-s-rao 6 years ago
parent
commit
3d6862fe0d
1 changed files with 123 additions and 27 deletions
  1. 123 27
      wal/wal.go

+ 123 - 27
wal/wal.go

@@ -299,17 +299,55 @@ func OpenForRead(lg *zap.Logger, dirpath string, snap walpb.Snapshot) (*WAL, err
 }
 }
 
 
 func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
 func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool) (*WAL, error) {
-	names, err := readWALNames(lg, dirpath)
+	names, nameIndex, err := selectWALFiles(lg, dirpath, snap)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
+	rs, ls, closer, err := openWALFiles(lg, dirpath, names, nameIndex, write)
+	if err != nil {
+		return nil, err
+	}
+
+	// create a WAL ready for reading
+	w := &WAL{
+		dir:       dirpath,
+		start:     snap,
+		decoder:   newDecoder(rs...),
+		readClose: closer,
+		locks:     ls,
+	}
+
+	if write {
+		// write reuses the file descriptors from read; don't close so
+		// WAL can append without dropping the file lock
+		w.readClose = nil
+		if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
+			closer()
+			return nil, err
+		}
+		w.fp = newFilePipeline(lg, w.dir, SegmentSizeBytes)
+	}
+
+	return w, nil
+}
+
+func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]string, int, error) {
+	names, err := readWALNames(lg, dirpath)
+	if err != nil {
+		return nil, -1, err
+	}
+
 	nameIndex, ok := searchIndex(lg, names, snap.Index)
 	nameIndex, ok := searchIndex(lg, names, snap.Index)
 	if !ok || !isValidSeq(lg, names[nameIndex:]) {
 	if !ok || !isValidSeq(lg, names[nameIndex:]) {
-		return nil, ErrFileNotFound
+		err = ErrFileNotFound
+		return nil, -1, err
 	}
 	}
 
 
-	// open the wal files
+	return names, nameIndex, nil
+}
+
+func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
 	rcs := make([]io.ReadCloser, 0)
 	rcs := make([]io.ReadCloser, 0)
 	rs := make([]io.Reader, 0)
 	rs := make([]io.Reader, 0)
 	ls := make([]*fileutil.LockedFile, 0)
 	ls := make([]*fileutil.LockedFile, 0)
@@ -319,7 +357,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
 			l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
 			l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
 			if err != nil {
 			if err != nil {
 				closeAll(rcs...)
 				closeAll(rcs...)
-				return nil, err
+				return nil, nil, nil, err
 			}
 			}
 			ls = append(ls, l)
 			ls = append(ls, l)
 			rcs = append(rcs, l)
 			rcs = append(rcs, l)
@@ -327,7 +365,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
 			rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
 			rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
 			if err != nil {
 			if err != nil {
 				closeAll(rcs...)
 				closeAll(rcs...)
-				return nil, err
+				return nil, nil, nil, err
 			}
 			}
 			ls = append(ls, nil)
 			ls = append(ls, nil)
 			rcs = append(rcs, rf)
 			rcs = append(rcs, rf)
@@ -337,28 +375,7 @@ func openAtIndex(lg *zap.Logger, dirpath string, snap walpb.Snapshot, write bool
 
 
 	closer := func() error { return closeAll(rcs...) }
 	closer := func() error { return closeAll(rcs...) }
 
 
-	// create a WAL ready for reading
-	w := &WAL{
-		lg:        lg,
-		dir:       dirpath,
-		start:     snap,
-		decoder:   newDecoder(rs...),
-		readClose: closer,
-		locks:     ls,
-	}
-
-	if write {
-		// write reuses the file descriptors from read; don't close so
-		// WAL can append without dropping the file lock
-		w.readClose = nil
-		if _, _, err := parseWALName(filepath.Base(w.tail().Name())); err != nil {
-			closer()
-			return nil, err
-		}
-		w.fp = newFilePipeline(w.lg, w.dir, SegmentSizeBytes)
-	}
-
-	return w, nil
+	return rs, ls, closer, nil
 }
 }
 
 
 // ReadAll reads out records of the current WAL.
 // ReadAll reads out records of the current WAL.
@@ -480,6 +497,85 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 	return metadata, state, ents, err
 	return metadata, state, ents, err
 }
 }
 
 
+// Verify reads through the given WAL and verifies that it is not corrupted.
+// It creates a new decoder to read through the records of the given WAL.
+// It does not conflict with any open WAL, but it is recommended not to
+// call this function after opening the WAL for writing.
+// If it cannot read out the expected snap, it will return ErrSnapshotNotFound.
+// If the loaded snap doesn't match with the expected one, it will
+// return error ErrSnapshotMismatch.
+func Verify(lg *zap.Logger, walDir string, snap walpb.Snapshot) error {
+	var metadata []byte
+	var err error
+	var match bool
+
+	rec := &walpb.Record{}
+
+	names, nameIndex, err := selectWALFiles(lg, walDir, snap)
+	if err != nil {
+		return err
+	}
+
+	// open wal files in read mode, so that there is no conflict
+	// when the same WAL is opened elsewhere in write mode
+	rs, _, closer, err := openWALFiles(lg, walDir, names, nameIndex, false)
+	if err != nil {
+		return err
+	}
+
+	// create a new decoder from the readers on the WAL files
+	decoder := newDecoder(rs...)
+
+	for err = decoder.decode(rec); err == nil; err = decoder.decode(rec) {
+		switch rec.Type {
+		case metadataType:
+			if metadata != nil && !bytes.Equal(metadata, rec.Data) {
+				return ErrMetadataConflict
+			}
+			metadata = rec.Data
+		case crcType:
+			crc := decoder.crc.Sum32()
+			// Current crc of decoder must match the crc of the record.
+			// We need not match 0 crc, since the decoder is a new one at this point.
+			if crc != 0 && rec.Validate(crc) != nil {
+				return ErrCRCMismatch
+			}
+			decoder.updateCRC(rec.Crc)
+		case snapshotType:
+			var loadedSnap walpb.Snapshot
+			pbutil.MustUnmarshal(&loadedSnap, rec.Data)
+			if loadedSnap.Index == snap.Index {
+				if loadedSnap.Term != snap.Term {
+					return ErrSnapshotMismatch
+				}
+				match = true
+			}
+		// We ignore all entry and state type records as these
+		// are not necessary for validating the WAL contents
+		case entryType:
+		case stateType:
+		default:
+			return fmt.Errorf("unexpected block type %d", rec.Type)
+		}
+	}
+
+	if closer != nil {
+		closer()
+	}
+
+	// We do not have to read out all the WAL entries
+	// as the decoder is opened in read mode.
+	if err != io.EOF && err != io.ErrUnexpectedEOF {
+		return err
+	}
+
+	if !match {
+		return ErrSnapshotNotFound
+	}
+
+	return nil
+}
+
 // cut closes current file written and creates a new one ready to append.
 // cut closes current file written and creates a new one ready to append.
 // cut first creates a temp wal file and writes necessary headers into it.
 // cut first creates a temp wal file and writes necessary headers into it.
 // Then cut atomically rename temp wal file to a wal file.
 // Then cut atomically rename temp wal file to a wal file.