ソースを参照

Merge pull request #2514 from yichengq/340

raft: introduce progress states
Yicheng Qin 10 年 前
コミット
7e7bc76038
4 ファイル変更、327 行追加、194 行削除
  1. 126 106
      raft/raft.go
  2. 10 4
      raft/raft_snap_test.go
  3. 190 83
      raft/raft_test.go
  4. 1 1
      raft/status.go

+ 126 - 106
raft/raft.go

@@ -49,36 +49,88 @@ func (st StateType) String() string {
 	return stmap[uint64(st)]
 }
 
+const (
+	ProgressStateProbe ProgressStateType = iota
+	ProgressStateReplicate
+	ProgressStateSnapshot
+)
+
+type ProgressStateType uint64
+
+var prstmap = [...]string{
+	"ProgressStateProbe",
+	"ProgressStateReplicate",
+	"ProgressStateSnapshot",
+}
+
+func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
+
 type Progress struct {
 	Match, Next uint64
-	Wait        int
-	// If the last sent to the Progress failed and reported
-	// by the link layer via MsgUnreachable, Unreachable will be set.
-	// If the Progress is unreachable, snapshot and optimistically append
-	// will be disabled.
-	// Unreachable will be unset if raft starts to receive message (msgAppResp,
-	// msgHeartbeatResp) from the remote peer of the Progress.
-	Unreachable bool
+	// When in ProgressStateProbe, leader sends at most one replication message
+	// per heartbeat interval. It also probes actual progress of the follower.
+	//
+	// When in ProgressStateReplicate, leader optimistically increases next
+	// to the latest entry sent after sending replication message. This is
+	// an optimized state for fast replicating log entries to the follower.
+	//
+	// When in ProgressStateSnapshot, leader should have sent out snapshot
+	// before and stops sending any replication message.
+	State ProgressStateType
+	// Paused is used in ProgressStateProbe.
+	// When Paused is true, raft should pause sending replication message to this peer.
+	Paused bool
+	// PendingSnapshot is used in ProgressStateSnapshot.
 	// If there is a pending snapshot, PendingSnapshot is set to the index of
 	// that snapshot. While PendingSnapshot is set, the replication process of
 	// this Progress is paused. raft will not resend the snapshot until the
 	// pending one is reported to have failed.
-	//
-	// PendingSnapshot is set when raft sends out a snapshot to this Progress.
-	// PendingSnapshot is unset when the snapshot is reported to be successfully,
-	// or raft updates an equal or higher Match for this Progress.
 	PendingSnapshot uint64
 }
 
-func (pr *Progress) update(n uint64) {
-	pr.waitReset()
+func (pr *Progress) resetState(state ProgressStateType) {
+	pr.Paused = false
+	pr.PendingSnapshot = 0
+	pr.State = state
+}
+
+func (pr *Progress) becomeProbe() {
+	// If the original state is ProgressStateSnapshot, progress knows that
+	// the pending snapshot has been sent to this peer successfully, then
+	// probes from pendingSnapshot + 1.
+	if pr.State == ProgressStateSnapshot {
+		pendingSnapshot := pr.PendingSnapshot
+		pr.resetState(ProgressStateProbe)
+		pr.Next = max(pr.Match+1, pendingSnapshot+1)
+	} else {
+		pr.resetState(ProgressStateProbe)
+		pr.Next = pr.Match + 1
+	}
+}
 
+func (pr *Progress) becomeReplicate() {
+	pr.resetState(ProgressStateReplicate)
+	pr.Next = pr.Match + 1
+}
+
+func (pr *Progress) becomeSnapshot(snapshoti uint64) {
+	pr.resetState(ProgressStateSnapshot)
+	pr.PendingSnapshot = snapshoti
+}
+
+// maybeUpdate returns false if the given n index comes from an outdated message.
+// Otherwise it updates the progress and returns true.
+func (pr *Progress) maybeUpdate(n uint64) bool {
+	var updated bool
 	if pr.Match < n {
 		pr.Match = n
+		updated = true
+		pr.resume()
 	}
 	if pr.Next < n+1 {
 		pr.Next = n + 1
 	}
+	return updated
 }
 
 func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
@@ -86,9 +138,7 @@ func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
 // maybeDecrTo returns false if the given to index comes from an out of order message.
 // Otherwise it decreases the progress next index to min(rejected, last) and returns true.
 func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
-	pr.waitReset()
-
-	if pr.Match != 0 {
+	if pr.State == ProgressStateReplicate {
 		// the rejection must be stale if the progress has matched and "rejected"
 		// is smaller than "match".
 		if rejected <= pr.Match {
@@ -107,61 +157,28 @@ func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
 	if pr.Next = min(rejected, last+1); pr.Next < 1 {
 		pr.Next = 1
 	}
+	pr.resume()
 	return true
 }
 
-func (pr *Progress) waitDecr(i int) {
-	pr.Wait -= i
-	if pr.Wait < 0 {
-		pr.Wait = 0
-	}
-}
-func (pr *Progress) waitSet(w int)       { pr.Wait = w }
-func (pr *Progress) waitReset()          { pr.Wait = 0 }
-func (pr *Progress) isUnreachable() bool { return pr.Unreachable }
-func (pr *Progress) reachable()          { pr.Unreachable = false }
-
-func (pr *Progress) unreachable() {
-	pr.Unreachable = true
-	// When in optimistic appending path, if the remote becomes unreachable,
-	// there is big probability that it loses MsgApp. Fall back to bad
-	// path to recover it steadily.
-	if pr.Match != 0 {
-		pr.Next = pr.Match + 1
-	}
-}
-
-func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 }
-
-func (pr *Progress) hasPendingSnapshot() bool    { return pr.PendingSnapshot != 0 }
-func (pr *Progress) setPendingSnapshot(i uint64) { pr.PendingSnapshot = i }
+func (pr *Progress) pause()  { pr.Paused = true }
+func (pr *Progress) resume() { pr.Paused = false }
 
-// finishSnapshot unsets the pending snapshot and optimistically increase Next to
-// the index of pendingSnapshot + 1. The next replication message is expected
-// to be msgApp.
-func (pr *Progress) snapshotFinish() {
-	pr.Next = pr.PendingSnapshot + 1
-	pr.PendingSnapshot = 0
+// isPaused reports whether the progress has stopped sending replication messages.
+func (pr *Progress) isPaused() bool {
+	return pr.State == ProgressStateProbe && pr.Paused || pr.State == ProgressStateSnapshot
 }
 
-// snapshotFail unsets the pending snapshot. The next replication message is expected
-// to be another msgSnap.
-func (pr *Progress) snapshotFail() {
-	pr.PendingSnapshot = 0
-}
+func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
 
 // maybeSnapshotAbort reports whether the progress is in ProgressStateSnapshot
 // and Match is equal to or higher than pendingSnapshot; it changes no state.
 func (pr *Progress) maybeSnapshotAbort() bool {
-	if pr.hasPendingSnapshot() && pr.Match >= pr.PendingSnapshot {
-		pr.PendingSnapshot = 0
-		return true
-	}
-	return false
+	return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
 }
 
 func (pr *Progress) String() string {
-	return fmt.Sprintf("next = %d, match = %d, wait = %v", pr.Next, pr.Match, pr.Wait)
+	return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d", pr.Next, pr.Match, pr.State, pr.isPaused(), pr.PendingSnapshot)
 }
 
 type raft struct {
@@ -273,18 +290,12 @@ func (r *raft) send(m pb.Message) {
 // sendAppend sends an RPC with entries to the given peer.
 func (r *raft) sendAppend(to uint64) {
 	pr := r.prs[to]
-	if pr.shouldWait() || pr.hasPendingSnapshot() {
+	if pr.isPaused() {
 		return
 	}
 	m := pb.Message{}
 	m.To = to
 	if r.needSnapshot(pr.Next) {
-		if pr.isUnreachable() {
-			// do not try to send snapshot until the Progress is
-			// reachable
-			return
-		}
-
 		m.Type = pb.MsgSnap
 		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
@@ -297,7 +308,7 @@ func (r *raft) sendAppend(to uint64) {
 		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
 		raftLogger.Infof("raft: %x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
 			r.id, r.raftLog.firstIndex(), r.Commit, sindex, sterm, to, pr)
-		pr.setPendingSnapshot(sindex)
+		pr.becomeSnapshot(sindex)
 		raftLogger.Infof("raft: %x paused sending replication messages to %x [%s]", r.id, to, pr)
 	} else {
 		m.Type = pb.MsgApp
@@ -305,12 +316,16 @@ func (r *raft) sendAppend(to uint64) {
 		m.LogTerm = r.raftLog.term(pr.Next - 1)
 		m.Entries = r.raftLog.entries(pr.Next)
 		m.Commit = r.raftLog.committed
-		// optimistically increase the next if the follower
-		// has been matched.
-		if n := len(m.Entries); pr.Match != 0 && !pr.isUnreachable() && n != 0 {
-			pr.optimisticUpdate(m.Entries[n-1].Index)
-		} else if pr.Match == 0 || pr.isUnreachable() {
-			pr.waitSet(r.heartbeatTimeout)
+		if n := len(m.Entries); n != 0 {
+			switch pr.State {
+			// optimistically increase the next when in ProgressStateReplicate
+			case ProgressStateReplicate:
+				pr.optimisticUpdate(m.Entries[n-1].Index)
+			case ProgressStateProbe:
+				pr.pause()
+			default:
+				raftLogger.Panicf("raft: %x is sending append in unhandled state %s", r.id, pr.State)
+			}
 		}
 	}
 	r.send(m)
@@ -351,7 +366,7 @@ func (r *raft) bcastHeartbeat() {
 			continue
 		}
 		r.sendHeartbeat(i)
-		r.prs[i].waitDecr(r.heartbeatTimeout)
+		r.prs[i].resume()
 	}
 }
 
@@ -390,7 +405,7 @@ func (r *raft) appendEntry(es ...pb.Entry) {
 		es[i].Index = li + 1 + uint64(i)
 	}
 	r.raftLog.append(es...)
-	r.prs[r.id].update(r.raftLog.lastIndex())
+	r.prs[r.id].maybeUpdate(r.raftLog.lastIndex())
 	r.maybeCommit()
 }
 
@@ -547,36 +562,37 @@ func stepLeader(r *raft, m pb.Message) {
 		r.appendEntry(m.Entries...)
 		r.bcastAppend()
 	case pb.MsgAppResp:
-		if pr.isUnreachable() {
-			pr.reachable()
-			raftLogger.Infof("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
-		}
 		if m.Reject {
 			raftLogger.Infof("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
 				r.id, m.RejectHint, m.From, m.Index)
 			if pr.maybeDecrTo(m.Index, m.RejectHint) {
 				raftLogger.Infof("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr)
+				if pr.State == ProgressStateReplicate {
+					pr.becomeProbe()
+				}
 				r.sendAppend(m.From)
 			}
 		} else {
-			oldWait := pr.shouldWait()
-			pr.update(m.Index)
-			if r.prs[m.From].maybeSnapshotAbort() {
-				raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
-			}
-			if r.maybeCommit() {
-				r.bcastAppend()
-			} else if oldWait {
-				// update() reset the wait state on this node. If we had delayed sending
-				// an update before, send it now.
-				r.sendAppend(m.From)
+			oldPaused := pr.isPaused()
+			if pr.maybeUpdate(m.Index) {
+				switch {
+				case pr.State == ProgressStateProbe:
+					pr.becomeReplicate()
+				case pr.State == ProgressStateSnapshot && pr.maybeSnapshotAbort():
+					raftLogger.Infof("raft: %x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+					pr.becomeProbe()
+				}
+
+				if r.maybeCommit() {
+					r.bcastAppend()
+				} else if oldPaused {
+					// maybeUpdate() resumed the paused progress of this node. If we had
+					// delayed sending an update before, send it now.
+					r.sendAppend(m.From)
+				}
 			}
 		}
 	case pb.MsgHeartbeatResp:
-		if pr.isUnreachable() {
-			pr.reachable()
-			raftLogger.Infof("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
-		}
 		if pr.Match < r.raftLog.lastIndex() {
 			r.sendAppend(m.From)
 		}
@@ -585,24 +601,28 @@ func stepLeader(r *raft, m pb.Message) {
 			r.id, r.raftLog.lastTerm(), r.raftLog.lastIndex(), r.Vote, m.From, m.LogTerm, m.Index, r.Term)
 		r.send(pb.Message{To: m.From, Type: pb.MsgVoteResp, Reject: true})
 	case pb.MsgSnapStatus:
-		if !pr.hasPendingSnapshot() {
+		if pr.State != ProgressStateSnapshot {
 			return
 		}
-		if m.Reject {
-			pr.snapshotFail()
-			raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
-		} else {
-			pr.snapshotFinish()
+		if !m.Reject {
+			pr.becomeProbe()
 			raftLogger.Infof("raft: %x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
-			// wait for the msgAppResp from the remote node before sending
-			// out the next msgApp
-			pr.waitSet(r.electionTimeout)
+		} else {
+			pr.snapshotFailure()
+			pr.becomeProbe()
+			raftLogger.Infof("raft: %x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 		}
+		// If the snapshot finished, wait for the msgAppResp from the remote node
+		// before sending out the next msgApp.
+		// If the snapshot failed, wait for a heartbeat interval before the next try.
+		pr.pause()
 	case pb.MsgUnreachable:
-		if !pr.isUnreachable() {
-			pr.unreachable()
-			raftLogger.Infof("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr)
+		// During optimistic replication, if the remote becomes unreachable,
+		// it is highly probable that a MsgApp has been lost.
+		if pr.State == ProgressStateReplicate {
+			pr.becomeProbe()
 		}
+		raftLogger.Infof("raft: %x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
 	}
 }
 

+ 10 - 4
raft/raft_snap_test.go

@@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeLeader()
 
-	sm.prs[2].setPendingSnapshot(11)
+	sm.prs[2].becomeSnapshot(11)
 
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	msgs := sm.readMessages()
@@ -74,7 +74,7 @@ func TestSnapshotFailure(t *testing.T) {
 	sm.becomeLeader()
 
 	sm.prs[2].Next = 1
-	sm.prs[2].setPendingSnapshot(11)
+	sm.prs[2].becomeSnapshot(11)
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
 	if sm.prs[2].PendingSnapshot != 0 {
@@ -83,6 +83,9 @@ func TestSnapshotFailure(t *testing.T) {
 	if sm.prs[2].Next != 1 {
 		t.Fatalf("Next = %d, want 1", sm.prs[2].Next)
 	}
+	if sm.prs[2].Paused != true {
+		t.Errorf("Paused = %v, want true", sm.prs[2].Paused)
+	}
 }
 
 func TestSnapshotSucceed(t *testing.T) {
@@ -94,7 +97,7 @@ func TestSnapshotSucceed(t *testing.T) {
 	sm.becomeLeader()
 
 	sm.prs[2].Next = 1
-	sm.prs[2].setPendingSnapshot(11)
+	sm.prs[2].becomeSnapshot(11)
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
 	if sm.prs[2].PendingSnapshot != 0 {
@@ -103,6 +106,9 @@ func TestSnapshotSucceed(t *testing.T) {
 	if sm.prs[2].Next != 12 {
 		t.Fatalf("Next = %d, want 12", sm.prs[2].Next)
 	}
+	if sm.prs[2].Paused != true {
+		t.Errorf("Paused = %v, want true", sm.prs[2].Paused)
+	}
 }
 
 func TestSnapshotAbort(t *testing.T) {
@@ -114,7 +120,7 @@ func TestSnapshotAbort(t *testing.T) {
 	sm.becomeLeader()
 
 	sm.prs[2].Next = 1
-	sm.prs[2].setPendingSnapshot(11)
+	sm.prs[2].becomeSnapshot(11)
 
 	// A successful msgAppResp that has a higher/equal index than the
 	// pending snapshot should abort the pending snapshot.

+ 190 - 83
raft/raft_test.go

@@ -48,25 +48,94 @@ func (r *raft) readMessages() []pb.Message {
 	return msgs
 }
 
+func TestBecomeProbe(t *testing.T) {
+	match := uint64(1)
+	tests := []struct {
+		p     *Progress
+		wnext uint64
+	}{
+		{
+			&Progress{State: ProgressStateReplicate, Match: match, Next: 5},
+			2,
+		},
+		{
+			// snapshot finish
+			&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 10},
+			11,
+		},
+		{
+			// snapshot failure
+			&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 0},
+			2,
+		},
+	}
+	for i, tt := range tests {
+		tt.p.becomeProbe()
+		if tt.p.State != ProgressStateProbe {
+			t.Errorf("#%d: state = %s, want %s", i, tt.p.State, ProgressStateProbe)
+		}
+		if tt.p.Match != match {
+			t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match)
+		}
+		if tt.p.Next != tt.wnext {
+			t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext)
+		}
+	}
+}
+
+func TestBecomeReplicate(t *testing.T) {
+	p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5}
+	p.becomeReplicate()
+
+	if p.State != ProgressStateReplicate {
+		t.Errorf("state = %s, want %s", p.State, ProgressStateReplicate)
+	}
+	if p.Match != 1 {
+		t.Errorf("match = %d, want 1", p.Match)
+	}
+	if w := p.Match + 1; p.Next != w {
+		t.Errorf("next = %d, want %d", p.Next, w)
+	}
+}
+
+func TestBecomeSnapshot(t *testing.T) {
+	p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5}
+	p.becomeSnapshot(10)
+
+	if p.State != ProgressStateSnapshot {
+		t.Errorf("state = %s, want %s", p.State, ProgressStateSnapshot)
+	}
+	if p.Match != 1 {
+		t.Errorf("match = %d, want 1", p.Match)
+	}
+	if p.PendingSnapshot != 10 {
+		t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot)
+	}
+}
+
 func TestProgressUpdate(t *testing.T) {
 	prevM, prevN := uint64(3), uint64(5)
 	tests := []struct {
 		update uint64
 
-		wm uint64
-		wn uint64
+		wm  uint64
+		wn  uint64
+		wok bool
 	}{
-		{prevM - 1, prevM, prevN},         // do not decrease match, next
-		{prevM, prevM, prevN},             // do not decrease next
-		{prevM + 1, prevM + 1, prevN},     // increase match, do not decrease next
-		{prevM + 2, prevM + 2, prevN + 1}, // increase match, next
+		{prevM - 1, prevM, prevN, false},        // do not decrease match, next
+		{prevM, prevM, prevN, false},            // do not decrease next
+		{prevM + 1, prevM + 1, prevN, true},     // increase match, do not decrease next
+		{prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next
 	}
 	for i, tt := range tests {
 		p := &Progress{
 			Match: prevM,
 			Next:  prevN,
 		}
-		p.update(tt.update)
+		ok := p.maybeUpdate(tt.update)
+		if ok != tt.wok {
+			t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok)
+		}
 		if p.Match != tt.wm {
 			t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
 		}
@@ -78,6 +147,7 @@ func TestProgressUpdate(t *testing.T) {
 
 func TestProgressMaybeDecr(t *testing.T) {
 	tests := []struct {
+		state    ProgressStateType
 		m        uint64
 		n        uint64
 		rejected uint64
@@ -87,54 +157,50 @@ func TestProgressMaybeDecr(t *testing.T) {
 		wn uint64
 	}{
 		{
-			// match != 0 is always false
-			1, 0, 0, 0, false, 0,
+			// state replicate and rejected is not greater than match
+			ProgressStateReplicate, 5, 10, 5, 5, false, 10,
 		},
 		{
-			// match != 0 and to is greater than match
-			// directly decrease to match+1
-			5, 10, 5, 5, false, 10,
+			// state replicate and rejected is not greater than match
+			ProgressStateReplicate, 5, 10, 4, 4, false, 10,
 		},
 		{
-			// match != 0 and to is greater than match
+			// state replicate and rejected is greater than match
 			// directly decrease to match+1
-			5, 10, 4, 4, false, 10,
-		},
-		{
-			// match != 0 and to is not greater than match
-			5, 10, 9, 9, true, 6,
+			ProgressStateReplicate, 5, 10, 9, 9, true, 6,
 		},
 		{
 			// next-1 != rejected is always false
-			0, 0, 0, 0, false, 0,
+			ProgressStateProbe, 0, 0, 0, 0, false, 0,
 		},
 		{
 			// next-1 != rejected is always false
-			0, 10, 5, 5, false, 10,
+			ProgressStateProbe, 0, 10, 5, 5, false, 10,
 		},
 		{
 			// next>1 = decremented by 1
-			0, 10, 9, 9, true, 9,
+			ProgressStateProbe, 0, 10, 9, 9, true, 9,
 		},
 		{
 			// next>1 = decremented by 1
-			0, 2, 1, 1, true, 1,
+			ProgressStateProbe, 0, 2, 1, 1, true, 1,
 		},
 		{
 			// next<=1 = reset to 1
-			0, 1, 0, 0, true, 1,
+			ProgressStateProbe, 0, 1, 0, 0, true, 1,
 		},
 		{
 			// decrease to min(rejected, last+1)
-			0, 10, 9, 2, true, 3,
+			ProgressStateProbe, 0, 10, 9, 2, true, 3,
 		},
 		{
 			// rejected < 1, reset to 1
-			0, 10, 9, 0, true, 1,
+			ProgressStateProbe, 0, 10, 9, 0, true, 1,
 		},
 	}
 	for i, tt := range tests {
 		p := &Progress{
+			State: tt.state,
 			Match: tt.m,
 			Next:  tt.n,
 		}
@@ -150,61 +216,63 @@ func TestProgressMaybeDecr(t *testing.T) {
 	}
 }
 
-func TestProgressShouldWait(t *testing.T) {
+func TestProgressIsPaused(t *testing.T) {
 	tests := []struct {
-		m    uint64
-		wait int
+		state  ProgressStateType
+		paused bool
 
 		w bool
 	}{
-		// match != 0 is always not wait
-		{1, 0, false},
-		{1, 1, false},
-		{0, 1, true},
-		{0, 0, false},
+		{ProgressStateProbe, false, false},
+		{ProgressStateProbe, true, true},
+		{ProgressStateReplicate, false, false},
+		{ProgressStateReplicate, true, false},
+		{ProgressStateSnapshot, false, true},
+		{ProgressStateSnapshot, true, true},
 	}
 	for i, tt := range tests {
 		p := &Progress{
-			Match: tt.m,
-			Wait:  tt.wait,
+			State:  tt.state,
+			Paused: tt.paused,
 		}
-		if g := p.shouldWait(); g != tt.w {
+		if g := p.isPaused(); g != tt.w {
 			t.Errorf("#%d: shouldwait = %t, want %t", i, g, tt.w)
 		}
 	}
 }
 
-// TestProgressWaitReset ensures that progress.Update and progress.DercTo
-// will reset progress.wait.
-func TestProgressWaitReset(t *testing.T) {
+// TestProgressResume ensures that progress.maybeUpdate and progress.maybeDecrTo
+// will reset progress.paused.
+func TestProgressResume(t *testing.T) {
 	p := &Progress{
-		Wait: 1,
+		Next:   2,
+		Paused: true,
 	}
 	p.maybeDecrTo(1, 1)
-	if p.Wait != 0 {
-		t.Errorf("wait= %d, want 0", p.Wait)
+	if p.Paused != false {
+		t.Errorf("paused= %v, want false", p.Paused)
 	}
-	p.Wait = 1
-	p.update(2)
-	if p.Wait != 0 {
-		t.Errorf("wait= %d, want 0", p.Wait)
+	p.Paused = true
+	p.maybeUpdate(2)
+	if p.Paused != false {
+		t.Errorf("paused= %v, want false", p.Paused)
 	}
 }
 
-// TestProgressDecr ensures raft.heartbeat decreases progress.wait by heartbeat.
-func TestProgressDecr(t *testing.T) {
+// TestProgressResumeByHeartbeat ensures that progress.paused is reset by a heartbeat.
+func TestProgressResumeByHeartbeat(t *testing.T) {
 	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
 	r.becomeCandidate()
 	r.becomeLeader()
-	r.prs[2].Wait = r.heartbeatTimeout * 2
+	r.prs[2].Paused = true
 
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
-	if r.prs[2].Wait != r.heartbeatTimeout*(2-1) {
-		t.Errorf("wait = %d, want %d", r.prs[2].Wait, r.heartbeatTimeout*(2-1))
+	if r.prs[2].Paused != false {
+		t.Errorf("paused = %v, want false", r.prs[2].Paused)
 	}
 }
 
-func TestProgressWait(t *testing.T) {
+func TestProgressPaused(t *testing.T) {
 	r := newRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage(), 0)
 	r.becomeCandidate()
 	r.becomeLeader()
@@ -1262,16 +1330,16 @@ func TestLeaderIncreaseNext(t *testing.T) {
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
 	tests := []struct {
 		// progress
-		match uint64
+		state ProgressStateType
 		next  uint64
 
 		wnext uint64
 	}{
-		// match is not zero, optimistically increase next
+		// state replicate, optimistically increase next
 		// previous entries + noop entry + propose + 1
-		{1, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
-		// match is zero, not optimistically increase next
-		{0, 2, 2},
+		{ProgressStateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
+		// state probe, not optimistically increase next
+		{ProgressStateProbe, 2, 2},
 	}
 
 	for i, tt := range tests {
@@ -1279,7 +1347,8 @@ func TestLeaderIncreaseNext(t *testing.T) {
 		sm.raftLog.append(previousEnts...)
 		sm.becomeCandidate()
 		sm.becomeLeader()
-		sm.prs[2].Match, sm.prs[2].Next = tt.match, tt.next
+		sm.prs[2].State = tt.state
+		sm.prs[2].Next = tt.next
 		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 
 		p := sm.prs[2]
@@ -1289,41 +1358,32 @@ func TestLeaderIncreaseNext(t *testing.T) {
 	}
 }
 
-func TestUnreachable(t *testing.T) {
-	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
-	s := NewMemoryStorage()
-	s.Append(previousEnts)
-	r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
+func TestSendAppendForProgressProbe(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.readMessages()
+	r.prs[2].becomeProbe()
 
-	// set node 2 to unreachable
-	r.prs[2].Match = 3
-	r.prs[2].Next = 5
-	r.prs[2].Wait = 0
-	r.prs[2].unreachable()
-
-	if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext {
-		t.Errorf("next = %d, want %d", r.prs[2].Next, wnext)
-	}
-
+	// each round is a heartbeat
 	for i := 0; i < 3; i++ {
-		// node 2 is unreachable, we expect that raft will only send out one msgAPP per heartbeat timeout
-		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+		// we expect that raft will only send out one msgAPP per heartbeat timeout
+		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		r.sendAppend(2)
 		msg := r.readMessages()
 		if len(msg) != 1 {
 			t.Errorf("len(msg) = %d, want %d", len(msg), 1)
 		}
-		if msg[0].Index != 3 {
-			t.Errorf("index = %d, want %d", msg[0].Index, 3)
+		if msg[0].Index != 0 {
+			t.Errorf("index = %d, want %d", msg[0].Index, 0)
 		}
 
-		if r.prs[2].Wait != r.heartbeatTimeout {
-			t.Errorf("wait = %d, want %d", r.prs[1].Wait, r.heartbeatTimeout)
+		if r.prs[2].Paused != true {
+			t.Errorf("paused = %v, want true", r.prs[2].Paused)
 		}
 		for j := 0; j < 10; j++ {
-			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+			r.appendEntry(pb.Entry{Data: []byte("somedata")})
+			r.sendAppend(2)
 			if l := len(r.readMessages()); l != 0 {
 				t.Errorf("len(msg) = %d, want %d", l, 0)
 			}
@@ -1342,11 +1402,18 @@ func TestUnreachable(t *testing.T) {
 			t.Errorf("type = %s, want %s", msg[0].Type, pb.MsgHeartbeat)
 		}
 	}
+}
+
+func TestSendAppendForProgressReplicate(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.readMessages()
+	r.prs[2].becomeReplicate()
 
-	// recover node 2
-	r.prs[2].reachable()
 	for i := 0; i < 10; i++ {
-		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
+		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		r.sendAppend(2)
 		msgs := r.readMessages()
 		if len(msgs) != 1 {
 			t.Errorf("len(msg) = %d, want %d", len(msgs), 1)
@@ -1354,6 +1421,46 @@ func TestUnreachable(t *testing.T) {
 	}
 }
 
+func TestSendAppendForProgressSnapshot(t *testing.T) {
+	r := newRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage(), 0)
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.readMessages()
+	r.prs[2].becomeSnapshot(10)
+
+	for i := 0; i < 10; i++ {
+		r.appendEntry(pb.Entry{Data: []byte("somedata")})
+		r.sendAppend(2)
+		msgs := r.readMessages()
+		if len(msgs) != 0 {
+			t.Errorf("len(msg) = %d, want %d", len(msgs), 0)
+		}
+	}
+}
+
+func TestRecvMsgUnreachable(t *testing.T) {
+	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
+	s := NewMemoryStorage()
+	s.Append(previousEnts)
+	r := newRaft(1, []uint64{1, 2}, 10, 1, s, 0)
+	r.becomeCandidate()
+	r.becomeLeader()
+	r.readMessages()
+	// set node 2 to state replicate
+	r.prs[2].Match = 3
+	r.prs[2].becomeReplicate()
+	r.prs[2].optimisticUpdate(5)
+
+	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
+
+	if r.prs[2].State != ProgressStateProbe {
+		t.Errorf("state = %s, want %s", r.prs[2].State, ProgressStateProbe)
+	}
+	if wnext := r.prs[2].Match + 1; r.prs[2].Next != wnext {
+		t.Errorf("next = %d, want %d", r.prs[2].Next, wnext)
+	}
+}
+
 func TestRestore(t *testing.T) {
 	s := pb.Snapshot{
 		Metadata: pb.SnapshotMetadata{

+ 1 - 1
raft/status.go

@@ -57,7 +57,7 @@ func (s Status) MarshalJSON() ([]byte, error) {
 		j += "}}"
 	} else {
 		for k, v := range s.Progress {
-			subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"unreachable":%t},`, k, v.Match, v.Next, v.Unreachable)
+			subj := fmt.Sprintf(`"%x":{"match":%d,"next":%d,"state":%s},`, k, v.Match, v.Next, v.State)
 			j += subj
 		}
 		// remove the trailing ","