Browse Source

Merge pull request #10807 from tbg/extract-prs

raft: extract 'tracker' package
Tobias Grieger 6 years ago
parent
commit
948e276ca7

+ 0 - 1
etcdserver/raft.go

@@ -34,7 +34,6 @@ import (
 	"go.etcd.io/etcd/raft/raftpb"
 	"go.etcd.io/etcd/raft/raftpb"
 	"go.etcd.io/etcd/wal"
 	"go.etcd.io/etcd/wal"
 	"go.etcd.io/etcd/wal/walpb"
 	"go.etcd.io/etcd/wal/walpb"
-
 	"go.uber.org/zap"
 	"go.uber.org/zap"
 )
 )
 
 

+ 5 - 5
raft/node.go

@@ -353,15 +353,15 @@ func (n *node) run(r *raft) {
 			}
 			}
 		case m := <-n.recvc:
 		case m := <-n.recvc:
 			// filter out response message from unknown From.
 			// filter out response message from unknown From.
-			if pr := r.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+			if pr := r.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
 				r.Step(m)
 				r.Step(m)
 			}
 			}
 		case cc := <-n.confc:
 		case cc := <-n.confc:
 			if cc.NodeID == None {
 			if cc.NodeID == None {
 				select {
 				select {
 				case n.confstatec <- pb.ConfState{
 				case n.confstatec <- pb.ConfState{
-					Nodes:    r.prs.voterNodes(),
-					Learners: r.prs.learnerNodes()}:
+					Nodes:    r.prs.VoterNodes(),
+					Learners: r.prs.LearnerNodes()}:
 				case <-n.done:
 				case <-n.done:
 				}
 				}
 				break
 				break
@@ -384,8 +384,8 @@ func (n *node) run(r *raft) {
 			}
 			}
 			select {
 			select {
 			case n.confstatec <- pb.ConfState{
 			case n.confstatec <- pb.ConfState{
-				Nodes:    r.prs.voterNodes(),
-				Learners: r.prs.learnerNodes()}:
+				Nodes:    r.prs.VoterNodes(),
+				Learners: r.prs.LearnerNodes()}:
 			case <-n.done:
 			case <-n.done:
 			}
 			}
 		case <-n.tickc:
 		case <-n.tickc:

+ 0 - 457
raft/progress.go

@@ -1,457 +0,0 @@
-// Copyright 2015 The etcd Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package raft
-
-import (
-	"fmt"
-	"sort"
-
-	"go.etcd.io/etcd/raft/quorum"
-)
-
-const (
-	ProgressStateProbe ProgressStateType = iota
-	ProgressStateReplicate
-	ProgressStateSnapshot
-)
-
-type ProgressStateType uint64
-
-var prstmap = [...]string{
-	"ProgressStateProbe",
-	"ProgressStateReplicate",
-	"ProgressStateSnapshot",
-}
-
-func (st ProgressStateType) String() string { return prstmap[uint64(st)] }
-
-// Progress represents a follower’s progress in the view of the leader. Leader maintains
-// progresses of all followers, and sends entries to the follower based on its progress.
-type Progress struct {
-	Match, Next uint64
-	// State defines how the leader should interact with the follower.
-	//
-	// When in ProgressStateProbe, leader sends at most one replication message
-	// per heartbeat interval. It also probes actual progress of the follower.
-	//
-	// When in ProgressStateReplicate, leader optimistically increases next
-	// to the latest entry sent after sending replication message. This is
-	// an optimized state for fast replicating log entries to the follower.
-	//
-	// When in ProgressStateSnapshot, leader should have sent out snapshot
-	// before and stops sending any replication message.
-	State ProgressStateType
-
-	// Paused is used in ProgressStateProbe.
-	// When Paused is true, raft should pause sending replication message to this peer.
-	Paused bool
-	// PendingSnapshot is used in ProgressStateSnapshot.
-	// If there is a pending snapshot, the pendingSnapshot will be set to the
-	// index of the snapshot. If pendingSnapshot is set, the replication process of
-	// this Progress will be paused. raft will not resend snapshot until the pending one
-	// is reported to be failed.
-	PendingSnapshot uint64
-
-	// RecentActive is true if the progress is recently active. Receiving any messages
-	// from the corresponding follower indicates the progress is active.
-	// RecentActive can be reset to false after an election timeout.
-	RecentActive bool
-
-	// inflights is a sliding window for the inflight messages.
-	// Each inflight message contains one or more log entries.
-	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
-	// Thus inflight effectively limits both the number of inflight messages
-	// and the bandwidth each Progress can use.
-	// When inflights is full, no more message should be sent.
-	// When a leader sends out a message, the index of the last
-	// entry should be added to inflights. The index MUST be added
-	// into inflights in order.
-	// When a leader receives a reply, the previous inflights should
-	// be freed by calling inflights.freeTo with the index of the last
-	// received entry.
-	ins *inflights
-
-	// IsLearner is true if this progress is tracked for a learner.
-	IsLearner bool
-}
-
-func (pr *Progress) resetState(state ProgressStateType) {
-	pr.Paused = false
-	pr.PendingSnapshot = 0
-	pr.State = state
-	pr.ins.reset()
-}
-
-func (pr *Progress) becomeProbe() {
-	// If the original state is ProgressStateSnapshot, progress knows that
-	// the pending snapshot has been sent to this peer successfully, then
-	// probes from pendingSnapshot + 1.
-	if pr.State == ProgressStateSnapshot {
-		pendingSnapshot := pr.PendingSnapshot
-		pr.resetState(ProgressStateProbe)
-		pr.Next = max(pr.Match+1, pendingSnapshot+1)
-	} else {
-		pr.resetState(ProgressStateProbe)
-		pr.Next = pr.Match + 1
-	}
-}
-
-func (pr *Progress) becomeReplicate() {
-	pr.resetState(ProgressStateReplicate)
-	pr.Next = pr.Match + 1
-}
-
-func (pr *Progress) becomeSnapshot(snapshoti uint64) {
-	pr.resetState(ProgressStateSnapshot)
-	pr.PendingSnapshot = snapshoti
-}
-
-// maybeUpdate returns false if the given n index comes from an outdated message.
-// Otherwise it updates the progress and returns true.
-func (pr *Progress) maybeUpdate(n uint64) bool {
-	var updated bool
-	if pr.Match < n {
-		pr.Match = n
-		updated = true
-		pr.resume()
-	}
-	if pr.Next < n+1 {
-		pr.Next = n + 1
-	}
-	return updated
-}
-
-func (pr *Progress) optimisticUpdate(n uint64) { pr.Next = n + 1 }
-
-// maybeDecrTo returns false if the given to index comes from an out of order message.
-// Otherwise it decreases the progress next index to min(rejected, last) and returns true.
-func (pr *Progress) maybeDecrTo(rejected, last uint64) bool {
-	if pr.State == ProgressStateReplicate {
-		// the rejection must be stale if the progress has matched and "rejected"
-		// is smaller than "match".
-		if rejected <= pr.Match {
-			return false
-		}
-		// directly decrease next to match + 1
-		pr.Next = pr.Match + 1
-		return true
-	}
-
-	// the rejection must be stale if "rejected" does not match next - 1
-	if pr.Next-1 != rejected {
-		return false
-	}
-
-	if pr.Next = min(rejected, last+1); pr.Next < 1 {
-		pr.Next = 1
-	}
-	pr.resume()
-	return true
-}
-
-func (pr *Progress) pause()  { pr.Paused = true }
-func (pr *Progress) resume() { pr.Paused = false }
-
-// IsPaused returns whether sending log entries to this node has been
-// paused. A node may be paused because it has rejected recent
-// MsgApps, is currently waiting for a snapshot, or has reached the
-// MaxInflightMsgs limit.
-func (pr *Progress) IsPaused() bool {
-	switch pr.State {
-	case ProgressStateProbe:
-		return pr.Paused
-	case ProgressStateReplicate:
-		return pr.ins.full()
-	case ProgressStateSnapshot:
-		return true
-	default:
-		panic("unexpected state")
-	}
-}
-
-func (pr *Progress) snapshotFailure() { pr.PendingSnapshot = 0 }
-
-// needSnapshotAbort returns true if snapshot progress's Match
-// is equal or higher than the pendingSnapshot.
-func (pr *Progress) needSnapshotAbort() bool {
-	return pr.State == ProgressStateSnapshot && pr.Match >= pr.PendingSnapshot
-}
-
-func (pr *Progress) String() string {
-	return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v",
-		pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner)
-}
-
-type inflights struct {
-	// the starting index in the buffer
-	start int
-	// number of inflights in the buffer
-	count int
-
-	// the size of the buffer
-	size int
-
-	// buffer contains the index of the last entry
-	// inside one message.
-	buffer []uint64
-}
-
-func newInflights(size int) *inflights {
-	return &inflights{
-		size: size,
-	}
-}
-
-// add adds an inflight into inflights
-func (in *inflights) add(inflight uint64) {
-	if in.full() {
-		panic("cannot add into a full inflights")
-	}
-	next := in.start + in.count
-	size := in.size
-	if next >= size {
-		next -= size
-	}
-	if next >= len(in.buffer) {
-		in.growBuf()
-	}
-	in.buffer[next] = inflight
-	in.count++
-}
-
-// grow the inflight buffer by doubling up to inflights.size. We grow on demand
-// instead of preallocating to inflights.size to handle systems which have
-// thousands of Raft groups per process.
-func (in *inflights) growBuf() {
-	newSize := len(in.buffer) * 2
-	if newSize == 0 {
-		newSize = 1
-	} else if newSize > in.size {
-		newSize = in.size
-	}
-	newBuffer := make([]uint64, newSize)
-	copy(newBuffer, in.buffer)
-	in.buffer = newBuffer
-}
-
-// freeTo frees the inflights smaller or equal to the given `to` flight.
-func (in *inflights) freeTo(to uint64) {
-	if in.count == 0 || to < in.buffer[in.start] {
-		// out of the left side of the window
-		return
-	}
-
-	idx := in.start
-	var i int
-	for i = 0; i < in.count; i++ {
-		if to < in.buffer[idx] { // found the first large inflight
-			break
-		}
-
-		// increase index and maybe rotate
-		size := in.size
-		if idx++; idx >= size {
-			idx -= size
-		}
-	}
-	// free i inflights and set new start index
-	in.count -= i
-	in.start = idx
-	if in.count == 0 {
-		// inflights is empty, reset the start index so that we don't grow the
-		// buffer unnecessarily.
-		in.start = 0
-	}
-}
-
-func (in *inflights) freeFirstOne() { in.freeTo(in.buffer[in.start]) }
-
-// full returns true if the inflights is full.
-func (in *inflights) full() bool {
-	return in.count == in.size
-}
-
-// resets frees all inflights.
-func (in *inflights) reset() {
-	in.count = 0
-	in.start = 0
-}
-
-// progressTracker tracks the currently active configuration and the information
-// known about the nodes and learners in it. In particular, it tracks the match
-// index for each peer which in turn allows reasoning about the committed index.
-type progressTracker struct {
-	voters   quorum.JointConfig
-	learners map[uint64]struct{}
-	prs      map[uint64]*Progress
-
-	votes map[uint64]bool
-
-	maxInflight int
-}
-
-func makeProgressTracker(maxInflight int) progressTracker {
-	p := progressTracker{
-		maxInflight: maxInflight,
-		voters: quorum.JointConfig{
-			quorum.MajorityConfig{},
-			quorum.MajorityConfig{},
-		},
-		learners: map[uint64]struct{}{},
-		votes:    map[uint64]bool{},
-		prs:      map[uint64]*Progress{},
-	}
-	return p
-}
-
-// isSingleton returns true if (and only if) there is only one voting member
-// (i.e. the leader) in the current configuration.
-func (p *progressTracker) isSingleton() bool {
-	return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
-}
-
-type progressAckIndexer map[uint64]*Progress
-
-var _ quorum.AckedIndexer = progressAckIndexer(nil)
-
-func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
-	pr, ok := l[id]
-	if !ok {
-		return 0, false
-	}
-	return quorum.Index(pr.Match), true
-}
-
-// committed returns the largest log index known to be committed based on what
-// the voting members of the group have acknowledged.
-func (p *progressTracker) committed() uint64 {
-	return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs)))
-}
-
-func (p *progressTracker) removeAny(id uint64) {
-	_, okPR := p.prs[id]
-	_, okV1 := p.voters[0][id]
-	_, okV2 := p.voters[1][id]
-	_, okL := p.learners[id]
-
-	okV := okV1 || okV2
-
-	if !okPR {
-		panic("attempting to remove unknown peer %x")
-	} else if !okV && !okL {
-		panic("attempting to remove unknown peer %x")
-	} else if okV && okL {
-		panic(fmt.Sprintf("peer %x is both voter and learner", id))
-	}
-
-	delete(p.voters[0], id)
-	delete(p.voters[1], id)
-	delete(p.learners, id)
-	delete(p.prs, id)
-}
-
-// initProgress initializes a new progress for the given node or learner. The
-// node may not exist yet in either form or a panic will ensue.
-func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
-	if pr := p.prs[id]; pr != nil {
-		panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
-	}
-	if !isLearner {
-		p.voters[0][id] = struct{}{}
-	} else {
-		p.learners[id] = struct{}{}
-	}
-	p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner}
-}
-
-func (p *progressTracker) getProgress(id uint64) *Progress {
-	return p.prs[id]
-}
-
-// visit invokes the supplied closure for all tracked progresses.
-func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
-	for id, pr := range p.prs {
-		f(id, pr)
-	}
-}
-
-// checkQuorumActive returns true if the quorum is active from
-// the view of the local raft state machine. Otherwise, it returns
-// false.
-func (p *progressTracker) quorumActive() bool {
-	votes := map[uint64]bool{}
-	p.visit(func(id uint64, pr *Progress) {
-		if pr.IsLearner {
-			return
-		}
-		votes[id] = pr.RecentActive
-	})
-
-	return p.voters.VoteResult(votes) == quorum.VoteWon
-}
-
-func (p *progressTracker) voterNodes() []uint64 {
-	m := p.voters.IDs()
-	nodes := make([]uint64, 0, len(m))
-	for id := range m {
-		nodes = append(nodes, id)
-	}
-	sort.Sort(uint64Slice(nodes))
-	return nodes
-}
-
-func (p *progressTracker) learnerNodes() []uint64 {
-	nodes := make([]uint64, 0, len(p.learners))
-	for id := range p.learners {
-		nodes = append(nodes, id)
-	}
-	sort.Sort(uint64Slice(nodes))
-	return nodes
-}
-
-// resetVotes prepares for a new round of vote counting via recordVote.
-func (p *progressTracker) resetVotes() {
-	p.votes = map[uint64]bool{}
-}
-
-// recordVote records that the node with the given id voted for this Raft
-// instance if v == true (and declined it otherwise).
-func (p *progressTracker) recordVote(id uint64, v bool) {
-	_, ok := p.votes[id]
-	if !ok {
-		p.votes[id] = v
-	}
-}
-
-// tallyVotes returns the number of granted and rejected votes, and whether the
-// election outcome is known.
-func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
-	// Make sure to populate granted/rejected correctly even if the votes slice
-	// contains members no longer part of the configuration. This doesn't really
-	// matter in the way the numbers are used (they're informational), but might
-	// as well get it right.
-	for id, pr := range p.prs {
-		if pr.IsLearner {
-			continue
-		}
-		if p.votes[id] {
-			granted++
-		} else {
-			rejected++
-		}
-	}
-	result := p.voters.VoteResult(p.votes)
-	return granted, rejected, result
-}

+ 75 - 68
raft/raft.go

@@ -26,6 +26,7 @@ import (
 
 
 	"go.etcd.io/etcd/raft/quorum"
 	"go.etcd.io/etcd/raft/quorum"
 	pb "go.etcd.io/etcd/raft/raftpb"
 	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
 )
 )
 
 
 // None is a placeholder node ID used when there is no leader.
 // None is a placeholder node ID used when there is no leader.
@@ -261,7 +262,7 @@ type raft struct {
 
 
 	maxMsgSize         uint64
 	maxMsgSize         uint64
 	maxUncommittedSize uint64
 	maxUncommittedSize uint64
-	prs                progressTracker
+	prs                tracker.ProgressTracker
 
 
 	state StateType
 	state StateType
 
 
@@ -344,7 +345,7 @@ func newRaft(c *Config) *raft {
 		raftLog:                   raftlog,
 		raftLog:                   raftlog,
 		maxMsgSize:                c.MaxSizePerMsg,
 		maxMsgSize:                c.MaxSizePerMsg,
 		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
 		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
-		prs:                       makeProgressTracker(c.MaxInflightMsgs),
+		prs:                       tracker.MakeProgressTracker(c.MaxInflightMsgs),
 		electionTimeout:           c.ElectionTick,
 		electionTimeout:           c.ElectionTick,
 		heartbeatTimeout:          c.HeartbeatTick,
 		heartbeatTimeout:          c.HeartbeatTick,
 		logger:                    c.Logger,
 		logger:                    c.Logger,
@@ -355,11 +356,11 @@ func newRaft(c *Config) *raft {
 	}
 	}
 	for _, p := range peers {
 	for _, p := range peers {
 		// Add node to active config.
 		// Add node to active config.
-		r.prs.initProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
+		r.prs.InitProgress(p, 0 /* match */, 1 /* next */, false /* isLearner */)
 	}
 	}
 	for _, p := range learners {
 	for _, p := range learners {
 		// Add learner to active config.
 		// Add learner to active config.
-		r.prs.initProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
+		r.prs.InitProgress(p, 0 /* match */, 1 /* next */, true /* isLearner */)
 
 
 		if r.id == p {
 		if r.id == p {
 			r.isLearner = true
 			r.isLearner = true
@@ -375,7 +376,7 @@ func newRaft(c *Config) *raft {
 	r.becomeFollower(r.Term, None)
 	r.becomeFollower(r.Term, None)
 
 
 	var nodesStrs []string
 	var nodesStrs []string
-	for _, n := range r.prs.voterNodes() {
+	for _, n := range r.prs.VoterNodes() {
 		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
 		nodesStrs = append(nodesStrs, fmt.Sprintf("%x", n))
 	}
 	}
 
 
@@ -442,7 +443,7 @@ func (r *raft) sendAppend(to uint64) {
 // ("empty" messages are useful to convey updated Commit indexes, but
 // ("empty" messages are useful to convey updated Commit indexes, but
 // are undesirable when we're sending multiple messages in a batch).
 // are undesirable when we're sending multiple messages in a batch).
 func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
 func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
-	pr := r.prs.getProgress(to)
+	pr := r.prs.Progress[to]
 	if pr.IsPaused() {
 	if pr.IsPaused() {
 		return false
 		return false
 	}
 	}
@@ -477,7 +478,7 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
 		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
 		sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
 		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
 		r.logger.Debugf("%x [firstindex: %d, commit: %d] sent snapshot[index: %d, term: %d] to %x [%s]",
 			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
 			r.id, r.raftLog.firstIndex(), r.raftLog.committed, sindex, sterm, to, pr)
-		pr.becomeSnapshot(sindex)
+		pr.BecomeSnapshot(sindex)
 		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
 		r.logger.Debugf("%x paused sending replication messages to %x [%s]", r.id, to, pr)
 	} else {
 	} else {
 		m.Type = pb.MsgApp
 		m.Type = pb.MsgApp
@@ -487,13 +488,13 @@ func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
 		m.Commit = r.raftLog.committed
 		m.Commit = r.raftLog.committed
 		if n := len(m.Entries); n != 0 {
 		if n := len(m.Entries); n != 0 {
 			switch pr.State {
 			switch pr.State {
-			// optimistically increase the next when in ProgressStateReplicate
-			case ProgressStateReplicate:
+			// optimistically increase the next when in StateReplicate
+			case tracker.StateReplicate:
 				last := m.Entries[n-1].Index
 				last := m.Entries[n-1].Index
-				pr.optimisticUpdate(last)
-				pr.ins.add(last)
-			case ProgressStateProbe:
-				pr.pause()
+				pr.OptimisticUpdate(last)
+				pr.Inflights.Add(last)
+			case tracker.StateProbe:
+				pr.ProbeSent = true
 			default:
 			default:
 				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
 				r.logger.Panicf("%x is sending append in unhandled state %s", r.id, pr.State)
 			}
 			}
@@ -511,7 +512,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
 	// or it might not have all the committed entries.
 	// or it might not have all the committed entries.
 	// The leader MUST NOT forward the follower's commit to
 	// The leader MUST NOT forward the follower's commit to
 	// an unmatched index.
 	// an unmatched index.
-	commit := min(r.prs.getProgress(to).Match, r.raftLog.committed)
+	commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
 	m := pb.Message{
 	m := pb.Message{
 		To:      to,
 		To:      to,
 		Type:    pb.MsgHeartbeat,
 		Type:    pb.MsgHeartbeat,
@@ -525,7 +526,7 @@ func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
 // bcastAppend sends RPC, with entries to all peers that are not up-to-date
 // bcastAppend sends RPC, with entries to all peers that are not up-to-date
 // according to the progress recorded in r.prs.
 // according to the progress recorded in r.prs.
 func (r *raft) bcastAppend() {
 func (r *raft) bcastAppend() {
-	r.prs.visit(func(id uint64, _ *Progress) {
+	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
 		if id == r.id {
 		if id == r.id {
 			return
 			return
 		}
 		}
@@ -545,7 +546,7 @@ func (r *raft) bcastHeartbeat() {
 }
 }
 
 
 func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
 func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
-	r.prs.visit(func(id uint64, _ *Progress) {
+	r.prs.Visit(func(id uint64, _ *tracker.Progress) {
 		if id == r.id {
 		if id == r.id {
 			return
 			return
 		}
 		}
@@ -557,7 +558,7 @@ func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
 // the commit index changed (in which case the caller should call
 // the commit index changed (in which case the caller should call
 // r.bcastAppend).
 // r.bcastAppend).
 func (r *raft) maybeCommit() bool {
 func (r *raft) maybeCommit() bool {
-	mci := r.prs.committed()
+	mci := r.prs.Committed()
 	return r.raftLog.maybeCommit(mci, r.Term)
 	return r.raftLog.maybeCommit(mci, r.Term)
 }
 }
 
 
@@ -574,12 +575,12 @@ func (r *raft) reset(term uint64) {
 
 
 	r.abortLeaderTransfer()
 	r.abortLeaderTransfer()
 
 
-	r.prs.resetVotes()
-	r.prs.visit(func(id uint64, pr *Progress) {
-		*pr = Progress{
+	r.prs.ResetVotes()
+	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+		*pr = tracker.Progress{
 			Match:     0,
 			Match:     0,
 			Next:      r.raftLog.lastIndex() + 1,
 			Next:      r.raftLog.lastIndex() + 1,
-			ins:       newInflights(r.prs.maxInflight),
+			Inflights: tracker.NewInflights(r.prs.MaxInflight),
 			IsLearner: pr.IsLearner,
 			IsLearner: pr.IsLearner,
 		}
 		}
 		if id == r.id {
 		if id == r.id {
@@ -609,7 +610,7 @@ func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
 	}
 	}
 	// use latest "last" index after truncate/append
 	// use latest "last" index after truncate/append
 	li = r.raftLog.append(es...)
 	li = r.raftLog.append(es...)
-	r.prs.getProgress(r.id).maybeUpdate(li)
+	r.prs.Progress[r.id].MaybeUpdate(li)
 	// Regardless of maybeCommit's return, our caller will call bcastAppend.
 	// Regardless of maybeCommit's return, our caller will call bcastAppend.
 	r.maybeCommit()
 	r.maybeCommit()
 	return true
 	return true
@@ -682,7 +683,7 @@ func (r *raft) becomePreCandidate() {
 	// but doesn't change anything else. In particular it does not increase
 	// but doesn't change anything else. In particular it does not increase
 	// r.Term or change r.Vote.
 	// r.Term or change r.Vote.
 	r.step = stepCandidate
 	r.step = stepCandidate
-	r.prs.resetVotes()
+	r.prs.ResetVotes()
 	r.tick = r.tickElection
 	r.tick = r.tickElection
 	r.lead = None
 	r.lead = None
 	r.state = StatePreCandidate
 	r.state = StatePreCandidate
@@ -703,7 +704,7 @@ func (r *raft) becomeLeader() {
 	// (perhaps after having received a snapshot as a result). The leader is
 	// (perhaps after having received a snapshot as a result). The leader is
 	// trivially in this state. Note that r.reset() has initialized this
 	// trivially in this state. Note that r.reset() has initialized this
 	// progress with the last index already.
 	// progress with the last index already.
-	r.prs.getProgress(r.id).becomeReplicate()
+	r.prs.Progress[r.id].BecomeReplicate()
 
 
 	// Conservatively set the pendingConfIndex to the last index in the
 	// Conservatively set the pendingConfIndex to the last index in the
 	// log. There may or may not be a pending config change, but it's
 	// log. There may or may not be a pending config change, but it's
@@ -755,7 +756,7 @@ func (r *raft) campaign(t CampaignType) {
 		}
 		}
 		return
 		return
 	}
 	}
-	for id := range r.prs.voters.IDs() {
+	for id := range r.prs.Voters.IDs() {
 		if id == r.id {
 		if id == r.id {
 			continue
 			continue
 		}
 		}
@@ -776,8 +777,8 @@ func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected
 	} else {
 	} else {
 		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
 		r.logger.Infof("%x received %s rejection from %x at term %d", r.id, t, id, r.Term)
 	}
 	}
-	r.prs.recordVote(id, v)
-	return r.prs.tallyVotes()
+	r.prs.RecordVote(id, v)
+	return r.prs.TallyVotes()
 }
 }
 
 
 func (r *raft) Step(m pb.Message) error {
 func (r *raft) Step(m pb.Message) error {
@@ -943,16 +944,16 @@ func stepLeader(r *raft, m pb.Message) error {
 		//
 		//
 		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
 		// TODO(tbg): I added a TODO in removeNode, it doesn't seem that the
 		// leader steps down when removing itself. I might be missing something.
 		// leader steps down when removing itself. I might be missing something.
-		if pr := r.prs.getProgress(r.id); pr != nil {
+		if pr := r.prs.Progress[r.id]; pr != nil {
 			pr.RecentActive = true
 			pr.RecentActive = true
 		}
 		}
-		if !r.prs.quorumActive() {
+		if !r.prs.QuorumActive() {
 			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
 			r.logger.Warningf("%x stepped down to follower since quorum is not active", r.id)
 			r.becomeFollower(r.Term, None)
 			r.becomeFollower(r.Term, None)
 		}
 		}
 		// Mark everyone (but ourselves) as inactive in preparation for the next
 		// Mark everyone (but ourselves) as inactive in preparation for the next
 		// CheckQuorum.
 		// CheckQuorum.
-		r.prs.visit(func(id uint64, pr *Progress) {
+		r.prs.Visit(func(id uint64, pr *tracker.Progress) {
 			if id != r.id {
 			if id != r.id {
 				pr.RecentActive = false
 				pr.RecentActive = false
 			}
 			}
@@ -962,7 +963,7 @@ func stepLeader(r *raft, m pb.Message) error {
 		if len(m.Entries) == 0 {
 		if len(m.Entries) == 0 {
 			r.logger.Panicf("%x stepped empty MsgProp", r.id)
 			r.logger.Panicf("%x stepped empty MsgProp", r.id)
 		}
 		}
-		if r.prs.getProgress(r.id) == nil {
+		if r.prs.Progress[r.id] == nil {
 			// If we are not currently a member of the range (i.e. this node
 			// If we are not currently a member of the range (i.e. this node
 			// was removed from the configuration while serving as leader),
 			// was removed from the configuration while serving as leader),
 			// drop any new proposals.
 			// drop any new proposals.
@@ -994,7 +995,7 @@ func stepLeader(r *raft, m pb.Message) error {
 	case pb.MsgReadIndex:
 	case pb.MsgReadIndex:
 		// If more than the local vote is needed, go through a full broadcast,
 		// If more than the local vote is needed, go through a full broadcast,
 		// otherwise optimize.
 		// otherwise optimize.
-		if !r.prs.isSingleton() {
+		if !r.prs.IsSingleton() {
 			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
 			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
 				// Reject read only request when this leader has not committed any log entry at its term.
 				// Reject read only request when this leader has not committed any log entry at its term.
 				return nil
 				return nil
@@ -1029,7 +1030,7 @@ func stepLeader(r *raft, m pb.Message) error {
 	}
 	}
 
 
 	// All other message types require a progress for m.From (pr).
 	// All other message types require a progress for m.From (pr).
-	pr := r.prs.getProgress(m.From)
+	pr := r.prs.Progress[m.From]
 	if pr == nil {
 	if pr == nil {
 		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
 		r.logger.Debugf("%x no progress available for %x", r.id, m.From)
 		return nil
 		return nil
@@ -1041,30 +1042,30 @@ func stepLeader(r *raft, m pb.Message) error {
 		if m.Reject {
 		if m.Reject {
 			r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
 			r.logger.Debugf("%x received msgApp rejection(lastindex: %d) from %x for index %d",
 				r.id, m.RejectHint, m.From, m.Index)
 				r.id, m.RejectHint, m.From, m.Index)
-			if pr.maybeDecrTo(m.Index, m.RejectHint) {
+			if pr.MaybeDecrTo(m.Index, m.RejectHint) {
 				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
 				r.logger.Debugf("%x decreased progress of %x to [%s]", r.id, m.From, pr)
-				if pr.State == ProgressStateReplicate {
-					pr.becomeProbe()
+				if pr.State == tracker.StateReplicate {
+					pr.BecomeProbe()
 				}
 				}
 				r.sendAppend(m.From)
 				r.sendAppend(m.From)
 			}
 			}
 		} else {
 		} else {
 			oldPaused := pr.IsPaused()
 			oldPaused := pr.IsPaused()
-			if pr.maybeUpdate(m.Index) {
+			if pr.MaybeUpdate(m.Index) {
 				switch {
 				switch {
-				case pr.State == ProgressStateProbe:
-					pr.becomeReplicate()
-				case pr.State == ProgressStateSnapshot && pr.needSnapshotAbort():
-					r.logger.Debugf("%x snapshot aborted, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
+				case pr.State == tracker.StateProbe:
+					pr.BecomeReplicate()
+				case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
+					r.logger.Debugf("%x recovered from needing snapshot, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 					// Transition back to replicating state via probing state
 					// Transition back to replicating state via probing state
 					// (which takes the snapshot into account). If we didn't
 					// (which takes the snapshot into account). If we didn't
 					// move to replicating state, that would only happen with
 					// move to replicating state, that would only happen with
 					// the next round of appends (but there may not be a next
 					// the next round of appends (but there may not be a next
 					// round for a while, exposing an inconsistent RaftStatus).
 					// round for a while, exposing an inconsistent RaftStatus).
-					pr.becomeProbe()
-					pr.becomeReplicate()
-				case pr.State == ProgressStateReplicate:
-					pr.ins.freeTo(m.Index)
+					pr.BecomeProbe()
+					pr.BecomeReplicate()
+				case pr.State == tracker.StateReplicate:
+					pr.Inflights.FreeLE(m.Index)
 				}
 				}
 
 
 				if r.maybeCommit() {
 				if r.maybeCommit() {
@@ -1091,11 +1092,11 @@ func stepLeader(r *raft, m pb.Message) error {
 		}
 		}
 	case pb.MsgHeartbeatResp:
 	case pb.MsgHeartbeatResp:
 		pr.RecentActive = true
 		pr.RecentActive = true
-		pr.resume()
+		pr.ProbeSent = false
 
 
 		// free one slot for the full inflights window to allow progress.
 		// free one slot for the full inflights window to allow progress.
-		if pr.State == ProgressStateReplicate && pr.ins.full() {
-			pr.ins.freeFirstOne()
+		if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
+			pr.Inflights.FreeFirstOne()
 		}
 		}
 		if pr.Match < r.raftLog.lastIndex() {
 		if pr.Match < r.raftLog.lastIndex() {
 			r.sendAppend(m.From)
 			r.sendAppend(m.From)
@@ -1105,7 +1106,7 @@ func stepLeader(r *raft, m pb.Message) error {
 			return nil
 			return nil
 		}
 		}
 
 
-		if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
+		if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
 			return nil
 			return nil
 		}
 		}
 
 
@@ -1119,26 +1120,32 @@ func stepLeader(r *raft, m pb.Message) error {
 			}
 			}
 		}
 		}
 	case pb.MsgSnapStatus:
 	case pb.MsgSnapStatus:
-		if pr.State != ProgressStateSnapshot {
+		if pr.State != tracker.StateSnapshot {
 			return nil
 			return nil
 		}
 		}
+		// TODO(tbg): this code is very similar to the snapshot handling in
+		// MsgAppResp above. In fact, the code there is more correct than the
+		// code here and should likely be updated to match (or even better, the
+		// logic pulled into a newly created Progress state machine handler).
 		if !m.Reject {
 		if !m.Reject {
-			pr.becomeProbe()
+			pr.BecomeProbe()
 			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 			r.logger.Debugf("%x snapshot succeeded, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 		} else {
 		} else {
-			pr.snapshotFailure()
-			pr.becomeProbe()
+			// NB: the order here matters or we'll be probing erroneously from
+			// the snapshot index, but the snapshot never applied.
+			pr.PendingSnapshot = 0
+			pr.BecomeProbe()
 			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 			r.logger.Debugf("%x snapshot failed, resumed sending replication messages to %x [%s]", r.id, m.From, pr)
 		}
 		}
 		// If snapshot finish, wait for the msgAppResp from the remote node before sending
 		// If snapshot finish, wait for the msgAppResp from the remote node before sending
 		// out the next msgApp.
 		// out the next msgApp.
 		// If snapshot failure, wait for a heartbeat interval before next try
 		// If snapshot failure, wait for a heartbeat interval before next try
-		pr.pause()
+		pr.ProbeSent = true
 	case pb.MsgUnreachable:
 	case pb.MsgUnreachable:
 		// During optimistic replication, if the remote becomes unreachable,
 		// During optimistic replication, if the remote becomes unreachable,
 		// there is huge probability that a MsgApp is lost.
 		// there is huge probability that a MsgApp is lost.
-		if pr.State == ProgressStateReplicate {
-			pr.becomeProbe()
+		if pr.State == tracker.StateReplicate {
+			pr.BecomeProbe()
 		}
 		}
 		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
 		r.logger.Debugf("%x failed to send message to %x because it is unreachable [%s]", r.id, m.From, pr)
 	case pb.MsgTransferLeader:
 	case pb.MsgTransferLeader:
@@ -1341,7 +1348,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
 		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
 		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
 
 
 	r.raftLog.restore(s)
 	r.raftLog.restore(s)
-	r.prs = makeProgressTracker(r.prs.maxInflight)
+	r.prs = tracker.MakeProgressTracker(r.prs.MaxInflight)
 	r.restoreNode(s.Metadata.ConfState.Nodes, false)
 	r.restoreNode(s.Metadata.ConfState.Nodes, false)
 	r.restoreNode(s.Metadata.ConfState.Learners, true)
 	r.restoreNode(s.Metadata.ConfState.Learners, true)
 	return true
 	return true
@@ -1354,15 +1361,15 @@ func (r *raft) restoreNode(nodes []uint64, isLearner bool) {
 			match = next - 1
 			match = next - 1
 			r.isLearner = isLearner
 			r.isLearner = isLearner
 		}
 		}
-		r.prs.initProgress(n, match, next, isLearner)
-		r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.getProgress(n))
+		r.prs.InitProgress(n, match, next, isLearner)
+		r.logger.Infof("%x restored progress of %x [%s]", r.id, n, r.prs.Progress[n])
 	}
 	}
 }
 }
 
 
 // promotable indicates whether state machine can be promoted to leader,
 // promotable indicates whether state machine can be promoted to leader,
 // which is true when its own id is in progress list.
 // which is true when its own id is in progress list.
 func (r *raft) promotable() bool {
 func (r *raft) promotable() bool {
-	pr := r.prs.getProgress(r.id)
+	pr := r.prs.Progress[r.id]
 	return pr != nil && !pr.IsLearner
 	return pr != nil && !pr.IsLearner
 }
 }
 
 
@@ -1375,9 +1382,9 @@ func (r *raft) addLearner(id uint64) {
 }
 }
 
 
 func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
 func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
-	pr := r.prs.getProgress(id)
+	pr := r.prs.Progress[id]
 	if pr == nil {
 	if pr == nil {
-		r.prs.initProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
+		r.prs.InitProgress(id, 0, r.raftLog.lastIndex()+1, isLearner)
 	} else {
 	} else {
 		if isLearner && !pr.IsLearner {
 		if isLearner && !pr.IsLearner {
 			// Can only change Learner to Voter.
 			// Can only change Learner to Voter.
@@ -1392,10 +1399,10 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
 		}
 		}
 
 
 		// Change Learner to Voter, use origin Learner progress.
 		// Change Learner to Voter, use origin Learner progress.
-		r.prs.removeAny(id)
-		r.prs.initProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
+		r.prs.RemoveAny(id)
+		r.prs.InitProgress(id, 0 /* match */, 1 /* next */, false /* isLearner */)
 		pr.IsLearner = false
 		pr.IsLearner = false
-		*r.prs.getProgress(id) = *pr
+		*r.prs.Progress[id] = *pr
 	}
 	}
 
 
 	if r.id == id {
 	if r.id == id {
@@ -1405,14 +1412,14 @@ func (r *raft) addNodeOrLearnerNode(id uint64, isLearner bool) {
 	// When a node is first added, we should mark it as recently active.
 	// When a node is first added, we should mark it as recently active.
 	// Otherwise, CheckQuorum may cause us to step down if it is invoked
 	// Otherwise, CheckQuorum may cause us to step down if it is invoked
 	// before the added node has a chance to communicate with us.
 	// before the added node has a chance to communicate with us.
-	r.prs.getProgress(id).RecentActive = true
+	r.prs.Progress[id].RecentActive = true
 }
 }
 
 
 func (r *raft) removeNode(id uint64) {
 func (r *raft) removeNode(id uint64) {
-	r.prs.removeAny(id)
+	r.prs.RemoveAny(id)
 
 
 	// Do not try to commit or abort transferring if the cluster is now empty.
 	// Do not try to commit or abort transferring if the cluster is now empty.
-	if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 {
+	if len(r.prs.Voters[0]) == 0 && len(r.prs.Learners) == 0 {
 		return
 		return
 	}
 	}
 
 

+ 20 - 20
raft/raft_flow_control_test.go

@@ -29,11 +29,11 @@ func TestMsgAppFlowControlFull(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	pr2 := r.prs.prs[2]
+	pr2 := r.prs.Progress[2]
 	// force the progress to be in replicate state
 	// force the progress to be in replicate state
-	pr2.becomeReplicate()
+	pr2.BecomeReplicate()
 	// fill in the inflights window
 	// fill in the inflights window
-	for i := 0; i < r.prs.maxInflight; i++ {
+	for i := 0; i < r.prs.MaxInflight; i++ {
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		ms := r.readMessages()
 		ms := r.readMessages()
 		if len(ms) != 1 {
 		if len(ms) != 1 {
@@ -42,8 +42,8 @@ func TestMsgAppFlowControlFull(t *testing.T) {
 	}
 	}
 
 
 	// ensure 1
 	// ensure 1
-	if !pr2.ins.full() {
-		t.Fatalf("inflights.full = %t, want %t", pr2.ins.full(), true)
+	if !pr2.Inflights.Full() {
+		t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
 	}
 	}
 
 
 	// ensure 2
 	// ensure 2
@@ -65,18 +65,18 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	pr2 := r.prs.prs[2]
+	pr2 := r.prs.Progress[2]
 	// force the progress to be in replicate state
 	// force the progress to be in replicate state
-	pr2.becomeReplicate()
+	pr2.BecomeReplicate()
 	// fill in the inflights window
 	// fill in the inflights window
-	for i := 0; i < r.prs.maxInflight; i++ {
+	for i := 0; i < r.prs.MaxInflight; i++ {
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		r.readMessages()
 		r.readMessages()
 	}
 	}
 
 
 	// 1 is noop, 2 is the first proposal we just sent.
 	// 1 is noop, 2 is the first proposal we just sent.
 	// so we start with 2.
 	// so we start with 2.
-	for tt := 2; tt < r.prs.maxInflight; tt++ {
+	for tt := 2; tt < r.prs.MaxInflight; tt++ {
 		// move forward the window
 		// move forward the window
 		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
 		r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(tt)})
 		r.readMessages()
 		r.readMessages()
@@ -89,15 +89,15 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
 		}
 		}
 
 
 		// ensure 1
 		// ensure 1
-		if !pr2.ins.full() {
-			t.Fatalf("inflights.full = %t, want %t", pr2.ins.full(), true)
+		if !pr2.Inflights.Full() {
+			t.Fatalf("inflights.full = %t, want %t", pr2.Inflights.Full(), true)
 		}
 		}
 
 
 		// ensure 2
 		// ensure 2
 		for i := 0; i < tt; i++ {
 		for i := 0; i < tt; i++ {
 			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
 			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: uint64(i)})
-			if !pr2.ins.full() {
-				t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.ins.full(), true)
+			if !pr2.Inflights.Full() {
+				t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
 			}
 			}
 		}
 		}
 	}
 	}
@@ -110,26 +110,26 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	pr2 := r.prs.prs[2]
+	pr2 := r.prs.Progress[2]
 	// force the progress to be in replicate state
 	// force the progress to be in replicate state
-	pr2.becomeReplicate()
+	pr2.BecomeReplicate()
 	// fill in the inflights window
 	// fill in the inflights window
-	for i := 0; i < r.prs.maxInflight; i++ {
+	for i := 0; i < r.prs.MaxInflight; i++ {
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		r.readMessages()
 		r.readMessages()
 	}
 	}
 
 
 	for tt := 1; tt < 5; tt++ {
 	for tt := 1; tt < 5; tt++ {
-		if !pr2.ins.full() {
-			t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.ins.full(), true)
+		if !pr2.Inflights.Full() {
+			t.Fatalf("#%d: inflights.full = %t, want %t", tt, pr2.Inflights.Full(), true)
 		}
 		}
 
 
 		// recv tt msgHeartbeatResp and expect one free slot
 		// recv tt msgHeartbeatResp and expect one free slot
 		for i := 0; i < tt; i++ {
 		for i := 0; i < tt; i++ {
 			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
 			r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
 			r.readMessages()
 			r.readMessages()
-			if pr2.ins.full() {
-				t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.ins.full(), false)
+			if pr2.Inflights.Full() {
+				t.Fatalf("#%d.%d: inflights.full = %t, want %t", tt, i, pr2.Inflights.Full(), false)
 			}
 			}
 		}
 		}
 
 

+ 1 - 1
raft/raft_paper_test.go

@@ -169,7 +169,7 @@ func testNonleaderStartElection(t *testing.T, state StateType) {
 	if r.state != StateCandidate {
 	if r.state != StateCandidate {
 		t.Errorf("state = %s, want %s", r.state, StateCandidate)
 		t.Errorf("state = %s, want %s", r.state, StateCandidate)
 	}
 	}
-	if !r.prs.votes[r.id] {
+	if !r.prs.Votes[r.id] {
 		t.Errorf("vote for self = false, want true")
 		t.Errorf("vote for self = false, want true")
 	}
 	}
 	msgs := r.readMessages()
 	msgs := r.readMessages()

+ 32 - 31
raft/raft_snap_test.go

@@ -18,6 +18,7 @@ import (
 	"testing"
 	"testing"
 
 
 	pb "go.etcd.io/etcd/raft/raftpb"
 	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
 )
 )
 
 
 var (
 var (
@@ -40,11 +41,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
 
 
 	// force set the next of node 2, so that
 	// force set the next of node 2, so that
 	// node 2 needs a snapshot
 	// node 2 needs a snapshot
-	sm.prs.prs[2].Next = sm.raftLog.firstIndex()
+	sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
 
 
-	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
-	if sm.prs.prs[2].PendingSnapshot != 11 {
-		t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot)
+	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
+	if sm.prs.Progress[2].PendingSnapshot != 11 {
+		t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.Progress[2].PendingSnapshot)
 	}
 	}
 }
 }
 
 
@@ -56,7 +57,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.prs[2].becomeSnapshot(11)
+	sm.prs.Progress[2].BecomeSnapshot(11)
 
 
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
@@ -73,18 +74,18 @@ func TestSnapshotFailure(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.prs[2].Next = 1
-	sm.prs.prs[2].becomeSnapshot(11)
+	sm.prs.Progress[2].Next = 1
+	sm.prs.Progress[2].BecomeSnapshot(11)
 
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
-	if sm.prs.prs[2].PendingSnapshot != 0 {
-		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
+	if sm.prs.Progress[2].PendingSnapshot != 0 {
+		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
 	}
 	}
-	if sm.prs.prs[2].Next != 1 {
-		t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next)
+	if sm.prs.Progress[2].Next != 1 {
+		t.Fatalf("Next = %d, want 1", sm.prs.Progress[2].Next)
 	}
 	}
-	if !sm.prs.prs[2].Paused {
-		t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
+	if !sm.prs.Progress[2].ProbeSent {
+		t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
 	}
 	}
 }
 }
 
 
@@ -96,18 +97,18 @@ func TestSnapshotSucceed(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.prs[2].Next = 1
-	sm.prs.prs[2].becomeSnapshot(11)
+	sm.prs.Progress[2].Next = 1
+	sm.prs.Progress[2].BecomeSnapshot(11)
 
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
-	if sm.prs.prs[2].PendingSnapshot != 0 {
-		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
+	if sm.prs.Progress[2].PendingSnapshot != 0 {
+		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
 	}
 	}
-	if sm.prs.prs[2].Next != 12 {
-		t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next)
+	if sm.prs.Progress[2].Next != 12 {
+		t.Fatalf("Next = %d, want 12", sm.prs.Progress[2].Next)
 	}
 	}
-	if !sm.prs.prs[2].Paused {
-		t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
+	if !sm.prs.Progress[2].ProbeSent {
+		t.Errorf("ProbeSent = %v, want true", sm.prs.Progress[2].ProbeSent)
 	}
 	}
 }
 }
 
 
@@ -206,8 +207,8 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
 	mustSend(n2, n1, pb.MsgAppResp)
 	mustSend(n2, n1, pb.MsgAppResp)
 
 
 	// Leader has correct state for follower.
 	// Leader has correct state for follower.
-	pr := n1.prs.prs[2]
-	if pr.State != ProgressStateReplicate {
+	pr := n1.prs.Progress[2]
+	if pr.State != tracker.StateReplicate {
 		t.Fatalf("unexpected state %v", pr)
 		t.Fatalf("unexpected state %v", pr)
 	}
 	}
 	if pr.Match != expIdx || pr.Next != expIdx+1 {
 	if pr.Match != expIdx || pr.Next != expIdx+1 {
@@ -227,23 +228,23 @@ func TestSnapshotAbort(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.prs[2].Next = 1
-	sm.prs.prs[2].becomeSnapshot(11)
+	sm.prs.Progress[2].Next = 1
+	sm.prs.Progress[2].BecomeSnapshot(11)
 
 
 	// A successful msgAppResp that has a higher/equal index than the
 	// A successful msgAppResp that has a higher/equal index than the
 	// pending snapshot should abort the pending snapshot.
 	// pending snapshot should abort the pending snapshot.
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
-	if sm.prs.prs[2].PendingSnapshot != 0 {
-		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
+	if sm.prs.Progress[2].PendingSnapshot != 0 {
+		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.Progress[2].PendingSnapshot)
 	}
 	}
-	// The follower entered ProgressStateReplicate and the leader send an append
+	// The follower entered StateReplicate and the leader send an append
 	// and optimistically updated the progress (so we see 13 instead of 12).
 	// and optimistically updated the progress (so we see 13 instead of 12).
 	// There is something to append because the leader appended an empty entry
 	// There is something to append because the leader appended an empty entry
 	// to the log at index 12 when it assumed leadership.
 	// to the log at index 12 when it assumed leadership.
-	if sm.prs.prs[2].Next != 13 {
-		t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next)
+	if sm.prs.Progress[2].Next != 13 {
+		t.Fatalf("Next = %d, want 13", sm.prs.Progress[2].Next)
 	}
 	}
-	if n := sm.prs.prs[2].ins.count; n != 1 {
+	if n := sm.prs.Progress[2].Inflights.Count(); n != 1 {
 		t.Fatalf("expected an inflight message, got %d", n)
 		t.Fatalf("expected an inflight message, got %d", n)
 	}
 	}
 }
 }

+ 81 - 295
raft/raft_test.go

@@ -24,6 +24,7 @@ import (
 	"testing"
 	"testing"
 
 
 	pb "go.etcd.io/etcd/raft/raftpb"
 	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
 )
 )
 
 
 // nextEnts returns the appliable entries and updates the applied index
 // nextEnts returns the appliable entries and updates the applied index
@@ -55,228 +56,16 @@ func (r *raft) readMessages() []pb.Message {
 	return msgs
 	return msgs
 }
 }
 
 
-func TestProgressBecomeProbe(t *testing.T) {
-	match := uint64(1)
-	tests := []struct {
-		p     *Progress
-		wnext uint64
-	}{
-		{
-			&Progress{State: ProgressStateReplicate, Match: match, Next: 5, ins: newInflights(256)},
-			2,
-		},
-		{
-			// snapshot finish
-			&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, ins: newInflights(256)},
-			11,
-		},
-		{
-			// snapshot failure
-			&Progress{State: ProgressStateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, ins: newInflights(256)},
-			2,
-		},
-	}
-	for i, tt := range tests {
-		tt.p.becomeProbe()
-		if tt.p.State != ProgressStateProbe {
-			t.Errorf("#%d: state = %s, want %s", i, tt.p.State, ProgressStateProbe)
-		}
-		if tt.p.Match != match {
-			t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match)
-		}
-		if tt.p.Next != tt.wnext {
-			t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext)
-		}
-	}
-}
-
-func TestProgressBecomeReplicate(t *testing.T) {
-	p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)}
-	p.becomeReplicate()
-
-	if p.State != ProgressStateReplicate {
-		t.Errorf("state = %s, want %s", p.State, ProgressStateReplicate)
-	}
-	if p.Match != 1 {
-		t.Errorf("match = %d, want 1", p.Match)
-	}
-	if w := p.Match + 1; p.Next != w {
-		t.Errorf("next = %d, want %d", p.Next, w)
-	}
-}
-
-func TestProgressBecomeSnapshot(t *testing.T) {
-	p := &Progress{State: ProgressStateProbe, Match: 1, Next: 5, ins: newInflights(256)}
-	p.becomeSnapshot(10)
-
-	if p.State != ProgressStateSnapshot {
-		t.Errorf("state = %s, want %s", p.State, ProgressStateSnapshot)
-	}
-	if p.Match != 1 {
-		t.Errorf("match = %d, want 1", p.Match)
-	}
-	if p.PendingSnapshot != 10 {
-		t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot)
-	}
-}
-
-func TestProgressUpdate(t *testing.T) {
-	prevM, prevN := uint64(3), uint64(5)
-	tests := []struct {
-		update uint64
-
-		wm  uint64
-		wn  uint64
-		wok bool
-	}{
-		{prevM - 1, prevM, prevN, false},        // do not decrease match, next
-		{prevM, prevM, prevN, false},            // do not decrease next
-		{prevM + 1, prevM + 1, prevN, true},     // increase match, do not decrease next
-		{prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next
-	}
-	for i, tt := range tests {
-		p := &Progress{
-			Match: prevM,
-			Next:  prevN,
-		}
-		ok := p.maybeUpdate(tt.update)
-		if ok != tt.wok {
-			t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok)
-		}
-		if p.Match != tt.wm {
-			t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
-		}
-		if p.Next != tt.wn {
-			t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn)
-		}
-	}
-}
-
-func TestProgressMaybeDecr(t *testing.T) {
-	tests := []struct {
-		state    ProgressStateType
-		m        uint64
-		n        uint64
-		rejected uint64
-		last     uint64
-
-		w  bool
-		wn uint64
-	}{
-		{
-			// state replicate and rejected is not greater than match
-			ProgressStateReplicate, 5, 10, 5, 5, false, 10,
-		},
-		{
-			// state replicate and rejected is not greater than match
-			ProgressStateReplicate, 5, 10, 4, 4, false, 10,
-		},
-		{
-			// state replicate and rejected is greater than match
-			// directly decrease to match+1
-			ProgressStateReplicate, 5, 10, 9, 9, true, 6,
-		},
-		{
-			// next-1 != rejected is always false
-			ProgressStateProbe, 0, 0, 0, 0, false, 0,
-		},
-		{
-			// next-1 != rejected is always false
-			ProgressStateProbe, 0, 10, 5, 5, false, 10,
-		},
-		{
-			// next>1 = decremented by 1
-			ProgressStateProbe, 0, 10, 9, 9, true, 9,
-		},
-		{
-			// next>1 = decremented by 1
-			ProgressStateProbe, 0, 2, 1, 1, true, 1,
-		},
-		{
-			// next<=1 = reset to 1
-			ProgressStateProbe, 0, 1, 0, 0, true, 1,
-		},
-		{
-			// decrease to min(rejected, last+1)
-			ProgressStateProbe, 0, 10, 9, 2, true, 3,
-		},
-		{
-			// rejected < 1, reset to 1
-			ProgressStateProbe, 0, 10, 9, 0, true, 1,
-		},
-	}
-	for i, tt := range tests {
-		p := &Progress{
-			State: tt.state,
-			Match: tt.m,
-			Next:  tt.n,
-		}
-		if g := p.maybeDecrTo(tt.rejected, tt.last); g != tt.w {
-			t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
-		}
-		if gm := p.Match; gm != tt.m {
-			t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
-		}
-		if gn := p.Next; gn != tt.wn {
-			t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
-		}
-	}
-}
-
-func TestProgressIsPaused(t *testing.T) {
-	tests := []struct {
-		state  ProgressStateType
-		paused bool
-
-		w bool
-	}{
-		{ProgressStateProbe, false, false},
-		{ProgressStateProbe, true, true},
-		{ProgressStateReplicate, false, false},
-		{ProgressStateReplicate, true, false},
-		{ProgressStateSnapshot, false, true},
-		{ProgressStateSnapshot, true, true},
-	}
-	for i, tt := range tests {
-		p := &Progress{
-			State:  tt.state,
-			Paused: tt.paused,
-			ins:    newInflights(256),
-		}
-		if g := p.IsPaused(); g != tt.w {
-			t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
-		}
-	}
-}
-
-// TestProgressResume ensures that progress.maybeUpdate and progress.maybeDecrTo
-// will reset progress.paused.
-func TestProgressResume(t *testing.T) {
-	p := &Progress{
-		Next:   2,
-		Paused: true,
-	}
-	p.maybeDecrTo(1, 1)
-	if p.Paused {
-		t.Errorf("paused= %v, want false", p.Paused)
-	}
-	p.Paused = true
-	p.maybeUpdate(2)
-	if p.Paused {
-		t.Errorf("paused= %v, want false", p.Paused)
-	}
-}
-
 func TestProgressLeader(t *testing.T) {
 func TestProgressLeader(t *testing.T) {
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
-	r.prs.prs[2].becomeReplicate()
+	r.prs.Progress[2].BecomeReplicate()
 
 
 	// Send proposals to r1. The first 5 entries should be appended to the log.
 	// Send proposals to r1. The first 5 entries should be appended to the log.
 	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
 	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
 	for i := 0; i < 5; i++ {
 	for i := 0; i < 5; i++ {
-		if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
+		if pr := r.prs.Progress[r.id]; pr.State != tracker.StateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
 			t.Errorf("unexpected progress %v", pr)
 			t.Errorf("unexpected progress %v", pr)
 		}
 		}
 		if err := r.Step(propMsg); err != nil {
 		if err := r.Step(propMsg); err != nil {
@@ -291,17 +80,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	r.prs.prs[2].Paused = true
+	r.prs.Progress[2].ProbeSent = true
 
 
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
-	if !r.prs.prs[2].Paused {
-		t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
+	if !r.prs.Progress[2].ProbeSent {
+		t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
 	}
 	}
 
 
-	r.prs.prs[2].becomeReplicate()
+	r.prs.Progress[2].BecomeReplicate()
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
-	if r.prs.prs[2].Paused {
-		t.Errorf("paused = %v, want false", r.prs.prs[2].Paused)
+	if r.prs.Progress[2].ProbeSent {
+		t.Errorf("paused = %v, want false", r.prs.Progress[2].ProbeSent)
 	}
 	}
 }
 }
 
 
@@ -331,7 +120,7 @@ func TestProgressFlowControl(t *testing.T) {
 	r.readMessages()
 	r.readMessages()
 
 
 	// While node 2 is in probe state, propose a bunch of entries.
 	// While node 2 is in probe state, propose a bunch of entries.
-	r.prs.prs[2].becomeProbe()
+	r.prs.Progress[2].BecomeProbe()
 	blob := []byte(strings.Repeat("a", 1000))
 	blob := []byte(strings.Repeat("a", 1000))
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
@@ -409,8 +198,8 @@ func TestUncommittedEntryLimit(t *testing.T) {
 
 
 	// Set the two followers to the replicate state. Commit to tail of log.
 	// Set the two followers to the replicate state. Commit to tail of log.
 	const numFollowers = 2
 	const numFollowers = 2
-	r.prs.prs[2].becomeReplicate()
-	r.prs.prs[3].becomeReplicate()
+	r.prs.Progress[2].BecomeReplicate()
+	r.prs.Progress[3].BecomeReplicate()
 	r.uncommittedSize = 0
 	r.uncommittedSize = 0
 
 
 	// Send proposals to r1. The first 5 entries should be appended to the log.
 	// Send proposals to r1. The first 5 entries should be appended to the log.
@@ -889,7 +678,7 @@ func TestLearnerLogReplication(t *testing.T) {
 		t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
 		t.Errorf("peer 2 wants committed to %d, but still %d", n1.raftLog.committed, n2.raftLog.committed)
 	}
 	}
 
 
-	match := n1.prs.getProgress(2).Match
+	match := n1.prs.Progress[2].Match
 	if match != n2.raftLog.committed {
 	if match != n2.raftLog.committed {
 		t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
 		t.Errorf("progress 2 of leader 1 wants match %d, but got %d", n2.raftLog.committed, match)
 	}
 	}
@@ -1351,9 +1140,9 @@ func TestCommit(t *testing.T) {
 		storage.hardState = pb.HardState{Term: tt.smTerm}
 		storage.hardState = pb.HardState{Term: tt.smTerm}
 
 
 		sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
 		sm := newTestRaft(1, []uint64{1}, 10, 2, storage)
-		sm.prs.removeAny(1)
+		sm.prs.RemoveAny(1)
 		for j := 0; j < len(tt.matches); j++ {
 		for j := 0; j < len(tt.matches); j++ {
-			sm.prs.initProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
+			sm.prs.InitProgress(uint64(j)+1, tt.matches[j], tt.matches[j]+1, false)
 		}
 		}
 		sm.maybeCommit()
 		sm.maybeCommit()
 		if g := sm.raftLog.committed; g != tt.w {
 		if g := sm.raftLog.committed; g != tt.w {
@@ -2138,7 +1927,7 @@ func TestNonPromotableVoterWithCheckQuorum(t *testing.T) {
 	nt := newNetwork(a, b)
 	nt := newNetwork(a, b)
 	setRandomizedElectionTimeout(b, b.electionTimeout+1)
 	setRandomizedElectionTimeout(b, b.electionTimeout+1)
 	// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
 	// Need to remove 2 again to make it a non-promotable node since newNetwork overwritten some internal states
-	b.prs.removeAny(2)
+	b.prs.RemoveAny(2)
 
 
 	if b.promotable() {
 	if b.promotable() {
 		t.Fatalf("promotable = %v, want false", b.promotable())
 		t.Fatalf("promotable = %v, want false", b.promotable())
@@ -2632,7 +2421,7 @@ func TestLeaderAppResp(t *testing.T) {
 		sm.readMessages()
 		sm.readMessages()
 		sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
 		sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
 
 
-		p := sm.prs.prs[2]
+		p := sm.prs.Progress[2]
 		if p.Match != tt.wmatch {
 		if p.Match != tt.wmatch {
 			t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
 			t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
 		}
 		}
@@ -2679,9 +2468,9 @@ func TestBcastBeat(t *testing.T) {
 		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
 		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
 	}
 	}
 	// slow follower
 	// slow follower
-	sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6
+	sm.prs.Progress[2].Match, sm.prs.Progress[2].Next = 5, 6
 	// normal follower
 	// normal follower
-	sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
+	sm.prs.Progress[3].Match, sm.prs.Progress[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
 
 
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
@@ -2689,8 +2478,8 @@ func TestBcastBeat(t *testing.T) {
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 	}
 	}
 	wantCommitMap := map[uint64]uint64{
 	wantCommitMap := map[uint64]uint64{
-		2: min(sm.raftLog.committed, sm.prs.prs[2].Match),
-		3: min(sm.raftLog.committed, sm.prs.prs[3].Match),
+		2: min(sm.raftLog.committed, sm.prs.Progress[2].Match),
+		3: min(sm.raftLog.committed, sm.prs.Progress[3].Match),
 	}
 	}
 	for i, m := range msgs {
 	for i, m := range msgs {
 		if m.Type != pb.MsgHeartbeat {
 		if m.Type != pb.MsgHeartbeat {
@@ -2759,16 +2548,16 @@ func TestLeaderIncreaseNext(t *testing.T) {
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
 	previousEnts := []pb.Entry{{Term: 1, Index: 1}, {Term: 1, Index: 2}, {Term: 1, Index: 3}}
 	tests := []struct {
 	tests := []struct {
 		// progress
 		// progress
-		state ProgressStateType
+		state tracker.StateType
 		next  uint64
 		next  uint64
 
 
 		wnext uint64
 		wnext uint64
 	}{
 	}{
 		// state replicate, optimistically increase next
 		// state replicate, optimistically increase next
 		// previous entries + noop entry + propose + 1
 		// previous entries + noop entry + propose + 1
-		{ProgressStateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
+		{tracker.StateReplicate, 2, uint64(len(previousEnts) + 1 + 1 + 1)},
 		// state probe, not optimistically increase next
 		// state probe, not optimistically increase next
-		{ProgressStateProbe, 2, 2},
+		{tracker.StateProbe, 2, 2},
 	}
 	}
 
 
 	for i, tt := range tests {
 	for i, tt := range tests {
@@ -2776,11 +2565,11 @@ func TestLeaderIncreaseNext(t *testing.T) {
 		sm.raftLog.append(previousEnts...)
 		sm.raftLog.append(previousEnts...)
 		sm.becomeCandidate()
 		sm.becomeCandidate()
 		sm.becomeLeader()
 		sm.becomeLeader()
-		sm.prs.prs[2].State = tt.state
-		sm.prs.prs[2].Next = tt.next
+		sm.prs.Progress[2].State = tt.state
+		sm.prs.Progress[2].Next = tt.next
 		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 
 
-		p := sm.prs.prs[2]
+		p := sm.prs.Progress[2]
 		if p.Next != tt.wnext {
 		if p.Next != tt.wnext {
 			t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
 			t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
 		}
 		}
@@ -2792,7 +2581,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
-	r.prs.prs[2].becomeProbe()
+	r.prs.Progress[2].BecomeProbe()
 
 
 	// each round is a heartbeat
 	// each round is a heartbeat
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
@@ -2811,8 +2600,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			}
 			}
 		}
 		}
 
 
-		if !r.prs.prs[2].Paused {
-			t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
+		if !r.prs.Progress[2].ProbeSent {
+			t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
 		}
 		}
 		for j := 0; j < 10; j++ {
 		for j := 0; j < 10; j++ {
 			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2826,8 +2615,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 		for j := 0; j < r.heartbeatTimeout; j++ {
 		for j := 0; j < r.heartbeatTimeout; j++ {
 			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 		}
 		}
-		if !r.prs.prs[2].Paused {
-			t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
+		if !r.prs.Progress[2].ProbeSent {
+			t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
 		}
 		}
 
 
 		// consume the heartbeat
 		// consume the heartbeat
@@ -2849,8 +2638,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 	if msg[0].Index != 0 {
 	if msg[0].Index != 0 {
 		t.Errorf("index = %d, want %d", msg[0].Index, 0)
 		t.Errorf("index = %d, want %d", msg[0].Index, 0)
 	}
 	}
-	if !r.prs.prs[2].Paused {
-		t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
+	if !r.prs.Progress[2].ProbeSent {
+		t.Errorf("paused = %v, want true", r.prs.Progress[2].ProbeSent)
 	}
 	}
 }
 }
 
 
@@ -2859,7 +2648,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
-	r.prs.prs[2].becomeReplicate()
+	r.prs.Progress[2].BecomeReplicate()
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2876,7 +2665,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
-	r.prs.prs[2].becomeSnapshot(10)
+	r.prs.Progress[2].BecomeSnapshot(10)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2897,17 +2686,17 @@ func TestRecvMsgUnreachable(t *testing.T) {
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
 	// set node 2 to state replicate
 	// set node 2 to state replicate
-	r.prs.prs[2].Match = 3
-	r.prs.prs[2].becomeReplicate()
-	r.prs.prs[2].optimisticUpdate(5)
+	r.prs.Progress[2].Match = 3
+	r.prs.Progress[2].BecomeReplicate()
+	r.prs.Progress[2].OptimisticUpdate(5)
 
 
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
 
 
-	if r.prs.prs[2].State != ProgressStateProbe {
-		t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe)
+	if r.prs.Progress[2].State != tracker.StateProbe {
+		t.Errorf("state = %s, want %s", r.prs.Progress[2].State, tracker.StateProbe)
 	}
 	}
-	if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext {
-		t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext)
+	if wnext := r.prs.Progress[2].Match + 1; r.prs.Progress[2].Next != wnext {
+		t.Errorf("next = %d, want %d", r.prs.Progress[2].Next, wnext)
 	}
 	}
 }
 }
 
 
@@ -2932,7 +2721,7 @@ func TestRestore(t *testing.T) {
 	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
 	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
 		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
 		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
 	}
 	}
-	sg := sm.prs.voterNodes()
+	sg := sm.prs.VoterNodes()
 	if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
 	if !reflect.DeepEqual(sg, s.Metadata.ConfState.Nodes) {
 		t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
 		t.Errorf("sm.Nodes = %+v, want %+v", sg, s.Metadata.ConfState.Nodes)
 	}
 	}
@@ -2964,22 +2753,22 @@ func TestRestoreWithLearner(t *testing.T) {
 	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
 	if mustTerm(sm.raftLog.term(s.Metadata.Index)) != s.Metadata.Term {
 		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
 		t.Errorf("log.lastTerm = %d, want %d", mustTerm(sm.raftLog.term(s.Metadata.Index)), s.Metadata.Term)
 	}
 	}
-	sg := sm.prs.voterNodes()
+	sg := sm.prs.VoterNodes()
 	if len(sg) != len(s.Metadata.ConfState.Nodes) {
 	if len(sg) != len(s.Metadata.ConfState.Nodes) {
 		t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
 		t.Errorf("sm.Nodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Nodes)
 	}
 	}
-	lns := sm.prs.learnerNodes()
+	lns := sm.prs.LearnerNodes()
 	if len(lns) != len(s.Metadata.ConfState.Learners) {
 	if len(lns) != len(s.Metadata.ConfState.Learners) {
 		t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
 		t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
 	}
 	}
 	for _, n := range s.Metadata.ConfState.Nodes {
 	for _, n := range s.Metadata.ConfState.Nodes {
-		if sm.prs.prs[n].IsLearner {
-			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false)
+		if sm.prs.Progress[n].IsLearner {
+			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], false)
 		}
 		}
 	}
 	}
 	for _, n := range s.Metadata.ConfState.Learners {
 	for _, n := range s.Metadata.ConfState.Learners {
-		if !sm.prs.prs[n].IsLearner {
-			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true)
+		if !sm.prs.Progress[n].IsLearner {
+			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.Progress[n], true)
 		}
 		}
 	}
 	}
 
 
@@ -3121,8 +2910,8 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
 	// force set the next of node 2, so that node 2 needs a snapshot
 	// force set the next of node 2, so that node 2 needs a snapshot
-	sm.prs.prs[2].Next = sm.raftLog.firstIndex()
-	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
+	sm.prs.Progress[2].Next = sm.raftLog.firstIndex()
+	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.Progress[2].Next - 1, Reject: true})
 
 
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
 	if len(msgs) != 1 {
 	if len(msgs) != 1 {
@@ -3152,8 +2941,8 @@ func TestIgnoreProvidingSnap(t *testing.T) {
 
 
 	// force set the next of node 2, so that node 2 needs a snapshot
 	// force set the next of node 2, so that node 2 needs a snapshot
 	// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
 	// change node 2 to be inactive, expect node 1 ignore sending snapshot to 2
-	sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1
-	sm.prs.prs[2].RecentActive = false
+	sm.prs.Progress[2].Next = sm.raftLog.firstIndex() - 1
+	sm.prs.Progress[2].RecentActive = false
 
 
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 
 
@@ -3193,7 +2982,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	}
 	}
 	lead := nt.peers[1].(*raft)
 	lead := nt.peers[1].(*raft)
 	nextEnts(lead, nt.storage[1])
 	nextEnts(lead, nt.storage[1])
-	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil)
+	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil)
 	nt.storage[1].Compact(lead.raftLog.applied)
 	nt.storage[1].Compact(lead.raftLog.applied)
 
 
 	nt.recover()
 	nt.recover()
@@ -3201,7 +2990,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	// node 3 will only be considered as active when node 1 receives a reply from it.
 	// node 3 will only be considered as active when node 1 receives a reply from it.
 	for {
 	for {
 		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
-		if lead.prs.prs[3].RecentActive {
+		if lead.prs.Progress[3].RecentActive {
 			break
 			break
 		}
 		}
 	}
 	}
@@ -3288,7 +3077,7 @@ func TestNewLeaderPendingConfig(t *testing.T) {
 func TestAddNode(t *testing.T) {
 func TestAddNode(t *testing.T) {
 	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	r.addNode(2)
 	r.addNode(2)
-	nodes := r.prs.voterNodes()
+	nodes := r.prs.VoterNodes()
 	wnodes := []uint64{1, 2}
 	wnodes := []uint64{1, 2}
 	if !reflect.DeepEqual(nodes, wnodes) {
 	if !reflect.DeepEqual(nodes, wnodes) {
 		t.Errorf("nodes = %v, want %v", nodes, wnodes)
 		t.Errorf("nodes = %v, want %v", nodes, wnodes)
@@ -3299,13 +3088,13 @@ func TestAddNode(t *testing.T) {
 func TestAddLearner(t *testing.T) {
 func TestAddLearner(t *testing.T) {
 	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	r := newTestRaft(1, []uint64{1}, 10, 1, NewMemoryStorage())
 	r.addLearner(2)
 	r.addLearner(2)
-	nodes := r.prs.learnerNodes()
+	nodes := r.prs.LearnerNodes()
 	wnodes := []uint64{2}
 	wnodes := []uint64{2}
 	if !reflect.DeepEqual(nodes, wnodes) {
 	if !reflect.DeepEqual(nodes, wnodes) {
 		t.Errorf("nodes = %v, want %v", nodes, wnodes)
 		t.Errorf("nodes = %v, want %v", nodes, wnodes)
 	}
 	}
-	if !r.prs.prs[2].IsLearner {
-		t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true)
+	if !r.prs.Progress[2].IsLearner {
+		t.Errorf("node 2 is learner %t, want %t", r.prs.Progress[2].IsLearner, true)
 	}
 	}
 }
 }
 
 
@@ -3349,14 +3138,14 @@ func TestRemoveNode(t *testing.T) {
 	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r := newTestRaft(1, []uint64{1, 2}, 10, 1, NewMemoryStorage())
 	r.removeNode(2)
 	r.removeNode(2)
 	w := []uint64{1}
 	w := []uint64{1}
-	if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
+	if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
 	}
 
 
 	// remove all nodes from cluster
 	// remove all nodes from cluster
 	r.removeNode(1)
 	r.removeNode(1)
 	w = []uint64{}
 	w = []uint64{}
-	if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
+	if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
 	}
 }
 }
@@ -3367,18 +3156,18 @@ func TestRemoveLearner(t *testing.T) {
 	r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
 	r := newTestLearnerRaft(1, []uint64{1}, []uint64{2}, 10, 1, NewMemoryStorage())
 	r.removeNode(2)
 	r.removeNode(2)
 	w := []uint64{1}
 	w := []uint64{1}
-	if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
+	if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
 	}
 
 
 	w = []uint64{}
 	w = []uint64{}
-	if g := r.prs.learnerNodes(); !reflect.DeepEqual(g, w) {
+	if g := r.prs.LearnerNodes(); !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
 	}
 
 
 	// remove all nodes from cluster
 	// remove all nodes from cluster
 	r.removeNode(1)
 	r.removeNode(1)
-	if g := r.prs.voterNodes(); !reflect.DeepEqual(g, w) {
+	if g := r.prs.VoterNodes(); !reflect.DeepEqual(g, w) {
 		t.Errorf("nodes = %v, want %v", g, w)
 		t.Errorf("nodes = %v, want %v", g, w)
 	}
 	}
 }
 }
@@ -3417,8 +3206,8 @@ func TestRaftNodes(t *testing.T) {
 	}
 	}
 	for i, tt := range tests {
 	for i, tt := range tests {
 		r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
 		r := newTestRaft(1, tt.ids, 10, 1, NewMemoryStorage())
-		if !reflect.DeepEqual(r.prs.voterNodes(), tt.wids) {
-			t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.voterNodes(), tt.wids)
+		if !reflect.DeepEqual(r.prs.VoterNodes(), tt.wids) {
+			t.Errorf("#%d: nodes = %+v, want %+v", i, r.prs.VoterNodes(), tt.wids)
 		}
 		}
 	}
 	}
 }
 }
@@ -3619,8 +3408,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
 
 
 	nt.recover()
 	nt.recover()
 	lead := nt.peers[1].(*raft)
 	lead := nt.peers[1].(*raft)
-	if lead.prs.prs[3].Match != 1 {
-		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
+	if lead.prs.Progress[3].Match != 1 {
+		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
 	}
 	}
 
 
 	// Transfer leadership to 3 when node 3 is lack of log.
 	// Transfer leadership to 3 when node 3 is lack of log.
@@ -3638,12 +3427,12 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
 	nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{}}})
 	lead := nt.peers[1].(*raft)
 	lead := nt.peers[1].(*raft)
 	nextEnts(lead, nt.storage[1])
 	nextEnts(lead, nt.storage[1])
-	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.voterNodes()}, nil)
+	nt.storage[1].CreateSnapshot(lead.raftLog.applied, &pb.ConfState{Nodes: lead.prs.VoterNodes()}, nil)
 	nt.storage[1].Compact(lead.raftLog.applied)
 	nt.storage[1].Compact(lead.raftLog.applied)
 
 
 	nt.recover()
 	nt.recover()
-	if lead.prs.prs[3].Match != 1 {
-		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
+	if lead.prs.Progress[3].Match != 1 {
+		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.Progress[3].Match, 1)
 	}
 	}
 
 
 	// Transfer leadership to 3 when node 3 is lack of snapshot.
 	// Transfer leadership to 3 when node 3 is lack of snapshot.
@@ -3722,8 +3511,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
 		t.Fatalf("should return drop proposal error while transferring")
 		t.Fatalf("should return drop proposal error while transferring")
 	}
 	}
 
 
-	if lead.prs.prs[1].Match != 1 {
-		t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1)
+	if lead.prs.Progress[1].Match != 1 {
+		t.Fatalf("node 1 has match %x, want %x", lead.prs.Progress[1].Match, 1)
 	}
 	}
 }
 }
 
 
@@ -4329,24 +4118,21 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
 			sm := newRaft(cfg)
 			sm := newRaft(cfg)
 			npeers[id] = sm
 			npeers[id] = sm
 		case *raft:
 		case *raft:
-			learners := make(map[uint64]bool, len(v.prs.learners))
-			for i := range v.prs.learners {
+			learners := make(map[uint64]bool, len(v.prs.Learners))
+			for i := range v.prs.Learners {
 				learners[i] = true
 				learners[i] = true
 			}
 			}
 			v.id = id
 			v.id = id
-			v.prs.voters[0] = make(map[uint64]struct{})
-			v.prs.voters[1] = make(map[uint64]struct{})
-			v.prs.learners = make(map[uint64]struct{})
-			v.prs.prs = make(map[uint64]*Progress)
+			v.prs = tracker.MakeProgressTracker(v.prs.MaxInflight)
 			for i := 0; i < size; i++ {
 			for i := 0; i < size; i++ {
-				pr := &Progress{}
+				pr := &tracker.Progress{}
 				if _, ok := learners[peerAddrs[i]]; ok {
 				if _, ok := learners[peerAddrs[i]]; ok {
 					pr.IsLearner = true
 					pr.IsLearner = true
-					v.prs.learners[peerAddrs[i]] = struct{}{}
+					v.prs.Learners[peerAddrs[i]] = struct{}{}
 				} else {
 				} else {
-					v.prs.voters[0][peerAddrs[i]] = struct{}{}
+					v.prs.Voters[0][peerAddrs[i]] = struct{}{}
 				}
 				}
-				v.prs.prs[peerAddrs[i]] = pr
+				v.prs.Progress[peerAddrs[i]] = pr
 			}
 			}
 			v.reset(v.Term)
 			v.reset(v.Term)
 			npeers[id] = v
 			npeers[id] = v

+ 7 - 6
raft/rawnode.go

@@ -18,6 +18,7 @@ import (
 	"errors"
 	"errors"
 
 
 	pb "go.etcd.io/etcd/raft/raftpb"
 	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
 )
 )
 
 
 // ErrStepLocalMsg is returned when try to step a local raft message
 // ErrStepLocalMsg is returned when try to step a local raft message
@@ -166,7 +167,7 @@ func (rn *RawNode) ProposeConfChange(cc pb.ConfChange) error {
 // ApplyConfChange applies a config change to the local node.
 // ApplyConfChange applies a config change to the local node.
 func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
 func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
 	if cc.NodeID == None {
 	if cc.NodeID == None {
-		return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()}
+		return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()}
 	}
 	}
 	switch cc.Type {
 	switch cc.Type {
 	case pb.ConfChangeAddNode:
 	case pb.ConfChangeAddNode:
@@ -179,7 +180,7 @@ func (rn *RawNode) ApplyConfChange(cc pb.ConfChange) *pb.ConfState {
 	default:
 	default:
 		panic("unexpected conf type")
 		panic("unexpected conf type")
 	}
 	}
-	return &pb.ConfState{Nodes: rn.raft.prs.voterNodes(), Learners: rn.raft.prs.learnerNodes()}
+	return &pb.ConfState{Nodes: rn.raft.prs.VoterNodes(), Learners: rn.raft.prs.LearnerNodes()}
 }
 }
 
 
 // Step advances the state machine using the given message.
 // Step advances the state machine using the given message.
@@ -188,7 +189,7 @@ func (rn *RawNode) Step(m pb.Message) error {
 	if IsLocalMsg(m.Type) {
 	if IsLocalMsg(m.Type) {
 		return ErrStepLocalMsg
 		return ErrStepLocalMsg
 	}
 	}
-	if pr := rn.raft.prs.getProgress(m.From); pr != nil || !IsResponseMsg(m.Type) {
+	if pr := rn.raft.prs.Progress[m.From]; pr != nil || !IsResponseMsg(m.Type) {
 		return rn.raft.Step(m)
 		return rn.raft.Step(m)
 	}
 	}
 	return ErrStepPeerNotFound
 	return ErrStepPeerNotFound
@@ -256,14 +257,14 @@ const (
 
 
 // WithProgress is a helper to introspect the Progress for this node and its
 // WithProgress is a helper to introspect the Progress for this node and its
 // peers.
 // peers.
-func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr Progress)) {
-	rn.raft.prs.visit(func(id uint64, pr *Progress) {
+func (rn *RawNode) WithProgress(visitor func(id uint64, typ ProgressType, pr tracker.Progress)) {
+	rn.raft.prs.Visit(func(id uint64, pr *tracker.Progress) {
 		typ := ProgressTypePeer
 		typ := ProgressTypePeer
 		if pr.IsLearner {
 		if pr.IsLearner {
 			typ = ProgressTypeLearner
 			typ = ProgressTypeLearner
 		}
 		}
 		p := *pr
 		p := *pr
-		p.ins = nil
+		p.Inflights = nil
 		visitor(id, typ, p)
 		visitor(id, typ, p)
 	})
 	})
 }
 }

+ 3 - 2
raft/rawnode_test.go

@@ -22,6 +22,7 @@ import (
 	"testing"
 	"testing"
 
 
 	"go.etcd.io/etcd/raft/raftpb"
 	"go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
 )
 )
 
 
 // rawNodeAdapter is essentially a lint that makes sure that RawNode implements
 // rawNodeAdapter is essentially a lint that makes sure that RawNode implements
@@ -638,7 +639,7 @@ func BenchmarkStatusProgress(b *testing.B) {
 
 
 			b.Run("WithProgress", func(b *testing.B) {
 			b.Run("WithProgress", func(b *testing.B) {
 				b.ReportAllocs()
 				b.ReportAllocs()
-				visit := func(uint64, ProgressType, Progress) {}
+				visit := func(uint64, ProgressType, tracker.Progress) {}
 
 
 				for i := 0; i < b.N; i++ {
 				for i := 0; i < b.N; i++ {
 					rn.WithProgress(visit)
 					rn.WithProgress(visit)
@@ -648,7 +649,7 @@ func BenchmarkStatusProgress(b *testing.B) {
 				b.ReportAllocs()
 				b.ReportAllocs()
 				for i := 0; i < b.N; i++ {
 				for i := 0; i < b.N; i++ {
 					var n uint64
 					var n uint64
-					visit := func(_ uint64, _ ProgressType, pr Progress) {
+					visit := func(_ uint64, _ ProgressType, pr tracker.Progress) {
 						n += pr.Match
 						n += pr.Match
 					}
 					}
 					rn.WithProgress(visit)
 					rn.WithProgress(visit)

+ 7 - 6
raft/status.go

@@ -18,6 +18,7 @@ import (
 	"fmt"
 	"fmt"
 
 
 	pb "go.etcd.io/etcd/raft/raftpb"
 	pb "go.etcd.io/etcd/raft/raftpb"
+	"go.etcd.io/etcd/raft/tracker"
 )
 )
 
 
 type Status struct {
 type Status struct {
@@ -27,20 +28,20 @@ type Status struct {
 	SoftState
 	SoftState
 
 
 	Applied  uint64
 	Applied  uint64
-	Progress map[uint64]Progress
+	Progress map[uint64]tracker.Progress
 
 
 	LeadTransferee uint64
 	LeadTransferee uint64
 }
 }
 
 
-func getProgressCopy(r *raft) map[uint64]Progress {
-	m := make(map[uint64]Progress)
-	r.prs.visit(func(id uint64, pr *Progress) {
-		var p Progress
+func getProgressCopy(r *raft) map[uint64]tracker.Progress {
+	m := make(map[uint64]tracker.Progress)
+	r.prs.Visit(func(id uint64, pr *tracker.Progress) {
+		var p tracker.Progress
 		p, pr = *pr, nil /* avoid accidental reuse below */
 		p, pr = *pr, nil /* avoid accidental reuse below */
 
 
 		// The inflight buffer is tricky to copy and besides, it isn't exposed
 		// The inflight buffer is tricky to copy and besides, it isn't exposed
 		// to the client, so pretend it's nil.
 		// to the client, so pretend it's nil.
-		p.ins = nil
+		p.Inflights = nil
 
 
 		m[id] = p
 		m[id] = p
 	})
 	})

+ 124 - 0
raft/tracker/inflights.go

@@ -0,0 +1,124 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+// Inflights limits the number of MsgApp (represented by the largest index
+// contained within) sent to followers but not yet acknowledged by them. Callers
+// use Full() to check whether more messages can be sent, call Add() whenever
+// they are sending a new append, and release "quota" via FreeLE() whenever an
+// ack is received.
+type Inflights struct {
+	// the starting index in the buffer
+	start int
+	// number of inflights in the buffer
+	count int
+
+	// the size of the buffer
+	size int
+
+	// buffer contains the index of the last entry
+	// inside one message.
+	buffer []uint64
+}
+
+// NewInflights sets up an Inflights that allows up to 'size' inflight messages.
+func NewInflights(size int) *Inflights {
+	return &Inflights{
+		size: size,
+	}
+}
+
+// Add notifies the Inflights that a new message with the given index is being
+// dispatched. Full() must be called prior to Add() to verify that there is room
+// for one more message, and consecutive calls to add Add() must provide a
+// monotonic sequence of indexes.
+func (in *Inflights) Add(inflight uint64) {
+	if in.Full() {
+		panic("cannot add into a Full inflights")
+	}
+	next := in.start + in.count
+	size := in.size
+	if next >= size {
+		next -= size
+	}
+	if next >= len(in.buffer) {
+		in.grow()
+	}
+	in.buffer[next] = inflight
+	in.count++
+}
+
+// grow the inflight buffer by doubling up to inflights.size. We grow on demand
+// instead of preallocating to inflights.size to handle systems which have
+// thousands of Raft groups per process.
+func (in *Inflights) grow() {
+	newSize := len(in.buffer) * 2
+	if newSize == 0 {
+		newSize = 1
+	} else if newSize > in.size {
+		newSize = in.size
+	}
+	newBuffer := make([]uint64, newSize)
+	copy(newBuffer, in.buffer)
+	in.buffer = newBuffer
+}
+
+// FreeLE frees the inflights smaller or equal to the given `to` flight.
+func (in *Inflights) FreeLE(to uint64) {
+	if in.count == 0 || to < in.buffer[in.start] {
+		// out of the left side of the window
+		return
+	}
+
+	idx := in.start
+	var i int
+	for i = 0; i < in.count; i++ {
+		if to < in.buffer[idx] { // found the first large inflight
+			break
+		}
+
+		// increase index and maybe rotate
+		size := in.size
+		if idx++; idx >= size {
+			idx -= size
+		}
+	}
+	// free i inflights and set new start index
+	in.count -= i
+	in.start = idx
+	if in.count == 0 {
+		// inflights is empty, reset the start index so that we don't grow the
+		// buffer unnecessarily.
+		in.start = 0
+	}
+}
+
+// FreeFirstOne releases the first inflight. This is a no-op if nothing is
+// inflight.
+func (in *Inflights) FreeFirstOne() { in.FreeLE(in.buffer[in.start]) }
+
+// Full returns true if no more messages can be sent at the moment.
+func (in *Inflights) Full() bool {
+	return in.count == in.size
+}
+
+// Count returns the number of inflight messages.
+func (in *Inflights) Count() int { return in.count }
+
+// reset frees all inflights.
+func (in *Inflights) reset() {
+	in.count = 0
+	in.start = 0
+}

+ 27 - 27
raft/progress_test.go → raft/tracker/inflights_test.go

@@ -1,4 +1,4 @@
-// Copyright 2015 The etcd Authors
+// Copyright 2019 The etcd Authors
 //
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
 // you may not use this file except in compliance with the License.
@@ -12,7 +12,7 @@
 // See the License for the specific language governing permissions and
 // See the License for the specific language governing permissions and
 // limitations under the License.
 // limitations under the License.
 
 
-package raft
+package tracker
 
 
 import (
 import (
 	"reflect"
 	"reflect"
@@ -21,16 +21,16 @@ import (
 
 
 func TestInflightsAdd(t *testing.T) {
 func TestInflightsAdd(t *testing.T) {
 	// no rotating case
 	// no rotating case
-	in := &inflights{
+	in := &Inflights{
 		size:   10,
 		size:   10,
 		buffer: make([]uint64, 10),
 		buffer: make([]uint64, 10),
 	}
 	}
 
 
 	for i := 0; i < 5; i++ {
 	for i := 0; i < 5; i++ {
-		in.add(uint64(i))
+		in.Add(uint64(i))
 	}
 	}
 
 
-	wantIn := &inflights{
+	wantIn := &Inflights{
 		start: 0,
 		start: 0,
 		count: 5,
 		count: 5,
 		size:  10,
 		size:  10,
@@ -43,10 +43,10 @@ func TestInflightsAdd(t *testing.T) {
 	}
 	}
 
 
 	for i := 5; i < 10; i++ {
 	for i := 5; i < 10; i++ {
-		in.add(uint64(i))
+		in.Add(uint64(i))
 	}
 	}
 
 
-	wantIn2 := &inflights{
+	wantIn2 := &Inflights{
 		start: 0,
 		start: 0,
 		count: 10,
 		count: 10,
 		size:  10,
 		size:  10,
@@ -59,17 +59,17 @@ func TestInflightsAdd(t *testing.T) {
 	}
 	}
 
 
 	// rotating case
 	// rotating case
-	in2 := &inflights{
+	in2 := &Inflights{
 		start:  5,
 		start:  5,
 		size:   10,
 		size:   10,
 		buffer: make([]uint64, 10),
 		buffer: make([]uint64, 10),
 	}
 	}
 
 
 	for i := 0; i < 5; i++ {
 	for i := 0; i < 5; i++ {
-		in2.add(uint64(i))
+		in2.Add(uint64(i))
 	}
 	}
 
 
-	wantIn21 := &inflights{
+	wantIn21 := &Inflights{
 		start: 5,
 		start: 5,
 		count: 5,
 		count: 5,
 		size:  10,
 		size:  10,
@@ -82,10 +82,10 @@ func TestInflightsAdd(t *testing.T) {
 	}
 	}
 
 
 	for i := 5; i < 10; i++ {
 	for i := 5; i < 10; i++ {
-		in2.add(uint64(i))
+		in2.Add(uint64(i))
 	}
 	}
 
 
-	wantIn22 := &inflights{
+	wantIn22 := &Inflights{
 		start: 5,
 		start: 5,
 		count: 10,
 		count: 10,
 		size:  10,
 		size:  10,
@@ -100,14 +100,14 @@ func TestInflightsAdd(t *testing.T) {
 
 
 func TestInflightFreeTo(t *testing.T) {
 func TestInflightFreeTo(t *testing.T) {
 	// no rotating case
 	// no rotating case
-	in := newInflights(10)
+	in := NewInflights(10)
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
-		in.add(uint64(i))
+		in.Add(uint64(i))
 	}
 	}
 
 
-	in.freeTo(4)
+	in.FreeLE(4)
 
 
-	wantIn := &inflights{
+	wantIn := &Inflights{
 		start: 5,
 		start: 5,
 		count: 5,
 		count: 5,
 		size:  10,
 		size:  10,
@@ -119,9 +119,9 @@ func TestInflightFreeTo(t *testing.T) {
 		t.Fatalf("in = %+v, want %+v", in, wantIn)
 		t.Fatalf("in = %+v, want %+v", in, wantIn)
 	}
 	}
 
 
-	in.freeTo(8)
+	in.FreeLE(8)
 
 
-	wantIn2 := &inflights{
+	wantIn2 := &Inflights{
 		start: 9,
 		start: 9,
 		count: 1,
 		count: 1,
 		size:  10,
 		size:  10,
@@ -135,12 +135,12 @@ func TestInflightFreeTo(t *testing.T) {
 
 
 	// rotating case
 	// rotating case
 	for i := 10; i < 15; i++ {
 	for i := 10; i < 15; i++ {
-		in.add(uint64(i))
+		in.Add(uint64(i))
 	}
 	}
 
 
-	in.freeTo(12)
+	in.FreeLE(12)
 
 
-	wantIn3 := &inflights{
+	wantIn3 := &Inflights{
 		start: 3,
 		start: 3,
 		count: 2,
 		count: 2,
 		size:  10,
 		size:  10,
@@ -152,9 +152,9 @@ func TestInflightFreeTo(t *testing.T) {
 		t.Fatalf("in = %+v, want %+v", in, wantIn3)
 		t.Fatalf("in = %+v, want %+v", in, wantIn3)
 	}
 	}
 
 
-	in.freeTo(14)
+	in.FreeLE(14)
 
 
-	wantIn4 := &inflights{
+	wantIn4 := &Inflights{
 		start: 0,
 		start: 0,
 		count: 0,
 		count: 0,
 		size:  10,
 		size:  10,
@@ -168,14 +168,14 @@ func TestInflightFreeTo(t *testing.T) {
 }
 }
 
 
 func TestInflightFreeFirstOne(t *testing.T) {
 func TestInflightFreeFirstOne(t *testing.T) {
-	in := newInflights(10)
+	in := NewInflights(10)
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
-		in.add(uint64(i))
+		in.Add(uint64(i))
 	}
 	}
 
 
-	in.freeFirstOne()
+	in.FreeFirstOne()
 
 
-	wantIn := &inflights{
+	wantIn := &Inflights{
 		start: 1,
 		start: 1,
 		count: 9,
 		count: 9,
 		size:  10,
 		size:  10,

+ 215 - 0
raft/tracker/progress.go

@@ -0,0 +1,215 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+import "fmt"
+
+// Progress represents a follower’s progress in the view of the leader. Leader
+// maintains progresses of all followers, and sends entries to the follower
+// based on its progress.
+//
+// NB(tbg): Progress is basically a state machine whose transitions are mostly
+// strewn around `*raft.raft`. Additionally, some fields are only used when in a
+// certain State. All of this isn't ideal.
+type Progress struct {
+	Match, Next uint64
+	// State defines how the leader should interact with the follower.
+	//
+	// When in StateProbe, leader sends at most one replication message
+	// per heartbeat interval. It also probes actual progress of the follower.
+	//
+	// When in StateReplicate, leader optimistically increases next
+	// to the latest entry sent after sending replication message. This is
+	// an optimized state for fast replicating log entries to the follower.
+	//
+	// When in StateSnapshot, leader should have sent out snapshot
+	// before and stops sending any replication message.
+	State StateType
+
+	// PendingSnapshot is used in StateSnapshot.
+	// If there is a pending snapshot, the pendingSnapshot will be set to the
+	// index of the snapshot. If pendingSnapshot is set, the replication process of
+	// this Progress will be paused. raft will not resend snapshot until the pending one
+	// is reported to be failed.
+	PendingSnapshot uint64
+
+	// RecentActive is true if the progress is recently active. Receiving any messages
+	// from the corresponding follower indicates the progress is active.
+	// RecentActive can be reset to false after an election timeout.
+	RecentActive bool
+
+	// ProbeSent is used while this follow is in StateProbe. When ProbeSent is
+	// true, raft should pause sending replication message to this peer until
+	// ProbeSent is reset. See ProbeAcked() and IsPaused().
+	ProbeSent bool
+
+	// Inflights is a sliding window for the inflight messages.
+	// Each inflight message contains one or more log entries.
+	// The max number of entries per message is defined in raft config as MaxSizePerMsg.
+	// Thus inflight effectively limits both the number of inflight messages
+	// and the bandwidth each Progress can use.
+	// When inflights is Full, no more message should be sent.
+	// When a leader sends out a message, the index of the last
+	// entry should be added to inflights. The index MUST be added
+	// into inflights in order.
+	// When a leader receives a reply, the previous inflights should
+	// be freed by calling inflights.FreeLE with the index of the last
+	// received entry.
+	Inflights *Inflights
+
+	// IsLearner is true if this progress is tracked for a learner.
+	IsLearner bool
+}
+
+// ResetState moves the Progress into the specified State, resetting ProbeSent,
+// PendingSnapshot, and Inflights.
+func (pr *Progress) ResetState(state StateType) {
+	pr.ProbeSent = false
+	pr.PendingSnapshot = 0
+	pr.State = state
+	pr.Inflights.reset()
+}
+
+func max(a, b uint64) uint64 {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func min(a, b uint64) uint64 {
+	if a > b {
+		return b
+	}
+	return a
+}
+
+// ProbeAcked is called when this peer has accepted an append. It resets
+// ProbeSent to signal that additional append messages should be sent without
+// further delay.
+func (pr *Progress) ProbeAcked() {
+	pr.ProbeSent = false
+}
+
+// BecomeProbe transitions into StateProbe. Next is reset to Match+1 or,
+// optionally and if larger, the index of the pending snapshot.
+func (pr *Progress) BecomeProbe() {
+	// If the original state is StateSnapshot, progress knows that
+	// the pending snapshot has been sent to this peer successfully, then
+	// probes from pendingSnapshot + 1.
+	if pr.State == StateSnapshot {
+		pendingSnapshot := pr.PendingSnapshot
+		pr.ResetState(StateProbe)
+		pr.Next = max(pr.Match+1, pendingSnapshot+1)
+	} else {
+		pr.ResetState(StateProbe)
+		pr.Next = pr.Match + 1
+	}
+}
+
+// BecomeReplicate transitions into StateReplicate, resetting Next to Match+1.
+func (pr *Progress) BecomeReplicate() {
+	pr.ResetState(StateReplicate)
+	pr.Next = pr.Match + 1
+}
+
+// BecomeSnapshot moves the Progress to StateSnapshot with the specified pending
+// snapshot index.
+func (pr *Progress) BecomeSnapshot(snapshoti uint64) {
+	pr.ResetState(StateSnapshot)
+	pr.PendingSnapshot = snapshoti
+}
+
+// MaybeUpdate is called when an MsgAppResp arrives from the follower, with the
+// index acked by it. The method returns false if the given n index comes from
+// an outdated message. Otherwise it updates the progress and returns true.
+func (pr *Progress) MaybeUpdate(n uint64) bool {
+	var updated bool
+	if pr.Match < n {
+		pr.Match = n
+		updated = true
+		pr.ProbeAcked()
+	}
+	if pr.Next < n+1 {
+		pr.Next = n + 1
+	}
+	return updated
+}
+
+// OptimisticUpdate signals that appends all the way up to and including index n
+// are in-flight. As a result, Next is increased to n+1.
+func (pr *Progress) OptimisticUpdate(n uint64) { pr.Next = n + 1 }
+
+// MaybeDecrTo adjusts the Progress to the receipt of a MsgApp rejection. The
+// arguments are the index the follower rejected to append to its log, and its
+// last index.
+//
+// Rejections can happen spuriously as messages are sent out of order or
+// duplicated. In such cases, the rejection pertains to an index that the
+// Progress already knows were previously acknowledged, and false is returned
+// without changing the Progress.
+//
+// If the rejection is genuine, Next is lowered sensibly, and the Progress is
+// cleared for sending log entries.
+func (pr *Progress) MaybeDecrTo(rejected, last uint64) bool {
+	if pr.State == StateReplicate {
+		// The rejection must be stale if the progress has matched and "rejected"
+		// is smaller than "match".
+		if rejected <= pr.Match {
+			return false
+		}
+		// Directly decrease next to match + 1.
+		//
+		// TODO(tbg): why not use last if it's larger?
+		pr.Next = pr.Match + 1
+		return true
+	}
+
+	// The rejection must be stale if "rejected" does not match next - 1. This
+	// is because non-replicating followers are probed one entry at a time.
+	if pr.Next-1 != rejected {
+		return false
+	}
+
+	if pr.Next = min(rejected, last+1); pr.Next < 1 {
+		pr.Next = 1
+	}
+	pr.ProbeSent = false
+	return true
+}
+
+// IsPaused returns whether sending log entries to this node has been throttled.
+// This is done when a node has rejected recent MsgApps, is currently waiting
+// for a snapshot, or has reached the MaxInflightMsgs limit. In normal
+// operation, this is false. A throttled node will be contacted less frequently
+// until it has reached a state in which it's able to accept a steady stream of
+// log entries again.
+func (pr *Progress) IsPaused() bool {
+	switch pr.State {
+	case StateProbe:
+		return pr.ProbeSent
+	case StateReplicate:
+		return pr.Inflights.Full()
+	case StateSnapshot:
+		return true
+	default:
+		panic("unexpected state")
+	}
+}
+
+func (pr *Progress) String() string {
+	return fmt.Sprintf("next = %d, match = %d, state = %s, waiting = %v, pendingSnapshot = %d, recentActive = %v, isLearner = %v",
+		pr.Next, pr.Match, pr.State, pr.IsPaused(), pr.PendingSnapshot, pr.RecentActive, pr.IsLearner)
+}

+ 231 - 0
raft/tracker/progress_test.go

@@ -0,0 +1,231 @@
+// Copyright 2015 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+import (
+	"testing"
+)
+
+func TestProgressIsPaused(t *testing.T) {
+	tests := []struct {
+		state  StateType
+		paused bool
+
+		w bool
+	}{
+		{StateProbe, false, false},
+		{StateProbe, true, true},
+		{StateReplicate, false, false},
+		{StateReplicate, true, false},
+		{StateSnapshot, false, true},
+		{StateSnapshot, true, true},
+	}
+	for i, tt := range tests {
+		p := &Progress{
+			State:     tt.state,
+			ProbeSent: tt.paused,
+			Inflights: NewInflights(256),
+		}
+		if g := p.IsPaused(); g != tt.w {
+			t.Errorf("#%d: paused= %t, want %t", i, g, tt.w)
+		}
+	}
+}
+
+// TestProgressResume ensures that MaybeUpdate and MaybeDecrTo will reset
+// ProbeSent.
+func TestProgressResume(t *testing.T) {
+	p := &Progress{
+		Next:      2,
+		ProbeSent: true,
+	}
+	p.MaybeDecrTo(1, 1)
+	if p.ProbeSent {
+		t.Errorf("paused= %v, want false", p.ProbeSent)
+	}
+	p.ProbeSent = true
+	p.MaybeUpdate(2)
+	if p.ProbeSent {
+		t.Errorf("paused= %v, want false", p.ProbeSent)
+	}
+}
+
+func TestProgressBecomeProbe(t *testing.T) {
+	match := uint64(1)
+	tests := []struct {
+		p     *Progress
+		wnext uint64
+	}{
+		{
+			&Progress{State: StateReplicate, Match: match, Next: 5, Inflights: NewInflights(256)},
+			2,
+		},
+		{
+			// snapshot finish
+			&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 10, Inflights: NewInflights(256)},
+			11,
+		},
+		{
+			// snapshot failure
+			&Progress{State: StateSnapshot, Match: match, Next: 5, PendingSnapshot: 0, Inflights: NewInflights(256)},
+			2,
+		},
+	}
+	for i, tt := range tests {
+		tt.p.BecomeProbe()
+		if tt.p.State != StateProbe {
+			t.Errorf("#%d: state = %s, want %s", i, tt.p.State, StateProbe)
+		}
+		if tt.p.Match != match {
+			t.Errorf("#%d: match = %d, want %d", i, tt.p.Match, match)
+		}
+		if tt.p.Next != tt.wnext {
+			t.Errorf("#%d: next = %d, want %d", i, tt.p.Next, tt.wnext)
+		}
+	}
+}
+
+func TestProgressBecomeReplicate(t *testing.T) {
+	p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
+	p.BecomeReplicate()
+
+	if p.State != StateReplicate {
+		t.Errorf("state = %s, want %s", p.State, StateReplicate)
+	}
+	if p.Match != 1 {
+		t.Errorf("match = %d, want 1", p.Match)
+	}
+	if w := p.Match + 1; p.Next != w {
+		t.Errorf("next = %d, want %d", p.Next, w)
+	}
+}
+
+func TestProgressBecomeSnapshot(t *testing.T) {
+	p := &Progress{State: StateProbe, Match: 1, Next: 5, Inflights: NewInflights(256)}
+	p.BecomeSnapshot(10)
+
+	if p.State != StateSnapshot {
+		t.Errorf("state = %s, want %s", p.State, StateSnapshot)
+	}
+	if p.Match != 1 {
+		t.Errorf("match = %d, want 1", p.Match)
+	}
+	if p.PendingSnapshot != 10 {
+		t.Errorf("pendingSnapshot = %d, want 10", p.PendingSnapshot)
+	}
+}
+
+func TestProgressUpdate(t *testing.T) {
+	prevM, prevN := uint64(3), uint64(5)
+	tests := []struct {
+		update uint64
+
+		wm  uint64
+		wn  uint64
+		wok bool
+	}{
+		{prevM - 1, prevM, prevN, false},        // do not decrease match, next
+		{prevM, prevM, prevN, false},            // do not decrease next
+		{prevM + 1, prevM + 1, prevN, true},     // increase match, do not decrease next
+		{prevM + 2, prevM + 2, prevN + 1, true}, // increase match, next
+	}
+	for i, tt := range tests {
+		p := &Progress{
+			Match: prevM,
+			Next:  prevN,
+		}
+		ok := p.MaybeUpdate(tt.update)
+		if ok != tt.wok {
+			t.Errorf("#%d: ok= %v, want %v", i, ok, tt.wok)
+		}
+		if p.Match != tt.wm {
+			t.Errorf("#%d: match= %d, want %d", i, p.Match, tt.wm)
+		}
+		if p.Next != tt.wn {
+			t.Errorf("#%d: next= %d, want %d", i, p.Next, tt.wn)
+		}
+	}
+}
+
+func TestProgressMaybeDecr(t *testing.T) {
+	tests := []struct {
+		state    StateType
+		m        uint64
+		n        uint64
+		rejected uint64
+		last     uint64
+
+		w  bool
+		wn uint64
+	}{
+		{
+			// state replicate and rejected is not greater than match
+			StateReplicate, 5, 10, 5, 5, false, 10,
+		},
+		{
+			// state replicate and rejected is not greater than match
+			StateReplicate, 5, 10, 4, 4, false, 10,
+		},
+		{
+			// state replicate and rejected is greater than match
+			// directly decrease to match+1
+			StateReplicate, 5, 10, 9, 9, true, 6,
+		},
+		{
+			// next-1 != rejected is always false
+			StateProbe, 0, 0, 0, 0, false, 0,
+		},
+		{
+			// next-1 != rejected is always false
+			StateProbe, 0, 10, 5, 5, false, 10,
+		},
+		{
+			// next>1 = decremented by 1
+			StateProbe, 0, 10, 9, 9, true, 9,
+		},
+		{
+			// next>1 = decremented by 1
+			StateProbe, 0, 2, 1, 1, true, 1,
+		},
+		{
+			// next<=1 = reset to 1
+			StateProbe, 0, 1, 0, 0, true, 1,
+		},
+		{
+			// decrease to min(rejected, last+1)
+			StateProbe, 0, 10, 9, 2, true, 3,
+		},
+		{
+			// rejected < 1, reset to 1
+			StateProbe, 0, 10, 9, 0, true, 1,
+		},
+	}
+	for i, tt := range tests {
+		p := &Progress{
+			State: tt.state,
+			Match: tt.m,
+			Next:  tt.n,
+		}
+		if g := p.MaybeDecrTo(tt.rejected, tt.last); g != tt.w {
+			t.Errorf("#%d: maybeDecrTo= %t, want %t", i, g, tt.w)
+		}
+		if gm := p.Match; gm != tt.m {
+			t.Errorf("#%d: match= %d, want %d", i, gm, tt.m)
+		}
+		if gn := p.Next; gn != tt.wn {
+			t.Errorf("#%d: next= %d, want %d", i, gn, tt.wn)
+		}
+	}
+}

+ 42 - 0
raft/tracker/state.go

@@ -0,0 +1,42 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+// StateType is the state of a tracked follower.
+type StateType uint64
+
+const (
+	// StateProbe indicates a follower whose last index isn't known. Such a
+	// follower is "probed" (i.e. an append sent periodically) to narrow down
+	// its last index. In the ideal (and common) case, only one round of probing
+	// is necessary as the follower will react with a hint. Followers that are
+	// probed over extended periods of time are often offline.
+	StateProbe StateType = iota
+	// StateReplicate is the state steady in which a follower eagerly receives
+	// log entries to append to its log.
+	StateReplicate
+	// StateSnapshot indicates a follower that needs log entries not available
+	// from the leader's Raft log. Such a follower needs a full snapshot to
+	// return to StateReplicate.
+	StateSnapshot
+)
+
+var prstmap = [...]string{
+	"StateProbe",
+	"StateReplicate",
+	"StateSnapshot",
+}
+
+func (st StateType) String() string { return prstmap[uint64(st)] }

+ 195 - 0
raft/tracker/tracker.go

@@ -0,0 +1,195 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package tracker
+
+import (
+	"fmt"
+	"sort"
+
+	"go.etcd.io/etcd/raft/quorum"
+)
+
+// ProgressTracker tracks the currently active configuration and the information
+// known about the nodes and learners in it. In particular, it tracks the match
+// index for each peer which in turn allows reasoning about the committed index.
+type ProgressTracker struct {
+	Voters   quorum.JointConfig
+	Learners map[uint64]struct{}
+
+	Progress map[uint64]*Progress
+
+	Votes map[uint64]bool
+
+	MaxInflight int
+}
+
+// MakeProgressTracker initializes a ProgressTracker.
+func MakeProgressTracker(maxInflight int) ProgressTracker {
+	p := ProgressTracker{
+		MaxInflight: maxInflight,
+		Voters: quorum.JointConfig{
+			quorum.MajorityConfig{},
+			quorum.MajorityConfig{},
+		},
+		Learners: map[uint64]struct{}{},
+		Votes:    map[uint64]bool{},
+		Progress: map[uint64]*Progress{},
+	}
+	return p
+}
+
+// IsSingleton returns true if (and only if) there is only one voting member
+// (i.e. the leader) in the current configuration.
+func (p *ProgressTracker) IsSingleton() bool {
+	return len(p.Voters[0]) == 1 && len(p.Voters[1]) == 0
+}
+
+type matchAckIndexer map[uint64]*Progress
+
+var _ quorum.AckedIndexer = matchAckIndexer(nil)
+
+// AckedIndex implements IndexLookuper.
+func (l matchAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
+	pr, ok := l[id]
+	if !ok {
+		return 0, false
+	}
+	return quorum.Index(pr.Match), true
+}
+
+// Committed returns the largest log index known to be committed based on what
+// the voting members of the group have acknowledged.
+func (p *ProgressTracker) Committed() uint64 {
+	return uint64(p.Voters.CommittedIndex(matchAckIndexer(p.Progress)))
+}
+
+// RemoveAny removes this peer, which *must* be tracked as a voter or learner,
+// from the tracker.
+func (p *ProgressTracker) RemoveAny(id uint64) {
+	_, okPR := p.Progress[id]
+	_, okV1 := p.Voters[0][id]
+	_, okV2 := p.Voters[1][id]
+	_, okL := p.Learners[id]
+
+	okV := okV1 || okV2
+
+	if !okPR {
+		panic("attempting to remove unknown peer %x")
+	} else if !okV && !okL {
+		panic("attempting to remove unknown peer %x")
+	} else if okV && okL {
+		panic(fmt.Sprintf("peer %x is both voter and learner", id))
+	}
+
+	delete(p.Voters[0], id)
+	delete(p.Voters[1], id)
+	delete(p.Learners, id)
+	delete(p.Progress, id)
+}
+
+// InitProgress initializes a new progress for the given node or learner. The
+// node may not exist yet in either form or a panic will ensue.
+func (p *ProgressTracker) InitProgress(id, match, next uint64, isLearner bool) {
+	if pr := p.Progress[id]; pr != nil {
+		panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
+	}
+	if !isLearner {
+		p.Voters[0][id] = struct{}{}
+	} else {
+		p.Learners[id] = struct{}{}
+	}
+	p.Progress[id] = &Progress{Next: next, Match: match, Inflights: NewInflights(p.MaxInflight), IsLearner: isLearner}
+}
+
+// Visit invokes the supplied closure for all tracked progresses.
+func (p *ProgressTracker) Visit(f func(id uint64, pr *Progress)) {
+	for id, pr := range p.Progress {
+		f(id, pr)
+	}
+}
+
+// QuorumActive returns true if the quorum is active from the view of the local
+// raft state machine. Otherwise, it returns false.
+func (p *ProgressTracker) QuorumActive() bool {
+	votes := map[uint64]bool{}
+	p.Visit(func(id uint64, pr *Progress) {
+		if pr.IsLearner {
+			return
+		}
+		votes[id] = pr.RecentActive
+	})
+
+	return p.Voters.VoteResult(votes) == quorum.VoteWon
+}
+
+// VoterNodes returns a sorted slice of voters.
+func (p *ProgressTracker) VoterNodes() []uint64 {
+	m := p.Voters.IDs()
+	nodes := make([]uint64, 0, len(m))
+	for id := range m {
+		nodes = append(nodes, id)
+	}
+	sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
+	return nodes
+}
+
+// LearnerNodes returns a sorted slice of learners.
+func (p *ProgressTracker) LearnerNodes() []uint64 {
+	nodes := make([]uint64, 0, len(p.Learners))
+	for id := range p.Learners {
+		nodes = append(nodes, id)
+	}
+	sort.Slice(nodes, func(i, j int) bool { return nodes[i] < nodes[j] })
+	return nodes
+}
+
+// ResetVotes prepares for a new round of vote counting via recordVote.
+func (p *ProgressTracker) ResetVotes() {
+	p.Votes = map[uint64]bool{}
+}
+
+// RecordVote records that the node with the given id voted for this Raft
+// instance if v == true (and declined it otherwise).
+func (p *ProgressTracker) RecordVote(id uint64, v bool) {
+	_, ok := p.Votes[id]
+	if !ok {
+		p.Votes[id] = v
+	}
+}
+
+// TallyVotes returns the number of granted and rejected Votes, and whether the
+// election outcome is known.
+func (p *ProgressTracker) TallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
+	// Make sure to populate granted/rejected correctly even if the Votes slice
+	// contains members no longer part of the configuration. This doesn't really
+	// matter in the way the numbers are used (they're informational), but might
+	// as well get it right.
+	for id, pr := range p.Progress {
+		if pr.IsLearner {
+			continue
+		}
+		v, voted := p.Votes[id]
+		if !voted {
+			continue
+		}
+		if v {
+			granted++
+		} else {
+			rejected++
+		}
+	}
+	result := p.Voters.VoteResult(p.Votes)
+	return granted, rejected, result
+}