Просмотр исходного кода

raft: log unreachable remote node

Yicheng Qin 11 лет назад
Родитель
Сommit
09f181f585
1 измененных файлов с 21 добавлено и 11 удалено
  1. 21 11
      raft/raft.go

+ 21 - 11
raft/raft.go

@@ -117,11 +117,12 @@ func (pr *Progress) waitDecr(i int) {
 		pr.Wait = 0
 		pr.Wait = 0
 	}
 	}
 }
 }
-func (pr *Progress) waitSet(w int)    { pr.Wait = w }
-func (pr *Progress) waitReset()       { pr.Wait = 0 }
-func (pr *Progress) reachable()       { pr.Unreachable = false }
-func (pr *Progress) unreachable()     { pr.Unreachable = true }
-func (pr *Progress) shouldWait() bool { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 }
+func (pr *Progress) waitSet(w int)       { pr.Wait = w }
+func (pr *Progress) waitReset()          { pr.Wait = 0 }
+func (pr *Progress) isUnreachable() bool { return pr.Unreachable }
+func (pr *Progress) reachable()          { pr.Unreachable = false }
+func (pr *Progress) unreachable()        { pr.Unreachable = true }
+func (pr *Progress) shouldWait() bool    { return (pr.Unreachable || pr.Match == 0) && pr.Wait > 0 }
 
 
 func (pr *Progress) hasPendingSnapshot() bool    { return pr.PendingSnapshot != 0 }
 func (pr *Progress) hasPendingSnapshot() bool    { return pr.PendingSnapshot != 0 }
 func (pr *Progress) setPendingSnapshot(i uint64) { pr.PendingSnapshot = i }
 func (pr *Progress) setPendingSnapshot(i uint64) { pr.PendingSnapshot = i }
@@ -269,7 +270,7 @@ func (r *raft) sendAppend(to uint64) {
 	m := pb.Message{}
 	m := pb.Message{}
 	m.To = to
 	m.To = to
 	if r.needSnapshot(pr.Next) {
 	if r.needSnapshot(pr.Next) {
-		if pr.Unreachable {
+		if pr.isUnreachable() {
 			// do not try to send snapshot until the Progress is
 			// do not try to send snapshot until the Progress is
 			// reachable
 			// reachable
 			return
 			return
@@ -297,9 +298,9 @@ func (r *raft) sendAppend(to uint64) {
 		m.Commit = r.raftLog.committed
 		m.Commit = r.raftLog.committed
 		// optimistically increase the next if the follower
 		// optimistically increase the next if the follower
 		// has been matched.
 		// has been matched.
-		if n := len(m.Entries); pr.Match != 0 && !pr.Unreachable && n != 0 {
+		if n := len(m.Entries); pr.Match != 0 && !pr.isUnreachable() && n != 0 {
 			pr.optimisticUpdate(m.Entries[n-1].Index)
 			pr.optimisticUpdate(m.Entries[n-1].Index)
-		} else if pr.Match == 0 || pr.Unreachable {
+		} else if pr.Match == 0 || pr.isUnreachable() {
 			pr.waitSet(r.heartbeatTimeout)
 			pr.waitSet(r.heartbeatTimeout)
 		}
 		}
 	}
 	}
@@ -535,7 +536,10 @@ func stepLeader(r *raft, m pb.Message) {
 		r.appendEntry(m.Entries...)
 		r.appendEntry(m.Entries...)
 		r.bcastAppend()
 		r.bcastAppend()
 	case pb.MsgAppResp:
 	case pb.MsgAppResp:
-		pr.reachable()
+		if pr.isUnreachable() {
+			pr.reachable()
+			log.Printf("raft: %x received msgAppResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
+		}
 		if m.Reject {
 		if m.Reject {
 			log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
 			log.Printf("raft: %x received msgApp rejection(lastindex: %d) from %x for index %d",
 				r.id, m.RejectHint, m.From, m.Index)
 				r.id, m.RejectHint, m.From, m.Index)
@@ -558,7 +562,10 @@ func stepLeader(r *raft, m pb.Message) {
 			}
 			}
 		}
 		}
 	case pb.MsgHeartbeatResp:
 	case pb.MsgHeartbeatResp:
-		pr.reachable()
+		if pr.isUnreachable() {
+			pr.reachable()
+			log.Printf("raft: %x received msgHeartbeatResp from %x and changed it to be reachable [%s]", r.id, m.From, pr)
+		}
 		if pr.Match < r.raftLog.lastIndex() {
 		if pr.Match < r.raftLog.lastIndex() {
 			r.sendAppend(m.From)
 			r.sendAppend(m.From)
 		}
 		}
@@ -581,7 +588,10 @@ func stepLeader(r *raft, m pb.Message) {
 			pr.waitSet(r.electionTimeout)
 			pr.waitSet(r.electionTimeout)
 		}
 		}
 	case pb.MsgUnreachable:
 	case pb.MsgUnreachable:
-		r.prs[m.From].unreachable()
+		if !pr.isUnreachable() {
+			pr.unreachable()
+			log.Printf("raft: %x failed to send message to %x and changed it to be unreachable [%s]", r.id, m.From, pr)
+		}
 	}
 	}
 }
 }