Browse Source

raft: write entry 0 into log

Yicheng Qin 11 years ago
parent
commit
a9af70c52b
5 changed files with 23 additions and 13 deletions
  1. 2 2
      raft/log.go
  2. 1 1
      raft/node_test.go
  3. 5 4
      wal/doc.go
  4. 2 1
      wal/wal.go
  5. 13 5
      wal/wal_test.go

+ 2 - 2
raft/log.go

@@ -27,7 +27,7 @@ type raftLog struct {
 func newLog() *raftLog {
 func newLog() *raftLog {
 	return &raftLog{
 	return &raftLog{
 		ents:             make([]pb.Entry, 1),
 		ents:             make([]pb.Entry, 1),
-		unstable:         1,
+		unstable:         0,
 		committed:        0,
 		committed:        0,
 		applied:          0,
 		applied:          0,
 		compactThreshold: defaultCompactThreshold,
 		compactThreshold: defaultCompactThreshold,
@@ -77,7 +77,7 @@ func (l *raftLog) findConflict(from int64, ents []pb.Entry) int64 {
 }
 }
 
 
 func (l *raftLog) unstableEnts() []pb.Entry {
 func (l *raftLog) unstableEnts() []pb.Entry {
-	ents := l.entries(l.unstable)
+	ents := l.slice(l.unstable, l.lastIndex()+1)
 	if ents == nil {
 	if ents == nil {
 		return nil
 		return nil
 	}
 	}

+ 1 - 1
raft/node_test.go

@@ -141,7 +141,7 @@ func TestNode(t *testing.T) {
 	wants := []Ready{
 	wants := []Ready{
 		{
 		{
 			State:            raftpb.State{Term: 1, Commit: 1, LastIndex: 1},
 			State:            raftpb.State{Term: 1, Commit: 1, LastIndex: 1},
-			Entries:          []raftpb.Entry{{Term: 1, Index: 1}},
+			Entries:          []raftpb.Entry{{}, {Term: 1, Index: 1}},
 			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
 			CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}},
 		},
 		},
 		{
 		{

+ 5 - 4
wal/doc.go

@@ -32,15 +32,16 @@ WAL files are placed inside of the directory in the following format:
 $seq-$index.wal
 $seq-$index.wal
 
 
 The first WAL file to be created will be 0000000000000000-0000000000000000.wal
 The first WAL file to be created will be 0000000000000000-0000000000000000.wal
-indicating an initial sequence of 0 and an initial raft index of 0.
+indicating an initial sequence of 0 and an initial raft index of 0. The first
+entry written to WAL MUST have raft index 0.
 
 
 Periodically a user will want to "cut" the WAL and place new entries into a new
 Periodically a user will want to "cut" the WAL and place new entries into a new
 file. This will increment an internal sequence number and cause a new file to
 file. This will increment an internal sequence number and cause a new file to
 be created. If the last raft index saved was 0x20 and this is the first time
 be created. If the last raft index saved was 0x20 and this is the first time
 Cut has been called on this WAL then the sequence will increment from 0x0 to
 Cut has been called on this WAL then the sequence will increment from 0x0 to
-0x1. The new file will be: 0000000000000001-0000000000000020.wal. If a second
-Cut is issues 0x10 entries later then the file will be called:
-0000000000000002-0000000000000030.wal.
+0x1. The new file will be: 0000000000000001-0000000000000021.wal. If a second
+Cut issues 0x10 entries with incremental index later then the file will be called:
+0000000000000002-0000000000000031.wal.
 
 
 At a later time a WAL can be opened at a particular raft index:
 At a later time a WAL can be opened at a particular raft index:
 
 

+ 2 - 1
wal/wal.go

@@ -65,6 +65,7 @@ type WAL struct {
 }
 }
 
 
 // Create creates a WAL ready for appending records.
 // Create creates a WAL ready for appending records.
+// The index of first record saved MUST be 0.
 func Create(dirpath string) (*WAL, error) {
 func Create(dirpath string) (*WAL, error) {
 	log.Printf("path=%s wal.create", dirpath)
 	log.Printf("path=%s wal.create", dirpath)
 	if Exist(dirpath) {
 	if Exist(dirpath) {
@@ -75,7 +76,7 @@ func Create(dirpath string) (*WAL, error) {
 		return nil, err
 		return nil, err
 	}
 	}
 
 
-	p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 1))
+	p := path.Join(dirpath, fmt.Sprintf("%016x-%016x.wal", 0, 0))
 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	f, err := os.OpenFile(p, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
 	if err != nil {
 	if err != nil {
 		return nil, err
 		return nil, err

+ 13 - 5
wal/wal_test.go

@@ -32,7 +32,7 @@ var (
 	infoData   = []byte("\b\xef\xfd\x02")
 	infoData   = []byte("\b\xef\xfd\x02")
 	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
 	infoRecord = append([]byte("\x0e\x00\x00\x00\x00\x00\x00\x00\b\x01\x10\x99\xb5\xe4\xd0\x03\x1a\x04"), infoData...)
 
 
-	firstWalName = "0000000000000000-0000000000000001.wal"
+	firstWalName = "0000000000000000-0000000000000000.wal"
 )
 )
 
 
 func TestNew(t *testing.T) {
 func TestNew(t *testing.T) {
@@ -78,7 +78,7 @@ func TestOpenAtIndex(t *testing.T) {
 	}
 	}
 	f.Close()
 	f.Close()
 
 
-	w, err := OpenAtIndex(dir, 1)
+	w, err := OpenAtIndex(dir, 0)
 	if err != nil {
 	if err != nil {
 		t.Fatalf("err = %v, want nil", err)
 		t.Fatalf("err = %v, want nil", err)
 	}
 	}
@@ -126,6 +126,10 @@ func TestCut(t *testing.T) {
 	}
 	}
 	defer w.Close()
 	defer w.Close()
 
 
+	// TODO(unihorn): remove this when cut can operate on an empty file
+	if err := w.SaveEntry(&raftpb.Entry{}); err != nil {
+		t.Fatal(err)
+	}
 	if err := w.Cut(0); err != nil {
 	if err := w.Cut(0); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -162,7 +166,7 @@ func TestRecover(t *testing.T) {
 	if err = w.SaveInfo(i); err != nil {
 	if err = w.SaveInfo(i); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
-	ents := []raftpb.Entry{{Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
+	ents := []raftpb.Entry{{Index: 0, Term: 0}, {Index: 1, Term: 1, Data: []byte{1}}, {Index: 2, Term: 2, Data: []byte{2}}}
 	for _, e := range ents {
 	for _, e := range ents {
 		if err = w.SaveEntry(&e); err != nil {
 		if err = w.SaveEntry(&e); err != nil {
 			t.Fatal(err)
 			t.Fatal(err)
@@ -176,7 +180,7 @@ func TestRecover(t *testing.T) {
 	}
 	}
 	w.Close()
 	w.Close()
 
 
-	if w, err = OpenAtIndex(p, 1); err != nil {
+	if w, err = OpenAtIndex(p, 0); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
 	id, state, entries, err := w.ReadAll()
 	id, state, entries, err := w.ReadAll()
@@ -279,6 +283,10 @@ func TestRecoverAfterCut(t *testing.T) {
 	if err = w.SaveInfo(info); err != nil {
 	if err = w.SaveInfo(info); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
+	// TODO(unihorn): remove this when cut can operate on an empty file
+	if err = w.SaveEntry(&raftpb.Entry{}); err != nil {
+		t.Fatal(err)
+	}
 	if err = w.Cut(0); err != nil {
 	if err = w.Cut(0); err != nil {
 		t.Fatal(err)
 		t.Fatal(err)
 	}
 	}
@@ -322,7 +330,7 @@ func TestRecoverAfterCut(t *testing.T) {
 		}
 		}
 		for j, e := range entries {
 		for j, e := range entries {
 			if e.Index != int64(j+i) {
 			if e.Index != int64(j+i) {
-				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i+1)
+				t.Errorf("#%d: ents[%d].Index = %+v, want %+v", i, j, e.Index, j+i)
 			}
 			}
 		}
 		}
 	}
 	}