Procházet zdrojové kódy

raft: use half-populated joint quorum

To ease a future transition into joint quorums, this commit removes the
previous "ad-hoc" majority-based quorum and vote computations with that
introduced in the `raft/quorum` package.

More specifically, the progressTracker now uses a quorum.JointConfig for
which the "second" majority quorum is always empty; in this case the
quorum behaves like the one quorum.MajorityConfig that is actually
present. Or, more briefly, this change is a no-op, but it will take the
busywork out of actually starting to make use of joint quorums in the
future.

On a side node, I suspect that this might've fixed a bug regarding the
read index though I haven't been able to explicitly come up with a
counter-example. The problem was that the acks collected for the read
index weren't taking into account membership changes, so they'd run the
danger of using acks from nodes since removed to claim that a quorum of
acks had been received. There's a chance that there isn't a
counter-example (the only guarantee extracted from the "quorum" is that
there isn't another leader, but even if there's another leader all that
matters is that that leader doesn't have a divergent history from the
stale leader in the hypothetical counter-example), but either way there
is morally a bug here that is now fixed because VoteCommitted doesn't
care about votes from members that are not voters known to the currently
active configuration.
Tobias Schottdorf před 6 roky
rodič
revize
e039629907
4 změnil soubory, kde provedl 69 přidání a 68 odebrání
  1. 47 46
      raft/progress.go
  2. 11 16
      raft/raft.go
  3. 3 2
      raft/raft_test.go
  4. 8 4
      raft/read_only.go

+ 47 - 46
raft/progress.go

@@ -17,6 +17,8 @@ package raft
 import (
 	"fmt"
 	"sort"
+
+	"go.etcd.io/etcd/raft/quorum"
 )
 
 const (
@@ -291,23 +293,25 @@ func (in *inflights) reset() {
 // known about the nodes and learners in it. In particular, it tracks the match
 // index for each peer which in turn allows reasoning about the committed index.
 type progressTracker struct {
-	nodes    map[uint64]struct{}
+	voters   quorum.JointConfig
 	learners map[uint64]struct{}
 	prs      map[uint64]*Progress
 
 	votes map[uint64]bool
 
 	maxInflight int
-	matchBuf    uint64Slice
 }
 
 func makeProgressTracker(maxInflight int) progressTracker {
 	p := progressTracker{
 		maxInflight: maxInflight,
-		prs:         map[uint64]*Progress{},
-		nodes:       map[uint64]struct{}{},
-		learners:    map[uint64]struct{}{},
-		votes:       map[uint64]bool{},
+		voters: quorum.JointConfig{
+			quorum.MajorityConfig{},
+			quorum.MajorityConfig{},
+		},
+		learners: map[uint64]struct{}{},
+		votes:    map[uint64]bool{},
+		prs:      map[uint64]*Progress{},
 	}
 	return p
 }
@@ -315,40 +319,35 @@ func makeProgressTracker(maxInflight int) progressTracker {
 // isSingleton returns true if (and only if) there is only one voting member
 // (i.e. the leader) in the current configuration.
 func (p *progressTracker) isSingleton() bool {
-	return len(p.nodes) == 1
+	return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
 }
 
-func (p *progressTracker) quorum() int {
-	return len(p.nodes)/2 + 1
-}
+type progressAckIndexer map[uint64]*Progress
 
-func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
-	return len(m) >= p.quorum()
+var _ quorum.AckedIndexer = progressAckIndexer(nil)
+
+func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
+	pr, ok := l[id]
+	if !ok {
+		return 0, false
+	}
+	return quorum.Index(pr.Match), true
 }
 
 // committed returns the largest log index known to be committed based on what
 // the voting members of the group have acknowledged.
 func (p *progressTracker) committed() uint64 {
-	// Preserving matchBuf across calls is an optimization
-	// used to avoid allocating a new slice on each call.
-	if cap(p.matchBuf) < len(p.nodes) {
-		p.matchBuf = make(uint64Slice, len(p.nodes))
-	}
-	p.matchBuf = p.matchBuf[:len(p.nodes)]
-	idx := 0
-	for id := range p.nodes {
-		p.matchBuf[idx] = p.prs[id].Match
-		idx++
-	}
-	sort.Sort(&p.matchBuf)
-	return p.matchBuf[len(p.matchBuf)-p.quorum()]
+	return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs)))
 }
 
 func (p *progressTracker) removeAny(id uint64) {
 	_, okPR := p.prs[id]
-	_, okV := p.nodes[id]
+	_, okV1 := p.voters[0][id]
+	_, okV2 := p.voters[1][id]
 	_, okL := p.learners[id]
 
+	okV := okV1 || okV2
+
 	if !okPR {
 		panic("attempting to remove unknown peer %x")
 	} else if !okV && !okL {
@@ -357,7 +356,8 @@ func (p *progressTracker) removeAny(id uint64) {
 		panic(fmt.Sprintf("peer %x is both voter and learner", id))
 	}
 
-	delete(p.nodes, id)
+	delete(p.voters[0], id)
+	delete(p.voters[1], id)
 	delete(p.learners, id)
 	delete(p.prs, id)
 }
@@ -369,7 +369,7 @@ func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
 		panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
 	}
 	if !isLearner {
-		p.nodes[id] = struct{}{}
+		p.voters[0][id] = struct{}{}
 	} else {
 		p.learners[id] = struct{}{}
 	}
@@ -391,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
 // the view of the local raft state machine. Otherwise, it returns
 // false.
 func (p *progressTracker) quorumActive() bool {
-	var act int
+	votes := map[uint64]bool{}
 	p.visit(func(id uint64, pr *Progress) {
-		if pr.RecentActive && !pr.IsLearner {
-			act++
+		if pr.IsLearner {
+			return
 		}
+		votes[id] = pr.RecentActive
 	})
 
-	return act >= p.quorum()
+	return p.voters.VoteResult(votes) == quorum.VoteWon
 }
 
 func (p *progressTracker) voterNodes() []uint64 {
-	nodes := make([]uint64, 0, len(p.nodes))
-	for id := range p.nodes {
+	m := p.voters.IDs()
+	nodes := make([]uint64, 0, len(m))
+	for id := range m {
 		nodes = append(nodes, id)
 	}
 	sort.Sort(uint64Slice(nodes))
@@ -435,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) {
 
 // tallyVotes returns the number of granted and rejected votes, and whether the
 // election outcome is known.
-func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
-	for _, v := range p.votes {
-		if v {
+func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
+	// Make sure to populate granted/rejected correctly even if the votes slice
+	// contains members no longer part of the configuration. This doesn't really
+	// matter in the way the numbers are used (they're informational), but might
+	// as well get it right.
+	for id, pr := range p.prs {
+		if pr.IsLearner {
+			continue
+		}
+		if p.votes[id] {
 			granted++
 		} else {
 			rejected++
 		}
 	}
-
-	q := p.quorum()
-
-	result = electionIndeterminate
-	if granted >= q {
-		result = electionWon
-	} else if rejected >= q {
-		result = electionLost
-	}
+	result := p.voters.VoteResult(p.votes)
 	return granted, rejected, result
 }

+ 11 - 16
raft/raft.go

@@ -24,6 +24,7 @@ import (
 	"sync"
 	"time"
 
+	"go.etcd.io/etcd/raft/quorum"
 	pb "go.etcd.io/etcd/raft/raftpb"
 )
 
@@ -744,7 +745,7 @@ func (r *raft) campaign(t CampaignType) {
 		voteMsg = pb.MsgVote
 		term = r.Term
 	}
-	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon {
+	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
 		// We won the election after voting for ourselves (which must mean that
 		// this is a single-node cluster). Advance to the next state.
 		if t == campaignPreElection {
@@ -754,7 +755,7 @@ func (r *raft) campaign(t CampaignType) {
 		}
 		return
 	}
-	for id := range r.prs.nodes {
+	for id := range r.prs.voters.IDs() {
 		if id == r.id {
 			continue
 		}
@@ -769,15 +770,7 @@ func (r *raft) campaign(t CampaignType) {
 	}
 }
 
-type electionResult byte
-
-const (
-	electionIndeterminate electionResult = iota
-	electionLost
-	electionWon
-)
-
-func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) {
+func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
 	if v {
 		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
 	} else {
@@ -999,7 +992,9 @@ func stepLeader(r *raft, m pb.Message) error {
 		r.bcastAppend()
 		return nil
 	case pb.MsgReadIndex:
-		if !r.prs.isSingleton() { // more than one voting member in cluster
+		// If more than the local vote is needed, go through a full broadcast,
+		// otherwise optimize.
+		if !r.prs.isSingleton() {
 			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
 				// Reject read only request when this leader has not committed any log entry at its term.
 				return nil
@@ -1110,7 +1105,7 @@ func stepLeader(r *raft, m pb.Message) error {
 			return nil
 		}
 
-		if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
+		if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
 			return nil
 		}
 
@@ -1210,14 +1205,14 @@ func stepCandidate(r *raft, m pb.Message) error {
 		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
 		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
 		switch res {
-		case electionWon:
+		case quorum.VoteWon:
 			if r.state == StatePreCandidate {
 				r.campaign(campaignElection)
 			} else {
 				r.becomeLeader()
 				r.bcastAppend()
 			}
-		case electionLost:
+		case quorum.VoteLost:
 			// pb.MsgPreVoteResp contains future term of pre-candidate
 			// m.Term > r.Term; reuse r.Term
 			r.becomeFollower(r.Term, None)
@@ -1417,7 +1412,7 @@ func (r *raft) removeNode(id uint64) {
 	r.prs.removeAny(id)
 
 	// Do not try to commit or abort transferring if the cluster is now empty.
-	if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
+	if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 {
 		return
 	}
 

+ 3 - 2
raft/raft_test.go

@@ -4334,7 +4334,8 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
 				learners[i] = true
 			}
 			v.id = id
-			v.prs.nodes = make(map[uint64]struct{})
+			v.prs.voters[0] = make(map[uint64]struct{})
+			v.prs.voters[1] = make(map[uint64]struct{})
 			v.prs.learners = make(map[uint64]struct{})
 			v.prs.prs = make(map[uint64]*Progress)
 			for i := 0; i < size; i++ {
@@ -4343,7 +4344,7 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
 					pr.IsLearner = true
 					v.prs.learners[peerAddrs[i]] = struct{}{}
 				} else {
-					v.prs.nodes[peerAddrs[i]] = struct{}{}
+					v.prs.voters[0][peerAddrs[i]] = struct{}{}
 				}
 				v.prs.prs[peerAddrs[i]] = pr
 			}

+ 8 - 4
raft/read_only.go

@@ -29,7 +29,11 @@ type ReadState struct {
 type readIndexStatus struct {
 	req   pb.Message
 	index uint64
-	acks  map[uint64]struct{}
+	// NB: this never records 'false', but it's more convenient to use this
+	// instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If
+	// this becomes performance sensitive enough (doubtful), quorum.VoteResult
+	// can change to an API that is closer to that of CommittedIndex.
+	acks map[uint64]bool
 }
 
 type readOnly struct {
@@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) {
 	if _, ok := ro.pendingReadIndex[s]; ok {
 		return
 	}
-	ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
+	ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
 	ro.readIndexQueue = append(ro.readIndexQueue, s)
 }
 
 // recvAck notifies the readonly struct that the raft state machine received
 // an acknowledgment of the heartbeat that attached with the read only request
 // context.
-func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
+func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
 	rs, ok := ro.pendingReadIndex[string(context)]
 	if !ok {
 		return nil
 	}
 
-	rs.acks[id] = struct{}{}
+	rs.acks[id] = true
 	return rs.acks
 }