Browse Source

Merge pull request #1067 from unihorn/122

raft: write entry 0 into log
Yicheng Qin 11 years ago
parent
commit
38c074cb05
7 changed files with 31 additions and 17 deletions
  1. 1 1
      main.go
  2. 7 2
      raft/log.go
  3. 3 2
      raft/node_test.go
  4. 1 2
      raft/raft.go
  5. 5 4
      wal/doc.go
  6. 1 1
      wal/wal.go
  7. 13 5
      wal/wal_test.go

+ 1 - 1
main.go

@@ -110,7 +110,7 @@ func startRaft(id int64, peerIDs []int64, waldir string) (raft.Node, *wal.WAL) {
 
 
 	// restart a node from previous wal
 	// restart a node from previous wal
 	// TODO(xiangli): check snapshot; not open from one
 	// TODO(xiangli): check snapshot; not open from one
-	w, err := wal.OpenAtIndex(waldir, 1)
+	w, err := wal.OpenAtIndex(waldir, 0)
 	if err != nil {
 	if err != nil {
 		log.Fatal(err)
 		log.Fatal(err)
 	}
 	}

+ 7 - 2
raft/log.go

@@ -27,7 +27,7 @@ type raftLog struct {
 func newLog() *raftLog {
 func newLog() *raftLog {
 	return &raftLog{
 	return &raftLog{
 		ents:             make([]pb.Entry, 1),
 		ents:             make([]pb.Entry, 1),
-		unstable:         1,
+		unstable:         0,
 		committed:        0,
 		committed:        0,
 		applied:          0,
 		applied:          0,
 		compactThreshold: defaultCompactThreshold,
 		compactThreshold: defaultCompactThreshold,
@@ -38,6 +38,11 @@ func (l *raftLog) isEmpty() bool {
 	return l.offset == 0 && len(l.ents) == 1
 	return l.offset == 0 && len(l.ents) == 1
 }
 }
 
 
+func (l *raftLog) load(ents []pb.Entry) {
+	l.ents = ents
+	l.unstable = l.offset + int64(len(ents))
+}
+
 func (l *raftLog) String() string {
 func (l *raftLog) String() string {
 	return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
 	return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
 }
 }
@@ -77,7 +82,7 @@ func (l *raftLog) findConflict(from int64, ents []pb.Entry) int64 {
 }
 }
 
 
 func (l *raftLog) unstableEnts() []pb.Entry {
 func (l *raftLog) unstableEnts() []pb.Entry {
-	ents := l.entries(l.unstable)
+	ents := l.slice(l.unstable, l.lastIndex()+1)
 	if ents == nil {
 	if ents == nil {
 		return nil
 		return nil
 	}
 	}

+ 3 - 2
raft/node_test.go

@@ -141,7 +141,7 @@ func TestNode(t *testing.T) {
 	wants := []Ready{
 	wants := []Ready{
 		{
 		{
 			State:            raftpb.State{Term: 1, Commit: 1, LastIndex: 1},
 			State:            raftpb.State{Term: 1, Commit: 1, LastIndex: 1},
-			Entries:          []raftpb.Entry{{Term: 1, Index: 1}},
+			Entries:          []raftpb.Entry{{}, {Term: 1, Index: 1}},
 			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
 			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
 		},
 		},
 		{
 		{
@@ -171,6 +171,7 @@ func TestNode(t *testing.T) {
 
 
 func TestNodeRestart(t *testing.T) {
 func TestNodeRestart(t *testing.T) {
 	entries := []raftpb.Entry{
 	entries := []raftpb.Entry{
+		{},
 		{Term: 1, Index: 1},
 		{Term: 1, Index: 1},
 		{Term: 1, Index: 2, Data: []byte("foo")},
 		{Term: 1, Index: 2, Data: []byte("foo")},
 	}
 	}
@@ -179,7 +180,7 @@ func TestNodeRestart(t *testing.T) {
 	want := Ready{
 	want := Ready{
 		State: emptyState,
 		State: emptyState,
 		// commit upto index commit index in st
 		// commit upto index commit index in st
-		CommittedEntries: entries[:st.Commit],
+		CommittedEntries: entries[1 : st.Commit+1],
 	}
 	}
 
 
 	n := Restart(1, []int64{1}, 0, 0, st, entries)
 	n := Restart(1, []int64{1}, 0, 0, st, entries)

+ 1 - 2
raft/raft.go

@@ -503,8 +503,7 @@ func (r *raft) loadEnts(ents []pb.Entry) {
 	if !r.raftLog.isEmpty() {
 	if !r.raftLog.isEmpty() {
 		panic("cannot load entries when log is not empty")
 		panic("cannot load entries when log is not empty")
 	}
 	}
-	r.raftLog.append(0, ents...)
-	r.raftLog.unstable = r.raftLog.lastIndex() + 1
+	r.raftLog.load(ents)
 }
 }
 
 
 func (r *raft) loadState(state pb.State) {
 func (r *raft) loadState(state pb.State) {

+ 5 - 4
wal/doc.go

@@ -32,15 +32,16 @@ WAL files are placed inside of the directory in the following format:
 $seq-$index.wal
 $seq-$index.wal
 
 
 The first WAL file to be created will be 0000000000000000-0000000000000000.wal
 The first WAL file to be created will be 0000000000000000-0000000000000000.wal
-indicating an initial sequence of 0 and an initial raft index of 0.
+indicating an initial sequence of 0 and an initial raft index of 0. The first
+entry written to WAL MUST have raft index 0.
 
 
 Periodically a user will want to "cut" the WAL and place new entries into a new
 Periodically a user will want to "cut" the WAL and place new entries into a new
 file. This will increment an internal sequence number and cause a new file to
 file. This will increment an internal sequence number and cause a new file to
 be created. If the last raft index saved was 0x20 and this is the first time
 be created. If the last raft index saved was 0x20 and this is the first time
 Cut has been called on this WAL then the sequence will increment from 0x0 to
 Cut has been called on this WAL then the sequence will increment from 0x0 to
-0x1. The new file will be: 0000000000000001-0000000000000020.wal. If a second
-Cut is issues 0x10 entries later then the file will be called:
-0000000000000002-0000000000000030.wal.
+0x1. The new file will be: 0000000000000001-0000000000000021.wal. If a second
+Cut issues 0x10 entries with incremental index later then the file will be called:
+0000000000000002-0000000000000031.wal.
 
 
 At a later time a WAL can be opened at a particular raft index:
 At a later time a WAL can be opened at a particular raft index:
 
 

+ 1 - 1
wal/wal.go

@@ -75,7 +75,7 @@ func Create(dirpath string) (*WAL, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 1))
+	p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err

+ 13 - 5
wal/wal_test.go

@@ -32,7 +32,7 @@ var (
 	infoData   = []byte("\b\xef\xfd\x02")
 	infoData   = []byte("\b\xef\xfd\x02")
 	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
 	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
 
 
-	firstWalName = "0000000000000000-0000000000000001.wal"
+	firstWalName = "0000000000000000-0000000000000000.wal"
 )
 )
 
 
 func TestNew(t *testing.T) {
 func TestNew(t *testing.T) {
@@ -78,7 +78,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	}
 	f.Close()
 	f.Close()
 
 
-	w, err := OpenAtIndex(dir, 1)
+	w, err := OpenAtIndex(dir, 0)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 		t.Fatalf("err = %v, want nil", err)
 	}
 	}
@@ -126,6 +126,10 @@ func TestCut(t *testing.T) {
 	}
 	}
 	defer w.Close()
 	defer w.Close()
 
 
+	// TODO(unihorn): remove this when cut can operate on an empty file
+	if err := w.SaveEntry(&raftpb.Entry{}); err != nil {
+		t.Fatal(err)
+	}
 	if err := w.Cut(0); err != nil {
 	if err := w.Cut(0); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -162,7 +166,7 @@ func TestRecover(t *testing.T) {
 	if err = w.SaveInfo(i); err != nil {
 	if err = w.SaveInfo(i); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
+	ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
 	for _, e := range ents {
 	for _, e := range ents {
 		if err = w.SaveEntry(&e); err != nil {
 		if err = w.SaveEntry(&e); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
@@ -176,7 +180,7 @@ func TestRecover(t *testing.T) {
 	}
 	}
 	w.Close()
 	w.Close()
 
 
-	if w, err = OpenAtIndex(p, 1); err != nil {
+	if w, err = OpenAtIndex(p, 0); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	id, state, entries, err := w.ReadAll()
 	id, state, entries, err := w.ReadAll()
@@ -279,6 +283,10 @@ func TestRecoverAfterCut(t *testing.T) {
 	if err = w.SaveInfo(info); err != nil {
 	if err = w.SaveInfo(info); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+	// TODO(unihorn): remove this when cut can operate on an empty file
+	if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
+		t.Fatal(err)
+	}
 	if err = w.Cut(0); err != nil {
 	if err = w.Cut(0); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -322,7 +330,7 @@ func TestRecoverAfterCut(t *testing.T) {
 		}
 		}
 		for j, e := range entries {
 		for j, e := range entries {
 			if e.Index != int64(j+i) {
 			if e.Index != int64(j+i) {
-				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
+				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
 			}
 			}
 		}
 		}
 	}
 	}