Browse Source

Merge pull request #6267 from heyitsanthony/fix-wal-tear

wal: fix CRC corruption on writes following write tears
Anthony Romano 9 years ago
parent
commit
4f5cacc835
4 changed files with 160 additions and 1 deletions
  1. 23 0
      pkg/fileutil/fileutil.go
  2. 39 0
      pkg/fileutil/fileutil_test.go
  3. 12 1
      wal/wal.go
  4. 86 0
      wal/wal_test.go

+ 23 - 0
pkg/fileutil/fileutil.go

@@ -96,3 +96,26 @@ func Exist(name string) bool {
 	_, err := os.Stat(name)
 	return err == nil
 }
+
+// ZeroToEnd zeros a file starting from SEEK_CUR to its SEEK_END. May temporarily
+// shorten the length of the file.
+func ZeroToEnd(f *os.File) error {
+	// TODO: support FALLOC_FL_ZERO_RANGE
+	off, err := f.Seek(0, os.SEEK_CUR)
+	if err != nil {
+		return err
+	}
+	lenf, lerr := f.Seek(0, os.SEEK_END)
+	if lerr != nil {
+		return lerr
+	}
+	if err = f.Truncate(off); err != nil {
+		return err
+	}
+	// make sure blocks remain allocated
+	if err = Preallocate(f, lenf, true); err != nil {
+		return err
+	}
+	_, err = f.Seek(off, os.SEEK_SET)
+	return err
+}

+ 39 - 0
pkg/fileutil/fileutil_test.go

@@ -118,3 +118,42 @@ func TestExist(t *testing.T) {
 		t.Errorf("exist = %v, want false", g)
 	}
 }
+
+func TestZeroToEnd(t *testing.T) {
+	f, err := ioutil.TempFile(os.TempDir(), "fileutil")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer f.Close()
+
+	b := make([]byte, 1024)
+	for i := range b {
+		b[i] = 12
+	}
+	if _, err = f.Write(b); err != nil {
+		t.Fatal(err)
+	}
+	if _, err = f.Seek(512, os.SEEK_SET); err != nil {
+		t.Fatal(err)
+	}
+	if err = ZeroToEnd(f); err != nil {
+		t.Fatal(err)
+	}
+	off, serr := f.Seek(0, os.SEEK_CUR)
+	if serr != nil {
+		t.Fatal(serr)
+	}
+	if off != 512 {
+		t.Fatalf("expected offset 512, got %d", off)
+	}
+
+	b = make([]byte, 512)
+	if _, err = f.Read(b); err != nil {
+		t.Fatal(err)
+	}
+	for i := range b {
+		if b[i] != 0 {
+			t.Errorf("expected b[%d] = 0, got %d", i, b[i])
+		}
+	}
+}

+ 12 - 1
wal/wal.go

@@ -301,6 +301,18 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 			state.Reset()
 			return nil, state, nil, err
 		}
+		// decodeRecord() will return io.EOF if it detects a zero record,
+		// but this zero record may be followed by non-zero records from
+		// a torn write. Overwriting some of these non-zero records, but
+		// not all, will cause CRC errors on WAL open. Since the records
+		// were never fully synced to disk in the first place, it's safe
+		// to zero them out to avoid any CRC errors from new writes.
+		if _, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET); err != nil {
+			return nil, state, nil, err
+		}
+		if err = fileutil.ZeroToEnd(w.tail().File); err != nil {
+			return nil, state, nil, err
+		}
 	}
 
 	err = nil
@@ -319,7 +331,6 @@ func (w *WAL) ReadAll() (metadata []byte, state raftpb.HardState, ents []raftpb.
 
 	if w.tail() != nil {
 		// create encoder (chain crc with the decoder), enable appending
-		_, err = w.tail().Seek(w.decoder.lastOffset(), os.SEEK_SET)
 		w.encoder = newEncoder(w.tail(), w.decoder.lastCRC())
 	}
 	w.decoder = nil

+ 86 - 0
wal/wal_test.go

@@ -636,3 +636,89 @@ func TestRestartCreateWal(t *testing.T) {
 		t.Fatalf("got error %v and meta %q, expected nil and %q", rerr, meta, "abc")
 	}
 }
+
+// TestOpenOnTornWrite ensures that entries past the torn write are truncated.
+func TestOpenOnTornWrite(t *testing.T) {
+	maxEntries := 40
+	clobberIdx := 20
+	overwriteEntries := 5
+
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+	w, err := Create(p, nil)
+	defer w.Close()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// get offset of end of each saved entry
+	offsets := make([]int64, maxEntries)
+	for i := range offsets {
+		es := []raftpb.Entry{{Index: uint64(i)}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
+			t.Fatal(err)
+		}
+		if offsets[i], err = w.tail().Seek(0, os.SEEK_CUR); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	fn := w.tail().Name()
+	w.Close()
+
+	// clobber some entry with 0's to simulate a torn write
+	f, ferr := os.OpenFile(fn, os.O_WRONLY, fileutil.PrivateFileMode)
+	if ferr != nil {
+		t.Fatal(ferr)
+	}
+	defer f.Close()
+	_, err = f.Seek(offsets[clobberIdx], os.SEEK_SET)
+	if err != nil {
+		t.Fatal(err)
+	}
+	zeros := make([]byte, offsets[clobberIdx+1]-offsets[clobberIdx])
+	_, err = f.Write(zeros)
+	if err != nil {
+		t.Fatal(err)
+	}
+	f.Close()
+
+	w, err = Open(p, walpb.Snapshot{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	// seek up to clobbered entry
+	_, _, _, err = w.ReadAll()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	// write a few entries past the clobbered entry
+	for i := 0; i < overwriteEntries; i++ {
+		// Index is different from old, truncated entries
+		es := []raftpb.Entry{{Index: uint64(i + clobberIdx), Data: []byte("new")}}
+		if err = w.Save(raftpb.HardState{}, es); err != nil {
+			t.Fatal(err)
+		}
+	}
+	w.Close()
+
+	// read back the entries, confirm number of entries matches expectation
+	w, err = OpenForRead(p, walpb.Snapshot{})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	_, _, ents, rerr := w.ReadAll()
+	if rerr != nil {
+		// CRC error? the old entries were likely never truncated away
+		t.Fatal(rerr)
+	}
+	wEntries := (clobberIdx - 1) + overwriteEntries
+	if len(ents) != wEntries {
+		t.Fatalf("expected len(ents) = %d, got %d", wEntries, len(ents))
+	}
+}