Browse Source

wal: OpenFromIndex fails if it cannot find previous index

Example:
We save entry 1, 2, 3 to WAL.
If we try to open 100, it should fail.
Yicheng Qin 11 years ago
parent
commit
7160b5ae26
2 changed files with 34 additions and 1 deletions
  1. 7 1
      wal/wal.go
  2. 27 0
      wal/wal_test.go

+ 7 - 1
wal/wal.go

@@ -94,7 +94,8 @@ func Create(dirpath string) (*WAL, error) {
 }
 
 // OpenAtIndex opens the WAL at the given index.
-// The index MUST have been previously committed to the WAL.
+// The index SHOULD have been previously committed to the WAL, or the following
+// ReadAll will fail.
 // The returned WAL is ready to read and the first record will be the given
 // index. The WAL cannot be appended to before reading out all of its
 // previous records.
@@ -152,6 +153,7 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
 }
 
 // ReadAll reads out all records of the current WAL.
+// If it cannot read out the expected entry, it will return ErrNotFound.
 // After ReadAll, the WAL will be ready for appending new records.
 func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry, err error) {
 	rec := &walpb.Record{}
@@ -192,6 +194,10 @@ func (w *WAL) ReadAll() (id int64, state raftpb.HardState, ents []raftpb.Entry,
 		state.Reset()
 		return 0, state, nil, err
 	}
+	if w.enti < w.ri {
+		state.Reset()
+		return 0, state, nil, ErrNotFound
+	}
 
 	// close decoder, disable reading
 	w.decoder.close()

+ 27 - 0
wal/wal_test.go

@@ -339,6 +339,33 @@ func TestRecoverAfterCut(t *testing.T) {
 	}
 }
 
+func TestOpenAtUncommittedIndex(t *testing.T) {
+	p, err := ioutil.TempDir(os.TempDir(), "waltest")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(p)
+
+	w, err := Create(p)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := w.SaveEntry(&raftpb.Entry{Index: 0}); err != nil {
+		t.Fatal(err)
+	}
+	w.Close()
+
+	w, err = OpenAtIndex(p, 1)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// commit up to index 0, try to read index 1
+	if _, _, _, err := w.ReadAll(); err != ErrNotFound {
+		t.Errorf("err = %v, want %v", err, ErrNotFound)
+	}
+	w.Close()
+}
+
 func TestSaveEmpty(t *testing.T) {
 	var buf bytes.Buffer
 	var est raftpb.HardState