Browse Source

Merge pull request #1077 from coreos/wal_cut

wal: cut(i uint64) -> cut
Xiang Li 11 years ago
parent
commit
bc5791af11
4 changed files with 49 additions and 27 deletions
  1. 4 0
      wal/doc.go
  2. 4 0
      wal/util.go
  3. 21 10
      wal/wal.go
  4. 20 17
      wal/wal_test.go

+ 4 - 0
wal/doc.go

@@ -48,6 +48,10 @@ At a later time a WAL can be opened at a particular raft index:
 	w, err := wal.OpenAtIndex("/var/lib/etcd", 0)
 	...
 
+The raft index must have been written to the WAL. When opening without a
+snapshot the raft index should always be 0. When opening with a snapshot
+the raft index should be the index of the last entry covered by the snapshot.
+
 Additional items cannot be Saved to this WAL until all of the items from 0 to
 the end of the WAL are read first:
 

+ 4 - 0
wal/util.go

@@ -83,6 +83,10 @@ func parseWalName(str string) (seq, index int64, err error) {
 	return
 }
 
+func walName(seq, index int64) string {
+	return fmt.Sprintf("%016x-%016x.wal", seq, index)
+}
+
 func max(a, b int64) int64 {
 	if a > b {
 		return a

+ 21 - 10
wal/wal.go

@@ -60,7 +60,8 @@ type WAL struct {
 	decoder *decoder // decoder to decode records
 
 	f       *os.File // underlay file opened for appending, sync
-	seq     int64    // current sequence of the wal file
+	seq     int64    // sequence of the wal file currently used for writes
+	enti    int64    // index of the last entry saved to the wal
 	encoder *encoder // encoder to encode records
 }
 
@@ -75,7 +76,7 @@ func Create(dirpath string) (*WAL, error) {
 		return nil, err
 	}
 
-	p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
+	p := path.Join(dirpath, walName(0, 0))
 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 		return nil, err
@@ -93,6 +94,7 @@ func Create(dirpath string) (*WAL, error) {
 }
 
 // OpenAtIndex opens the WAL at the given index.
+// The index MUST have been previously committed to the WAL.
 // The returned WAL is ready to read and the first record will be the given
 // index. The WAL cannot be appended to before reading out all of its
 // previous records.
@@ -126,6 +128,11 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
 	rc := MultiReadCloser(rcs...)
 
 	// open the lastest wal file for appending
+	seq, _, err := parseWalName(names[len(names)-1])
+	if err != nil {
+		rc.Close()
+		return nil, err
+	}
 	last := path.Join(dirpath, names[len(names)-1])
 	f, err := os.OpenFile(last, os.O_WRONLY|os.O_APPEND, 0)
 	if err != nil {
@@ -138,7 +145,8 @@ func OpenAtIndex(dirpath string, index int64) (*WAL, error) {
 		ri:      index,
 		decoder: newDecoder(rc),
 
-		f: f,
+		f:   f,
+		seq: seq,
 	}
 	return w, nil
 }
@@ -156,6 +164,7 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err
 			if e.Index >= w.ri {
 				ents = append(ents[:e.Index-w.ri], e)
 			}
+			w.enti = e.Index
 		case stateType:
 			state = mustUnmarshalState(rec.Data)
 		case infoType:
@@ -194,21 +203,19 @@ func (w *WAL) ReadAll() (id int64, state raftpb.State, ents []raftpb.Entry, err
 	return id, state, ents, nil
 }
 
-// index should be the index of last log entry.
 // Cut closes current file written and creates a new one ready to append.
-func (w *WAL) Cut(index int64) error {
-	log.Printf("wal.cut index=%d", index)
-
+func (w *WAL) Cut() error {
 	// create a new wal file with name sequence + 1
-	fpath := path.Join(w.dir, fmt.Sprintf("%016x-%016x.wal", w.seq+1, index+1))
+	fpath := path.Join(w.dir, walName(w.seq+1, w.enti+1))
 	f, err := os.OpenFile(fpath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 		return err
 	}
-
 	w.Sync()
 	w.f.Close()
 
+	log.Printf("wal.cut index=%d prevfile=%s curfile=%s", w.enti, w.f.Name(), f.Name())
+
 	// update writer and save the previous crc
 	w.f = f
 	w.seq++
@@ -250,7 +257,11 @@ func (w *WAL) SaveEntry(e *raftpb.Entry) error {
 		panic(err)
 	}
 	rec := &walpb.Record{Type: entryType, Data: b}
-	return w.encoder.encode(rec)
+	if err := w.encoder.encode(rec); err != nil {
+		return err
+	}
+	w.enti = e.Index
+	return nil
 }
 
 func (w *WAL) SaveState(s *raftpb.State) error {

+ 20 - 17
wal/wal_test.go

@@ -18,7 +18,6 @@ package wal
 
 import (
 	"bytes"
-	"fmt"
 	"io/ioutil"
 	"os"
 	"path"
@@ -31,8 +30,6 @@ import (
 var (
 	infoData   = []byte("\b\xef\xfd\x02")
 	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
-
-	firstWalName = "0000000000000000-0000000000000000.wal"
 )
 
 func TestNew(t *testing.T) {
@@ -46,8 +43,8 @@ func TestNew(t *testing.T) {
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
-	if g := path.Base(w.f.Name()); g != firstWalName {
-		t.Errorf("name = %+v, want %+v", g, firstWalName)
+	if g := path.Base(w.f.Name()); g != walName(0, 0) {
+		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
 	}
 	w.Close()
 }
@@ -59,7 +56,7 @@ func TestNewForInitedDir(t *testing.T) {
 	}
 	defer os.RemoveAll(p)
 
-	os.Create(path.Join(p, firstWalName))
+	os.Create(path.Join(p, walName(0, 0)))
 	if _, err = Create(p); err == nil || err != os.ErrExist {
 		t.Errorf("err = %v, want %v", err, os.ErrExist)
 	}
@@ -72,7 +69,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	defer os.RemoveAll(dir)
 
-	f, err := os.Create(path.Join(dir, firstWalName))
+	f, err := os.Create(path.Join(dir, walName(0, 0)))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -82,12 +79,15 @@ func TestOpenAtIndex(t *testing.T) {
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 	}
-	if g := path.Base(w.f.Name()); g != firstWalName {
-		t.Errorf("name = %+v, want %+v", g, firstWalName)
+	if g := path.Base(w.f.Name()); g != walName(0, 0) {
+		t.Errorf("name = %+v, want %+v", g, walName(0, 0))
+	}
+	if w.seq != 0 {
+		t.Errorf("seq = %d, want %d", w.seq, 0)
 	}
 	w.Close()
 
-	wname := fmt.Sprintf("%016x-%016x.wal", 2, 10)
+	wname := walName(2, 10)
 	f, err = os.Create(path.Join(dir, wname))
 	if err != nil {
 		t.Fatal(err)
@@ -101,6 +101,9 @@ func TestOpenAtIndex(t *testing.T) {
 	if g := path.Base(w.f.Name()); g != wname {
 		t.Errorf("name = %+v, want %+v", g, wname)
 	}
+	if w.seq != 2 {
+		t.Errorf("seq = %d, want %d", w.seq, 2)
+	}
 	w.Close()
 
 	emptydir, err := ioutil.TempDir(os.TempDir(), "waltestempty")
@@ -130,10 +133,10 @@ func TestCut(t *testing.T) {
 	if err := w.SaveEntry(&raftpb.Entry{}); err != nil {
 		t.Fatal(err)
 	}
-	if err := w.Cut(0); err != nil {
+	if err := w.Cut(); err != nil {
 		t.Fatal(err)
 	}
-	wname := fmt.Sprintf("%016x-%016x.wal", 1, 1)
+	wname := walName(1, 1)
 	if g := path.Base(w.f.Name()); g != wname {
 		t.Errorf("name = %s, want %s", g, wname)
 	}
@@ -142,10 +145,10 @@ func TestCut(t *testing.T) {
 	if err := w.SaveEntry(e); err != nil {
 		t.Fatal(err)
 	}
-	if err := w.Cut(1); err != nil {
+	if err := w.Cut(); err != nil {
 		t.Fatal(err)
 	}
-	wname = fmt.Sprintf("%016x-%016x.wal", 2, 2)
+	wname = walName(2, 2)
 	if g := path.Base(w.f.Name()); g != wname {
 		t.Errorf("name = %s, want %s", g, wname)
 	}
@@ -287,7 +290,7 @@ func TestRecoverAfterCut(t *testing.T) {
 	if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
 		t.Fatal(err)
 	}
-	if err = w.Cut(0); err != nil {
+	if err = w.Cut(); err != nil {
 		t.Fatal(err)
 	}
 	for i := 1; i < 10; i++ {
@@ -295,7 +298,7 @@ func TestRecoverAfterCut(t *testing.T) {
 		if err = w.SaveEntry(&e); err != nil {
 			t.Fatal(err)
 		}
-		if err = w.Cut(e.Index); err != nil {
+		if err = w.Cut(); err != nil {
 			t.Fatal(err)
 		}
 		if err = w.SaveInfo(info); err != nil {
@@ -304,7 +307,7 @@ func TestRecoverAfterCut(t *testing.T) {
 	}
 	w.Close()
 
-	if err := os.Remove(path.Join(p, "0000000000000004-0000000000000004.wal")); err != nil {
+	if err := os.Remove(path.Join(p, walName(4, 4))); err != nil {
 		t.Fatal(err)
 	}