Browse Source

raft: use progress tracker APIs in more places

This doesn't completely eliminate access to prs.nodes, but that's not
really necessary. This commit uses the existing APIs in a few more
places where it's convenient, and also sprinkles some assertions.
Tobias Schottdorf 6 years ago
parent
commit
a11563737c
4 changed files with 56 additions and 37 deletions
  1. 37 22
      raft/progress.go
  2. 9 5
      raft/raft.go
  3. 1 0
      raft/raft_test.go
  4. 9 10
      raft/rawnode.go

+ 37 - 22
raft/progress.go

@@ -328,10 +328,35 @@ func (p *prs) committed() uint64 {
 }
 
 func (p *prs) removeAny(id uint64) {
+	pN := p.nodes[id]
+	pL := p.learners[id]
+
+	if pN == nil && pL == nil {
+		panic("attempting to remove unknown peer %x")
+	} else if pN != nil && pL != nil {
+		panic(fmt.Sprintf("peer %x is both voter and learner", id))
+	}
+
 	delete(p.nodes, id)
 	delete(p.learners, id)
 }
 
+// initProgress initializes a new progress for the given node or learner. The
+// node may not exist yet in either form or a panic will ensue.
+func (p *prs) initProgress(id, match, next uint64, isLearner bool) {
+	if pr := p.nodes[id]; pr != nil {
+		panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
+	}
+	if pr := p.learners[id]; pr != nil {
+		panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr))
+	}
+	if !isLearner {
+		p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
+		return
+	}
+	p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
+}
+
 func (p *prs) getProgress(id uint64) *Progress {
 	if pr, ok := p.nodes[id]; ok {
 		return pr
@@ -340,20 +365,21 @@ func (p *prs) getProgress(id uint64) *Progress {
 	return p.learners[id]
 }
 
-// initProgress initializes a new progress for the given node, replacing any that
-// may exist. It is invalid to replace a voter by a learner and attempts to do so
-// will result in a panic.
-func (p *prs) initProgress(id, match, next uint64, isLearner bool) {
-	if !isLearner {
-		delete(p.learners, id)
-		p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
-		return
+// visit invokes the supplied closure for all tracked progresses.
+func (p *prs) visit(f func(id uint64, pr *Progress)) {
+	for id, pr := range p.nodes {
+		f(id, pr)
 	}
 
-	if _, ok := p.nodes[id]; ok {
-		panic(fmt.Sprintf("changing from voter to learner for %x", id))
+	for id, pr := range p.learners {
+		f(id, pr)
 	}
-	p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
+}
+
+func (p *prs) reset() {
+	p.nodes = map[uint64]*Progress{}
+	p.learners = map[uint64]*Progress{}
+	p.matchBuf = nil
 }
 
 func (p *prs) voterNodes() []uint64 {
@@ -373,14 +399,3 @@ func (p *prs) learnerNodes() []uint64 {
 	sort.Sort(uint64Slice(nodes))
 	return nodes
 }
-
-// visit invokes the supplied closure for all tracked progresses.
-func (p *prs) visit(f func(id uint64, pr *Progress)) {
-	for id, pr := range p.nodes {
-		f(id, pr)
-	}
-
-	for id, pr := range p.learners {
-		f(id, pr)
-	}
-}

+ 9 - 5
raft/raft.go

@@ -577,7 +577,12 @@ func (r *raft) reset(term uint64) {
 
 	r.votes = make(map[uint64]bool)
 	r.prs.visit(func(id uint64, pr *Progress) {
-		*pr = Progress{Next: r.raftLog.lastIndex() + 1, ins: newInflights(r.prs.maxInflight), IsLearner: pr.IsLearner}
+		*pr = Progress{
+			Match:     0,
+			Next:      r.raftLog.lastIndex() + 1,
+			ins:       newInflights(r.prs.maxInflight),
+			IsLearner: pr.IsLearner,
+		}
 		if id == r.id {
 			pr.Match = r.raftLog.lastIndex()
 		}
@@ -938,7 +943,7 @@ func stepLeader(r *raft, m pb.Message) error {
 		if len(m.Entries) == 0 {
 			r.logger.Panicf("%x stepped empty MsgProp", r.id)
 		}
-		if _, ok := r.prs.nodes[r.id]; !ok {
+		if r.prs.getProgress(r.id) == nil {
 			// If we are not currently a member of the range (i.e. this node
 			// was removed from the configuration while serving as leader),
 			// drop any new proposals.
@@ -1314,8 +1319,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
 		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
 
 	r.raftLog.restore(s)
-	r.prs.nodes = make(map[uint64]*Progress)
-	r.prs.learners = make(map[uint64]*Progress)
+	r.prs.reset()
 	r.restoreNode(s.Metadata.ConfState.Nodes, false)
 	r.restoreNode(s.Metadata.ConfState.Learners, true)
 	return true
@@ -1385,7 +1389,7 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
 func (r *raft) removeNode(id uint64) {
 	r.prs.removeAny(id)
 
-	// do not try to commit or abort transferring if there is no nodes in the cluster.
+	// Do not try to commit or abort transferring if the cluster is now empty.
 	if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
 		return
 	}

+ 1 - 0
raft/raft_test.go

@@ -1351,6 +1351,7 @@ func TestCommit(t *testing.T) {
 		storage.hardState = pb.HardState{Term: tt.smTerm}
 
 		sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
+		sm.prs.removeAny(1)
 		for j := 0; j < len(tt.matches); j++ {
 			sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
 		}

+ 9 - 10
raft/rawnode.go

@@ -257,16 +257,15 @@ const (
 // WithProgress is a helper to introspect the Progress for this node and its
 // peers.
 func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
-	for id, pr := range rn.raft.prs.nodes {
-		pr := *pr
-		pr.ins = nil
-		visitor(id, ProgressTypePeer, pr)
-	}
-	for id, pr := range rn.raft.prs.learners {
-		pr := *pr
-		pr.ins = nil
-		visitor(id, ProgressTypeLearner, pr)
-	}
+	rn.raft.prs.visit(func(id uint64, pr *Progress) {
+		typ := ProgressTypePeer
+		if pr.IsLearner {
+			typ = ProgressTypeLearner
+		}
+		p := *pr
+		p.ins = nil
+		visitor(id, typ, p)
+	})
 }
 
 // ReportUnreachable reports the given node is not reachable for the last send.