Browse Source

raft: add node.Unstable
Be able to return all unstable log entries. Application must store unstable
log entries before send out any messages after calling step.

Xiang Li 11 years ago
parent
commit
609e13a240
3 changed files with 66 additions and 5 deletions
  1. 13 1
      raft/log.go
  2. 48 4
      raft/log_test.go
  3. 5 0
      raft/node.go

+ 13 - 1
raft/log.go

@@ -26,6 +26,7 @@ func (e *Entry) isConfig() bool {
 
 type raftLog struct {
 	ents      []Entry
+	unstable  int64
 	committed int64
 	applied   int64
 	offset    int64
@@ -38,6 +39,7 @@ type raftLog struct {
 func newLog() *raftLog {
 	return &raftLog{
 		ents:             make([]Entry, 1),
+		unstable:         1,
 		committed:        0,
 		applied:          0,
 		compactThreshold: defaultCompactThreshold,
@@ -69,6 +71,7 @@ func (l *raftLog) maybeAppend(index, logTerm, committed int64, ents ...Entry) bo
 
 func (l *raftLog) append(after int64, ents ...Entry) int64 {
 	l.ents = append(l.slice(l.offset, after+1), ents...)
+	l.unstable = min(l.unstable, after+1)
 	return l.lastIndex()
 }
 
@@ -81,6 +84,12 @@ func (l *raftLog) findConflict(from int64, ents []Entry) int64 {
 	return -1
 }
 
+func (l *raftLog) unstableEnts() []Entry {
+	ents := l.entries(l.unstable)
+	l.unstable = l.lastIndex() + 1
+	return ents
+}
+
 func (l *raftLog) lastIndex() int64 {
 	return int64(len(l.ents)) - 1 + l.offset
 }
@@ -132,7 +141,8 @@ func (l *raftLog) nextEnts() (ents []Entry) {
 	return ents
 }
 
-// compact removes the log entries before i, exclusive.
+// compact compacts all log entries until i.
+// It removes the log entries before i, exclusive.
 // i must be not smaller than the index of the first entry
 // and not greater than the index of the last entry.
 // the number of entries after compaction will be returned.
@@ -141,6 +151,7 @@ func (l *raftLog) compact(i int64) int64 {
 		panic(fmt.Sprintf("compact %d out of bounds [%d:%d]", i, l.offset, l.lastIndex()))
 	}
 	l.ents = l.slice(i, l.lastIndex()+1)
+	l.unstable = max(i+1, l.unstable)
 	l.offset = i
 	return int64(len(l.ents))
 }
@@ -151,6 +162,7 @@ func (l *raftLog) shouldCompact() bool {
 
 func (l *raftLog) restore(index, term int64) {
 	l.ents = []Entry{{Term: term}}
+	l.unstable = index + 1
 	l.committed = index
 	l.applied = index
 	l.offset = index

+ 48 - 4
raft/log_test.go

@@ -12,23 +12,27 @@ import (
 // 2.Append any new entries not already in the log
 func TestAppend(t *testing.T) {
 	previousEnts := []Entry{{Term: 1}, {Term: 2}}
+	previousUnstable := int64(3)
 	tests := []struct {
-		after  int64
-		ents   []Entry
-		windex int64
-		wents  []Entry
+		after     int64
+		ents      []Entry
+		windex    int64
+		wents     []Entry
+		wunstable int64
 	}{
 		{
 			2,
 			[]Entry{},
 			2,
 			[]Entry{{Term: 1}, {Term: 2}},
+			3,
 		},
 		{
 			2,
 			[]Entry{{Term: 2}},
 			3,
 			[]Entry{{Term: 1}, {Term: 2}, {Term: 2}},
+			3,
 		},
 		// conflicts with index 1
 		{
@@ -36,6 +40,7 @@ func TestAppend(t *testing.T) {
 			[]Entry{{Term: 2}},
 			1,
 			[]Entry{{Term: 2}},
+			1,
 		},
 		// conflicts with index 2
 		{
@@ -43,12 +48,14 @@ func TestAppend(t *testing.T) {
 			[]Entry{{Term: 3}, {Term: 3}},
 			3,
 			[]Entry{{Term: 1}, {Term: 3}, {Term: 3}},
+			2,
 		},
 	}
 
 	for i, tt := range tests {
 		raftLog := newLog()
 		raftLog.ents = append(raftLog.ents, previousEnts...)
+		raftLog.unstable = previousUnstable
 		index := raftLog.append(tt.after, tt.ents...)
 		if index != tt.windex {
 			t.Errorf("#%d: lastIndex = %d, want %d", i, index, tt.windex)
@@ -56,6 +63,9 @@ func TestAppend(t *testing.T) {
 		if g := raftLog.entries(1); !reflect.DeepEqual(g, tt.wents) {
 			t.Errorf("#%d: logEnts = %+v, want %+v", i, g, tt.wents)
 		}
+		if g := raftLog.unstable; g != tt.wunstable {
+			t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
+		}
 	}
 }
 
@@ -88,6 +98,11 @@ func TestCompactionSideEffects(t *testing.T) {
 		}
 	}
 
+	unstableEnts := raftLog.unstableEnts()
+	if g := len(unstableEnts); g != 500 {
+		t.Errorf("len(unstableEntries) = %d, want = %d", g, 500)
+	}
+
 	prev := raftLog.lastIndex()
 	raftLog.append(raftLog.lastIndex(), Entry{Term: raftLog.lastIndex() + 1})
 	if raftLog.lastIndex() != prev+1 {
@@ -100,6 +115,32 @@ func TestCompactionSideEffects(t *testing.T) {
 	}
 }
 
+func TestUnstableEnts(t *testing.T) {
+	previousEnts := []Entry{{Term: 1}, {Term: 2}}
+	tests := []struct {
+		unstable  int64
+		wents     []Entry
+		wunstable int64
+	}{
+		{3, nil, 3},
+		{1, []Entry{{Term: 1}, {Term: 2}}, 3},
+	}
+
+	for i, tt := range tests {
+		raftLog := newLog()
+		raftLog.ents = append(raftLog.ents, previousEnts...)
+		raftLog.unstable = tt.unstable
+		ents := raftLog.unstableEnts()
+		if !reflect.DeepEqual(ents, tt.wents) {
+			t.Errorf("#%d: unstableEnts = %+v, want %+v", i, ents, tt.wents)
+		}
+		if g := raftLog.unstable; g != tt.wunstable {
+			t.Errorf("#%d: unstable = %d, want %d", i, g, tt.wunstable)
+		}
+	}
+
+}
+
 //TestCompaction ensures that the number of log entreis is correct after compactions.
 func TestCompaction(t *testing.T) {
 	tests := []struct {
@@ -164,6 +205,9 @@ func TestLogRestore(t *testing.T) {
 	if raftLog.committed != index {
 		t.Errorf("comitted = %d, want %d", raftLog.committed, index)
 	}
+	if raftLog.unstable != index+1 {
+		t.Errorf("unstable = %d, want %d", raftLog.unstable, index+1)
+	}
 	if raftLog.term(index) != term {
 		t.Errorf("term = %d, want %d", raftLog.term(index), term)
 	}

+ 5 - 0
raft/node.go

@@ -204,3 +204,8 @@ func (n *Node) UpdateConf(t int64, c *Config) {
 	}
 	n.propose(t, data)
 }
+
+// UnstableEnts retuens all the entries that need to be persistent.
+func (n *Node) UnstableEnts() []Entry {
+	return n.sm.raftLog.unstableEnts()
+}