Browse Source

wal: cut(i uint64) -> cut

Xiang Li 11 years ago
parent
commit
21860bc017
2 changed files with 14 additions and 8 deletions
  1. 10 4
      wal/wal.go
  2. 4 4
      wal/wal_test.go

+ 10 - 4
wal/wal.go

@@ -61,6 +61,7 @@ type WAL struct {
 
 
 	f       *os.File // underlay file opened for appending, sync
 	f       *os.File // underlay file opened for appending, sync
 	seq     int64    // current sequence of the wal file
 	seq     int64    // current sequence of the wal file
+	enti    int64    // index of the last entry that has been saved to wal
 	encoder *encoder // encoder to encode records
 	encoder *encoder // encoder to encode records
 }
 }
 
 
@@ -156,6 +157,7 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err
 			if e.Index >= w.ri {
 			if e.Index >= w.ri {
 				ents = append(ents[:e.Index-w.ri], e)
 				ents = append(ents[:e.Index-w.ri], e)
 			}
 			}
+			w.enti = e.Index
 		case stateType:
 		case stateType:
 			state = mustUnmarshalState(rec.Data)
 			state = mustUnmarshalState(rec.Data)
 		case infoType:
 		case infoType:
@@ -196,11 +198,11 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err
 
 
 // index should be the index of last log entry.
 // index should be the index of last log entry.
 // Cut closes current file written and creates a new one ready to append.
 // Cut closes current file written and creates a new one ready to append.
-func (w *WAL) Cut(index int64) error {
-	log.Printf("wal.cut index=%d", index)
+func (w *WAL) Cut() error {
+	log.Printf("wal.cut index=%d", w.enti+1)
 
 
 	// create a new wal file with name sequence + 1
 	// create a new wal file with name sequence + 1
-	fpath := path.Join(w.dir, fmt.Sprintf("%016x-%016x.wal", w.seq+1, index+1))
+	fpath := path.Join(w.dir, fmt.Sprintf("%016x-%016x.wal", w.seq+1, w.enti+1))
 	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 	if err != nil {
 		return err
 		return err
@@ -250,7 +252,11 @@ func (w *WAL) SaveEntry(e *raftpb.Entry) error {
 		panic(err)
 		panic(err)
 	}
 	}
 	rec := &walpb.Record{Type: entryType, Data: b}
 	rec := &walpb.Record{Type: entryType, Data: b}
-	return w.encoder.encode(rec)
+	if err := w.encoder.encode(rec); err != nil {
+		return err
+	}
+	w.enti = e.Index
+	return nil
 }
 }
 
 
 func (w *WAL) SaveState(s *raftpb.State) error {
 func (w *WAL) SaveState(s *raftpb.State) error {

+ 4 - 4
wal/wal_test.go

@@ -130,7 +130,7 @@ func TestCut(t *testing.T) {
 	if err := w.SaveEntry(&raftpb.Entry{}); err != nil {
 	if err := w.SaveEntry(&raftpb.Entry{}); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	if err := w.Cut(0); err != nil {
+	if err := w.Cut(); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	wname := fmt.Sprintf("%016x-%016x.wal", 1, 1)
 	wname := fmt.Sprintf("%016x-%016x.wal", 1, 1)
@@ -142,7 +142,7 @@ func TestCut(t *testing.T) {
 	if err := w.SaveEntry(e); err != nil {
 	if err := w.SaveEntry(e); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	if err := w.Cut(1); err != nil {
+	if err := w.Cut(); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	wname = fmt.Sprintf("%016x-%016x.wal", 2, 2)
 	wname = fmt.Sprintf("%016x-%016x.wal", 2, 2)
@@ -287,7 +287,7 @@ func TestRecoverAfterCut(t *testing.T) {
 	if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
 	if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	if err = w.Cut(0); err != nil {
+	if err = w.Cut(); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	for i := 1; i < 10; i++ {
 	for i := 1; i < 10; i++ {
@@ -295,7 +295,7 @@ func TestRecoverAfterCut(t *testing.T) {
 		if err = w.SaveEntry(&e); err != nil {
 		if err = w.SaveEntry(&e); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
-		if err = w.Cut(e.Index); err != nil {
+		if err = w.Cut(); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
 		}
 		}
 		if err = w.SaveInfo(info); err != nil {
 		if err = w.SaveInfo(info); err != nil {