Browse Source

Merge pull request #1387 from xiangli-cmu/fix_raft

Fix raft
Xiang Li 11 years ago
parent
commit
9fb02eb6fa
3 changed files with 31 additions and 11 deletions
  1. 6 4
      raft/log.go
  2. 5 2
      raft/raft.go
  3. 20 5
      raft/raft_test.go

+ 6 - 4
raft/log.go

@@ -53,8 +53,10 @@ func (l *raftLog) String() string {
 	return fmt.Sprintf("offset=%d committed=%d applied=%d len(ents)=%d", l.offset, l.committed, l.applied, len(l.ents))
 }
 
-func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) bool {
-	lastnewi := index + uint64(len(ents))
+// maybeAppend returns (0, false) if the entries cannot be appended. Otherwise,
+// it returns (last index of entries, true).
+func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry) (lastnewi uint64, ok bool) {
+	lastnewi = index + uint64(len(ents))
 	if l.matchTerm(index, logTerm) {
 		from := index + 1
 		ci := l.findConflict(from, ents)
@@ -70,9 +72,9 @@ func (l *raftLog) maybeAppend(index, logTerm, committed uint64, ents ...pb.Entry
 		if l.committed < tocommit {
 			l.committed = tocommit
 		}
-		return true
+		return lastnewi, true
 	}
-	return false
+	return 0, false
 }
 
 func (l *raftLog) append(after uint64, ents ...pb.Entry) uint64 {

+ 5 - 2
raft/raft.go

@@ -379,8 +379,8 @@ func (r *raft) Step(m pb.Message) error {
 }
 
 func (r *raft) handleAppendEntries(m pb.Message) {
-	if r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...) {
-		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.lastIndex()})
+	if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
+		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
 	} else {
 		r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true})
 	}
@@ -428,6 +428,9 @@ func stepLeader(r *raft, m pb.Message) {
 		r.appendEntry(e)
 		r.bcastAppend()
 	case pb.MsgAppResp:
+		if m.Index == 0 {
+			return
+		}
 		if m.Reject {
 			if r.prs[m.From].maybeDecrTo(m.Index) {
 				r.sendAppend(m.From)

+ 20 - 5
raft/raft_test.go

@@ -778,16 +778,22 @@ func TestAllServerStepdown(t *testing.T) {
 }
 
 func TestLeaderAppResp(t *testing.T) {
+	// initial progress: match = 0; netx = 3
 	tests := []struct {
-		index      uint64
-		reject     bool
+		index  uint64
+		reject bool
+		// progress
+		wmatch uint64
+		wnext  uint64
+		// message
 		wmsgNum    int
 		windex     uint64
 		wcommitted uint64
 	}{
-		{3, true, 0, 0, 0},  // stale resp; no replies
-		{2, true, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
-		{2, false, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
+		{3, true, 0, 3, 0, 0, 0},  // stale resp; no replies
+		{2, true, 0, 2, 1, 1, 0},  // denied resp; leader does not commit; decrese next and send probing msg
+		{2, false, 2, 3, 2, 2, 2}, // accept resp; leader commits; broadcast with commit index
+		{0, false, 0, 3, 0, 0, 0}, // ignore heartbeat replies
 	}
 
 	for i, tt := range tests {
@@ -799,6 +805,15 @@ func TestLeaderAppResp(t *testing.T) {
 		sm.becomeLeader()
 		sm.ReadMessages()
 		sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject})
+
+		p := sm.prs[2]
+		if p.match != tt.wmatch {
+			t.Errorf("#%d match = %d, want %d", i, p.match, tt.wmatch)
+		}
+		if p.next != tt.wnext {
+			t.Errorf("#%d next = %d, want %d", i, p.next, tt.wnext)
+		}
+
 		msgs := sm.ReadMessages()
 
 		if len(msgs) != tt.wmsgNum {