Browse Source

raft: cleanup unreachable

Xiang Li 10 years ago
parent
commit
2185ac5ac8
1 changed files with 21 additions and 7 deletions
  1. 21 7
      raft/raft.go

+ 21 - 7
raft/raft.go

@@ -53,12 +53,17 @@ func (st StateType) String() string {
 type Progress struct {
 	Match, Next uint64
 	Wait        int
+	// If the last sent to the Progress failed and reported
+	// by the link layer via MsgUnreachable, Unreachable will be set.
+	// If the Progress is unreachable, snapshot and optimistically append
+	// will be disabled.
+	// Unreachable will be unset if raft starts to receive message (msgAppResp,
+	// msgHeartbeatResp) from the remote peer of the Progress.
 	Unreachable bool
 }
 
 func (pr *Progress) update(n uint64) {
 	pr.waitReset()
-	pr.reachable()
 
 	if pr.Match < n {
 		pr.Match = n
@@ -74,7 +79,6 @@ func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
 // Otherwise it decreases the progress next index to min(rejected, last) and returns true.
 func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
 	pr.waitReset()
-	pr.reachable()
 
 	if pr.Match != 0 {
 		// the rejection must be stale if the progress has matched and "rejected"
@@ -229,6 +233,12 @@ func (r *raft) sendAppend(to uint64) {
 	m := pb.Message{}
 	m.To = to
 	if r.needSnapshot(pr.Next) {
+		if pr.Unreachable {
+			// do not try to send snapshot until the Progress is
+			// reachable
+			return
+		}
+
 		m.Type = pb.MsgSnap
 		snapshot, err := r.raftLog.snapshot()
 		if err != nil {
@@ -468,6 +478,8 @@ func (r *raft) Step(m pb.Message) error {
 type stepFunc func(r *raft, m pb.Message)
 
 func stepLeader(r *raft, m pb.Message) {
+	pr := r.prs[m.From]
+
 	switch m.Type {
 	case pb.MsgBeat:
 		r.bcastHeartbeat()
@@ -486,16 +498,17 @@ func stepLeader(r *raft, m pb.Message) {
 		r.appendEntry(m.Entries...)
 		r.bcastAppend()
 	case pb.MsgAppResp:
+		pr.reachable()
 		if m.Reject {
 			log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
 				r.id, m.RejectHint, m.From, m.Index)
-			if r.prs[m.From].maybeDecrTo(m.Index, m.RejectHint) {
-				log.Printf("raft: %x decreased progress of %x to [%s]", r.id, m.From, r.prs[m.From])
+			if pr.maybeDecrTo(m.Index, m.RejectHint) {
+				log.Printf("raft: %x decreased progress of %x to [%s]", r.id, m.From, pr)
 				r.sendAppend(m.From)
 			}
 		} else {
-			oldWait := r.prs[m.From].shouldWait()
-			r.prs[m.From].update(m.Index)
+			oldWait := pr.shouldWait()
+			pr.update(m.Index)
 			if r.maybeCommit() {
 				r.bcastAppend()
 			} else if oldWait {
@@ -505,7 +518,8 @@ func stepLeader(r *raft, m pb.Message) {
 			}
 		}
 	case pb.MsgHeartbeatResp:
-		if r.prs[m.From].Match < r.raftLog.lastIndex() {
+		pr.reachable()
+		if pr.Match < r.raftLog.lastIndex() {
 			r.sendAppend(m.From)
 		}
 	case pb.MsgVote: