Browse Source

Merge pull request #10779 from tbg/jointq-pr

raft: use half-populated joint quorum
Tobias Grieger 6 years ago
parent
commit
755aab6990

+ 1 - 0
go.mod

@@ -3,6 +3,7 @@ module go.etcd.io/etcd
 require (
 require (
 	github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
 	github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 // indirect
 	github.com/bgentry/speakeasy v0.1.0
 	github.com/bgentry/speakeasy v0.1.0
+	github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238
 	github.com/coreos/go-semver v0.2.0
 	github.com/coreos/go-semver v0.2.0
 	github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7
 	github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7
 	github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf
 	github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf

+ 2 - 0
go.sum

@@ -2,6 +2,8 @@ github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLM
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
 github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
 github.com/bgentry/speakeasy v0.1.0 h1:ByYyxL9InA1OWqxJqqp2A5pYHUrCiAL6K3J+LKSsQkY=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
 github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
+github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238 h1:uNljlOxtOHrPnRoPPx+JanqjAGZpNiqAGVBfGskd/pg=
+github.com/cockroachdb/datadriven v0.0.0-20190531201743-edce55837238/go.mod h1:zn76sxSg3SzpJ0PPJaLDCu+Bu0Lg3sKTORVIj19EIF8=
 github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
 github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
 github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
 github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=

+ 63 - 66
raft/progress.go

@@ -17,6 +17,8 @@ package raft
 import (
 import (
 	"fmt"
 	"fmt"
 	"sort"
 	"sort"
+
+	"go.etcd.io/etcd/raft/quorum"
 )
 )
 
 
 const (
 const (
@@ -291,21 +293,25 @@ func (in *inflights) reset() {
 // known about the nodes and learners in it. In particular, it tracks the match
 // known about the nodes and learners in it. In particular, it tracks the match
 // index for each peer which in turn allows reasoning about the committed index.
 // index for each peer which in turn allows reasoning about the committed index.
 type progressTracker struct {
 type progressTracker struct {
-	nodes    map[uint64]*Progress
-	learners map[uint64]*Progress
+	voters   quorum.JointConfig
+	learners map[uint64]struct{}
+	prs      map[uint64]*Progress
 
 
 	votes map[uint64]bool
 	votes map[uint64]bool
 
 
 	maxInflight int
 	maxInflight int
-	matchBuf    uint64Slice
 }
 }
 
 
-func makePRS(maxInflight int) progressTracker {
+func makeProgressTracker(maxInflight int) progressTracker {
 	p := progressTracker{
 	p := progressTracker{
 		maxInflight: maxInflight,
 		maxInflight: maxInflight,
-		nodes:       map[uint64]*Progress{},
-		learners:    map[uint64]*Progress{},
-		votes:       map[uint64]bool{},
+		voters: quorum.JointConfig{
+			quorum.MajorityConfig{},
+			quorum.MajorityConfig{},
+		},
+		learners: map[uint64]struct{}{},
+		votes:    map[uint64]bool{},
+		prs:      map[uint64]*Progress{},
 	}
 	}
 	return p
 	return p
 }
 }
@@ -313,80 +319,70 @@ func makePRS(maxInflight int) progressTracker {
 // isSingleton returns true if (and only if) there is only one voting member
 // isSingleton returns true if (and only if) there is only one voting member
 // (i.e. the leader) in the current configuration.
 // (i.e. the leader) in the current configuration.
 func (p *progressTracker) isSingleton() bool {
 func (p *progressTracker) isSingleton() bool {
-	return len(p.nodes) == 1
+	return len(p.voters[0]) == 1 && len(p.voters[1]) == 0
 }
 }
 
 
-func (p *progressTracker) quorum() int {
-	return len(p.nodes)/2 + 1
-}
+type progressAckIndexer map[uint64]*Progress
+
+var _ quorum.AckedIndexer = progressAckIndexer(nil)
 
 
-func (p *progressTracker) hasQuorum(m map[uint64]struct{}) bool {
-	return len(m) >= p.quorum()
+func (l progressAckIndexer) AckedIndex(id uint64) (quorum.Index, bool) {
+	pr, ok := l[id]
+	if !ok {
+		return 0, false
+	}
+	return quorum.Index(pr.Match), true
 }
 }
 
 
 // committed returns the largest log index known to be committed based on what
 // committed returns the largest log index known to be committed based on what
 // the voting members of the group have acknowledged.
 // the voting members of the group have acknowledged.
 func (p *progressTracker) committed() uint64 {
 func (p *progressTracker) committed() uint64 {
-	// Preserving matchBuf across calls is an optimization
-	// used to avoid allocating a new slice on each call.
-	if cap(p.matchBuf) < len(p.nodes) {
-		p.matchBuf = make(uint64Slice, len(p.nodes))
-	}
-	p.matchBuf = p.matchBuf[:len(p.nodes)]
-	idx := 0
-	for _, pr := range p.nodes {
-		p.matchBuf[idx] = pr.Match
-		idx++
-	}
-	sort.Sort(&p.matchBuf)
-	return p.matchBuf[len(p.matchBuf)-p.quorum()]
+	return uint64(p.voters.CommittedIndex(progressAckIndexer(p.prs)))
 }
 }
 
 
 func (p *progressTracker) removeAny(id uint64) {
 func (p *progressTracker) removeAny(id uint64) {
-	pN := p.nodes[id]
-	pL := p.learners[id]
+	_, okPR := p.prs[id]
+	_, okV1 := p.voters[0][id]
+	_, okV2 := p.voters[1][id]
+	_, okL := p.learners[id]
+
+	okV := okV1 || okV2
 
 
-	if pN == nil && pL == nil {
+	if !okPR {
+		panic("attempting to remove unknown peer %x")
+	} else if !okV && !okL {
 		panic("attempting to remove unknown peer %x")
 		panic("attempting to remove unknown peer %x")
-	} else if pN != nil && pL != nil {
+	} else if okV && okL {
 		panic(fmt.Sprintf("peer %x is both voter and learner", id))
 		panic(fmt.Sprintf("peer %x is both voter and learner", id))
 	}
 	}
 
 
-	delete(p.nodes, id)
+	delete(p.voters[0], id)
+	delete(p.voters[1], id)
 	delete(p.learners, id)
 	delete(p.learners, id)
+	delete(p.prs, id)
 }
 }
 
 
 // initProgress initializes a new progress for the given node or learner. The
 // initProgress initializes a new progress for the given node or learner. The
 // node may not exist yet in either form or a panic will ensue.
 // node may not exist yet in either form or a panic will ensue.
 func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
 func (p *progressTracker) initProgress(id, match, next uint64, isLearner bool) {
-	if pr := p.nodes[id]; pr != nil {
+	if pr := p.prs[id]; pr != nil {
 		panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
 		panic(fmt.Sprintf("peer %x already tracked as node %v", id, pr))
 	}
 	}
-	if pr := p.learners[id]; pr != nil {
-		panic(fmt.Sprintf("peer %x already tracked as learner %v", id, pr))
-	}
 	if !isLearner {
 	if !isLearner {
-		p.nodes[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight)}
-		return
+		p.voters[0][id] = struct{}{}
+	} else {
+		p.learners[id] = struct{}{}
 	}
 	}
-	p.learners[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: true}
+	p.prs[id] = &Progress{Next: next, Match: match, ins: newInflights(p.maxInflight), IsLearner: isLearner}
 }
 }
 
 
 func (p *progressTracker) getProgress(id uint64) *Progress {
 func (p *progressTracker) getProgress(id uint64) *Progress {
-	if pr, ok := p.nodes[id]; ok {
-		return pr
-	}
-
-	return p.learners[id]
+	return p.prs[id]
 }
 }
 
 
 // visit invokes the supplied closure for all tracked progresses.
 // visit invokes the supplied closure for all tracked progresses.
 func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
 func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
-	for id, pr := range p.nodes {
-		f(id, pr)
-	}
-
-	for id, pr := range p.learners {
+	for id, pr := range p.prs {
 		f(id, pr)
 		f(id, pr)
 	}
 	}
 }
 }
@@ -395,19 +391,21 @@ func (p *progressTracker) visit(f func(id uint64, pr *Progress)) {
 // the view of the local raft state machine. Otherwise, it returns
 // the view of the local raft state machine. Otherwise, it returns
 // false.
 // false.
 func (p *progressTracker) quorumActive() bool {
 func (p *progressTracker) quorumActive() bool {
-	var act int
+	votes := map[uint64]bool{}
 	p.visit(func(id uint64, pr *Progress) {
 	p.visit(func(id uint64, pr *Progress) {
-		if pr.RecentActive && !pr.IsLearner {
-			act++
+		if pr.IsLearner {
+			return
 		}
 		}
+		votes[id] = pr.RecentActive
 	})
 	})
 
 
-	return act >= p.quorum()
+	return p.voters.VoteResult(votes) == quorum.VoteWon
 }
 }
 
 
 func (p *progressTracker) voterNodes() []uint64 {
 func (p *progressTracker) voterNodes() []uint64 {
-	nodes := make([]uint64, 0, len(p.nodes))
-	for id := range p.nodes {
+	m := p.voters.IDs()
+	nodes := make([]uint64, 0, len(m))
+	for id := range m {
 		nodes = append(nodes, id)
 		nodes = append(nodes, id)
 	}
 	}
 	sort.Sort(uint64Slice(nodes))
 	sort.Sort(uint64Slice(nodes))
@@ -439,22 +437,21 @@ func (p *progressTracker) recordVote(id uint64, v bool) {
 
 
 // tallyVotes returns the number of granted and rejected votes, and whether the
 // tallyVotes returns the number of granted and rejected votes, and whether the
 // election outcome is known.
 // election outcome is known.
-func (p *progressTracker) tallyVotes() (granted int, rejected int, result electionResult) {
-	for _, v := range p.votes {
-		if v {
+func (p *progressTracker) tallyVotes() (granted int, rejected int, _ quorum.VoteResult) {
+	// Make sure to populate granted/rejected correctly even if the votes slice
+	// contains members no longer part of the configuration. This doesn't really
+	// matter in the way the numbers are used (they're informational), but might
+	// as well get it right.
+	for id, pr := range p.prs {
+		if pr.IsLearner {
+			continue
+		}
+		if p.votes[id] {
 			granted++
 			granted++
 		} else {
 		} else {
 			rejected++
 			rejected++
 		}
 		}
 	}
 	}
-
-	q := p.quorum()
-
-	result = electionIndeterminate
-	if granted >= q {
-		result = electionWon
-	} else if rejected >= q {
-		result = electionLost
-	}
+	result := p.voters.VoteResult(p.votes)
 	return granted, rejected, result
 	return granted, rejected, result
 }
 }

+ 40 - 0
raft/quorum/bench_test.go

@@ -0,0 +1,40 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+	"fmt"
+	"math"
+	"math/rand"
+	"testing"
+)
+
+// BenchmarkMajorityConfig_CommittedIndex measures (MajorityConfig).CommittedIndex
+// for several voter counts, giving every voter a random acked index.
+//
+// Run with: go test -run - -bench . -benchmem ./raft/quorum
+func BenchmarkMajorityConfig_CommittedIndex(b *testing.B) {
+	voterCounts := []int{1, 3, 5, 7, 9, 11}
+	for _, n := range voterCounts {
+		name := fmt.Sprintf("voters=%d", n)
+		b.Run(name, func(b *testing.B) {
+			cfg := MajorityConfig{}
+			acked := mapAckIndexer{}
+			// Voter IDs are 1..n.
+			for id := uint64(1); id <= uint64(n); id++ {
+				cfg[id] = struct{}{}
+				acked[id] = Index(rand.Int63n(math.MaxInt64))
+			}
+
+			for iter := 0; iter < b.N; iter++ {
+				_ = cfg.CommittedIndex(acked)
+			}
+		})
+	}
+}

+ 250 - 0
raft/quorum/datadriven_test.go

@@ -0,0 +1,250 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+	"fmt"
+	"strings"
+	"testing"
+
+	"github.com/cockroachdb/datadriven"
+)
+
+// TestDataDriven parses and executes the test cases in ./testdata/*. An entry
+// in such a file specifies the command, which is either of "committed" to check
+// CommittedIndex or "vote" to verify a VoteResult. The underlying configuration
+// and inputs are specified via the arguments 'cfg' and 'cfgj' (for the majority
+// config and, optionally, majority config joint to the first one) and 'idx'
+// (for CommittedIndex) and 'votes' (for VoteResult).
+//
+// Internally, the harness runs some additional checks on each test case for
+// which it is known that the result shouldn't change. For example,
+// interchanging the majority configurations of a joint quorum must not
+// influence the result; if it does, this is noted in the test's output.
+func TestDataDriven(t *testing.T) {
+	datadriven.Walk(t, "testdata", func(t *testing.T, path string) {
+		datadriven.RunTest(t, path, func(d *datadriven.TestData) string {
+			// Two majority configs. The first one is always used (though it may
+			// be empty) and the second one is used iff joint is true.
+			//
+			// cfgj=zero is specified to instruct the test harness to treat cfgj
+			// as zero instead of not specified (i.e. it will trigger a joint
+			// quorum test instead of a majority quorum test for cfg only).
+			var joint bool
+			var ids, idsj []uint64
+			// The committed indexes for the nodes in the config in the order in
+			// which they appear in (ids,idsj), without repetition. An underscore
+			// denotes an omission (i.e. no information for this voter); this is
+			// different from 0. For example,
+			//
+			// cfg=(1,2) cfgj=(2,3,4) idx=(_,5,_,7) initializes the idx for voter 2
+			// to 5 and that for voter 4 to 7 (and no others).
+			var idxs []Index
+			// Votes. These are initialized similar to idxs except the only values
+			// used are 1 (voted against) and 2 (voted for). This looks awkward,
+			// but is convenient because it allows sharing code between the two.
+			var votes []Index
+
+			// Parse the args.
+			for _, arg := range d.CmdArgs {
+				for i := range arg.Vals {
+					switch arg.Key {
+					case "cfg":
+						var n uint64
+						arg.Scan(t, i, &n)
+						ids = append(ids, n)
+					case "cfgj":
+						joint = true
+						if arg.Vals[i] == "zero" {
+							if len(arg.Vals) != 1 {
+								t.Fatalf("cannot mix 'zero' into configuration")
+							}
+						} else {
+							var n uint64
+							arg.Scan(t, i, &n)
+							idsj = append(idsj, n)
+						}
+					case "idx":
+						var n uint64
+						// Register placeholders as zeroes.
+						if arg.Vals[i] != "_" {
+							arg.Scan(t, i, &n)
+							if n == 0 {
+								// This is a restriction caused by the above
+								// special-casing for _.
+								t.Fatalf("cannot use 0 as idx")
+							}
+						}
+						idxs = append(idxs, Index(n))
+					case "votes":
+						var s string
+						arg.Scan(t, i, &s)
+						switch s {
+						case "y":
+							votes = append(votes, 2)
+						case "n":
+							votes = append(votes, 1)
+						case "_":
+							votes = append(votes, 0)
+						default:
+							t.Fatalf("unknown vote: %s", s)
+						}
+					default:
+						t.Fatalf("unknown arg %s", arg.Key)
+					}
+				}
+			}
+
+			// Build the two majority configs.
+			c := MajorityConfig{}
+			for _, id := range ids {
+				c[id] = struct{}{}
+			}
+			cj := MajorityConfig{}
+			for _, id := range idsj {
+				cj[id] = struct{}{}
+			}
+
+			// Helper that returns an AckedIndexer which has the specified indexes
+			// mapped to the right IDs.
+			makeLookuper := func(idxs []Index, ids, idsj []uint64) mapAckIndexer {
+				l := mapAckIndexer{}
+				var p int // next to consume from idxs
+				for _, id := range append(append([]uint64(nil), ids...), idsj...) {
+					if _, ok := l[id]; ok {
+						continue
+					}
+					if p < len(idxs) {
+						// NB: this creates zero entries for placeholders that we remove later.
+						// The upshot of doing it that way is to avoid having to specify place-
+						// holders multiple times when omitting voters present in both halves of
+						// a joint config.
+						l[id] = idxs[p]
+						p++
+					}
+				}
+
+				for id := range l {
+					// Zero entries are created by _ placeholders; we don't want
+					// them in the lookuper because "no entry" is different from
+					// "zero entry". Note that we prevent tests from specifying
+					// zero commit indexes, so that there's no confusion between
+					// the two concepts.
+					if l[id] == 0 {
+						delete(l, id)
+					}
+				}
+				return l
+			}
+
+			{
+				// Every voter must have exactly one input (explicit or _).
+				input := idxs
+				if d.Cmd == "vote" {
+					input = votes
+				}
+				if voters := JointConfig([2]MajorityConfig{c, cj}).IDs(); len(voters) != len(input) {
+					return fmt.Sprintf("error: mismatched input (explicit or _) for voters %v: %v",
+						voters, input)
+				}
+			}
+
+			var buf strings.Builder
+			switch d.Cmd {
+			case "committed":
+				l := makeLookuper(idxs, ids, idsj)
+
+				// Branch based on whether this is a majority or joint quorum
+				// test case.
+				if !joint {
+					idx := c.CommittedIndex(l)
+					// Describe's output is data, not a format string.
+					fmt.Fprint(&buf, c.Describe(l))
+					// These alternative computations should return the same
+					// result. If not, print to the output.
+					if aIdx := alternativeMajorityCommittedIndex(c, l); aIdx != idx {
+						fmt.Fprintf(&buf, "%s <-- via alternative computation\n", aIdx)
+					}
+					// Joining a majority with the empty majority should give same result.
+					if aIdx := JointConfig([2]MajorityConfig{c, {}}).CommittedIndex(l); aIdx != idx {
+						fmt.Fprintf(&buf, "%s <-- via zero-joint quorum\n", aIdx)
+					}
+					// Joining a majority with itself should give same result.
+					if aIdx := JointConfig([2]MajorityConfig{c, c}).CommittedIndex(l); aIdx != idx {
+						fmt.Fprintf(&buf, "%s <-- via self-joint quorum\n", aIdx)
+					}
+					// overlay returns a lookuper restricted to the voters in c
+					// in which the acked index of voter id is replaced by newIdx.
+					overlay := func(c MajorityConfig, l AckedIndexer, id uint64, newIdx Index) AckedIndexer {
+						ll := mapAckIndexer{}
+						for iid := range c {
+							if iid == id {
+								ll[iid] = newIdx
+							} else if ackedIdx, ok := l.AckedIndex(iid); ok {
+								ll[iid] = ackedIdx
+							}
+						}
+						return ll
+					}
+					for id := range c {
+						// NB: this must not be named idx. Shadowing the committed
+						// index would make the comparison below trivially false and
+						// silently disable the overlay checks.
+						iidx, _ := l.AckedIndex(id)
+						if idx > iidx && iidx > 0 {
+							// If the committed index was definitely above the currently
+							// inspected idx, the result shouldn't change if we lower it
+							// further.
+							lo := overlay(c, l, id, iidx-1)
+							if aIdx := c.CommittedIndex(lo); aIdx != idx {
+								fmt.Fprintf(&buf, "%s <-- overlaying %d->%d\n", aIdx, id, iidx)
+							}
+							lo = overlay(c, l, id, 0)
+							if aIdx := c.CommittedIndex(lo); aIdx != idx {
+								fmt.Fprintf(&buf, "%s <-- overlaying %d->0\n", aIdx, id)
+							}
+						}
+					}
+					fmt.Fprintf(&buf, "%s\n", idx)
+				} else {
+					cc := JointConfig([2]MajorityConfig{c, cj})
+					// Describe's output is data, not a format string.
+					fmt.Fprint(&buf, cc.Describe(l))
+					idx := cc.CommittedIndex(l)
+					// Interchanging the majorities shouldn't make a difference. If it does, print.
+					if aIdx := JointConfig([2]MajorityConfig{cj, c}).CommittedIndex(l); aIdx != idx {
+						fmt.Fprintf(&buf, "%s <-- via symmetry\n", aIdx)
+					}
+					fmt.Fprintf(&buf, "%s\n", idx)
+				}
+			case "vote":
+				ll := makeLookuper(votes, ids, idsj)
+				l := map[uint64]bool{}
+				for id, v := range ll {
+					l[id] = v != 1 // NB: 1 == false, 2 == true
+				}
+
+				if !joint {
+					// Test a majority quorum.
+					r := c.VoteResult(l)
+					fmt.Fprintf(&buf, "%v\n", r)
+				} else {
+					// Run a joint quorum test case.
+					r := JointConfig([2]MajorityConfig{c, cj}).VoteResult(l)
+					// Interchanging the majorities shouldn't make a difference. If it does, print.
+					if ar := JointConfig([2]MajorityConfig{cj, c}).VoteResult(l); ar != r {
+						fmt.Fprintf(&buf, "%v <-- via symmetry\n", ar)
+					}
+					fmt.Fprintf(&buf, "%v\n", r)
+				}
+			default:
+				t.Fatalf("unknown command: %s", d.Cmd)
+			}
+			return buf.String()
+		})
+	})
+}

+ 68 - 0
raft/quorum/joint.go

@@ -0,0 +1,68 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+// JointConfig is a pair of (possibly overlapping) majority configurations.
+// A decision needs the support of both of them.
+type JointConfig [2]MajorityConfig
+
+// IDs returns a freshly allocated set containing every voter that appears in
+// either half of the joint configuration.
+func (c JointConfig) IDs() map[uint64]struct{} {
+	voters := make(map[uint64]struct{})
+	for half := range c {
+		for id := range c[half] {
+			voters[id] = struct{}{}
+		}
+	}
+	return voters
+}
+
+// Describe returns a (multi-line) representation of the commit indexes that
+// the given lookuper reports for the union of voters of both halves.
+func (c JointConfig) Describe(l AckedIndexer) string {
+	union := MajorityConfig(c.IDs())
+	return union.Describe(l)
+}
+
+// CommittedIndex returns the largest index committed by the joint quorum,
+// i.e. the smaller of the committed indexes of the two constituent
+// majorities: an index is jointly committed only if both halves commit it.
+func (c JointConfig) CommittedIndex(l AckedIndexer) Index {
+	committed := c[0].CommittedIndex(l)
+	if other := c[1].CommittedIndex(l); other < committed {
+		committed = other
+	}
+	return committed
+}
+
+// VoteResult takes a mapping of voters to yes/no (true/false) votes and
+// reports whether the vote is pending, lost, or won. Winning requires both
+// majority quorums to vote in favor.
+func (c JointConfig) VoteResult(votes map[uint64]bool) VoteResult {
+	first, second := c[0].VoteResult(votes), c[1].VoteResult(votes)
+
+	switch {
+	case first == second:
+		// Both halves agree, so that is the joint outcome.
+		return first
+	case first == VoteLost || second == VoteLost:
+		// A loss in either half can no longer be made up for.
+		return VoteLost
+	default:
+		// One half has won while the other is still pending.
+		return VotePending
+	}
+}

+ 184 - 0
raft/quorum/majority.go

@@ -0,0 +1,184 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+	"fmt"
+	"math"
+	"sort"
+	"strings"
+)
+
+// MajorityConfig is a set of IDs that uses majority quorums to make decisions.
+type MajorityConfig map[uint64]struct{}
+
+// Describe renders a header followed by one line per voter in c (ordered by
+// ID) showing the index that l reports for it. Voters for which l has no
+// index are marked with "?"; all others get a bar of "x" characters followed
+// by ">". The empty config is rendered as a fixed placeholder string.
+func (c MajorityConfig) Describe(l AckedIndexer) string {
+	n := len(c)
+	if n == 0 {
+		return "<empty majority quorum>"
+	}
+
+	type row struct {
+		id    uint64
+		idx   Index
+		found bool // did l know an index for id?
+		bar   int  // number of "x" drawn for this row
+	}
+
+	rows := make([]row, 0, n)
+	for id := range c {
+		idx, found := l.AckedIndex(id)
+		rows = append(rows, row{id: id, idx: idx, found: found})
+	}
+
+	// Order by (idx, id) so that a row's position reflects its rank.
+	sort.Slice(rows, func(a, b int) bool {
+		if rows[a].idx != rows[b].idx {
+			return rows[a].idx < rows[b].idx
+		}
+		return rows[a].id < rows[b].id
+	})
+
+	// A row whose index is strictly larger than its predecessor's gets a bar
+	// as long as its rank; every other row keeps the zero-length bar.
+	for i := 1; i < len(rows); i++ {
+		if rows[i-1].idx < rows[i].idx {
+			rows[i].bar = i
+		}
+	}
+
+	// The output itself is ordered by ID.
+	sort.Slice(rows, func(a, b int) bool {
+		return rows[a].id < rows[b].id
+	})
+
+	var out strings.Builder
+	out.WriteString(strings.Repeat(" ", n) + "    idx\n")
+	for _, r := range rows {
+		if r.found {
+			out.WriteString(strings.Repeat("x", r.bar) + ">" + strings.Repeat(" ", n-r.bar))
+		} else {
+			out.WriteString("?" + strings.Repeat(" ", n))
+		}
+		fmt.Fprintf(&out, " %5d    (id=%d)\n", r.idx, r.id)
+	}
+	return out.String()
+}
+
+// uint64Slice is the slice type sorted by insertionSort.
+type uint64Slice []uint64
+
+// insertionSort sorts sl in place in ascending order.
+func insertionSort(sl uint64Slice) {
+	for i := 1; i < len(sl); i++ {
+		// Shift larger elements one slot to the right, then drop the
+		// current value into the gap.
+		v := sl[i]
+		j := i
+		for ; j > 0 && v < sl[j-1]; j-- {
+			sl[j] = sl[j-1]
+		}
+		sl[j] = v
+	}
+}
+
+// CommittedIndex computes the committed index from the indexes that the
+// provided AckedIndexer reports for the voters of this config. Voters without
+// a reported index count as zero.
+func (c MajorityConfig) CommittedIndex(l AckedIndexer) Index {
+	n := len(c)
+	if n == 0 {
+		// The empty config imposes no constraint. This plays well with joint
+		// quorums which, when one half is the zero MajorityConfig, should
+		// behave like the other half.
+		return math.MaxUint64
+	}
+
+	// Collect the indexes in a fixed-size array when n <= 7 and only allocate
+	// for larger configs. The alternative is to stash a slice on
+	// MajorityConfig, but this impairs usability (as is, MajorityConfig is
+	// just a map, and that's nice). The assumption is that running with a
+	// replication factor of >7 is rare, and that in such cases performance is
+	// a lesser concern.
+	var onStack [7]uint64
+	var acked uint64Slice
+	if n <= len(onStack) {
+		acked = onStack[:n]
+	} else {
+		acked = make(uint64Slice, n)
+	}
+
+	// Fill from the right. Slots that stay unused remain zero; they stand for
+	// voters that haven't reported yet and end up on the left after sorting
+	// anyway.
+	next := n
+	for id := range c {
+		idx, ok := l.AckedIndex(id)
+		if !ok {
+			continue
+		}
+		next--
+		acked[next] = uint64(idx)
+	}
+
+	// Sort by index with the package's own insertion sort rather than the
+	// sort package.
+	insertionSort(acked)
+
+	// The committed index is the smallest of the n/2+1 largest values, i.e.
+	// the value found n/2+1 slots from the end of the sorted slice.
+	quorum := n/2 + 1
+	return Index(acked[n-quorum])
+}
+
+// VoteResult takes a mapping of voters to yes/no (true/false) votes and
+// reports whether the vote is won (a quorum voted yes), pending (yes votes
+// plus outstanding votes could still reach a quorum), or lost (otherwise).
+func (c MajorityConfig) VoteResult(votes map[uint64]bool) VoteResult {
+	if len(c) == 0 {
+		// By convention, the elections on an empty config win. This comes in
+		// handy with joint quorums because it'll make a half-populated joint
+		// quorum behave like a majority quorum.
+		return VoteWon
+	}
+
+	var yes, missing int
+	for id := range c {
+		granted, voted := votes[id]
+		switch {
+		case !voted:
+			missing++
+		case granted:
+			yes++
+		}
+	}
+
+	q := len(c)/2 + 1
+	switch {
+	case yes >= q:
+		return VoteWon
+	case yes+missing >= q:
+		return VotePending
+	default:
+		return VoteLost
+	}
+}

+ 122 - 0
raft/quorum/quick_test.go

@@ -0,0 +1,122 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+	"math"
+	"math/rand"
+	"reflect"
+	"testing"
+	"testing/quick"
+)
+
+// TestQuick uses quickcheck to heuristically assert that
+// (MajorityConfig).CommittedIndex agrees with the independent implementation
+// alternativeMajorityCommittedIndex on randomly generated inputs.
+func TestQuick(t *testing.T) {
+	t.Run("majority_commit", func(t *testing.T) {
+		impl := func(c memberMap, l idxMap) uint64 {
+			return uint64(MajorityConfig(c).CommittedIndex(mapAckIndexer(l)))
+		}
+		reference := func(c memberMap, l idxMap) uint64 {
+			return uint64(alternativeMajorityCommittedIndex(MajorityConfig(c), mapAckIndexer(l)))
+		}
+
+		err := quick.CheckEqual(impl, reference, &quick.Config{MaxCount: 50000})
+		if err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
+// smallRandIdxMap returns a map with fewer than ten entries that assigns a
+// random commit index to each of a set of random, distinct IDs. The size
+// argument is ignored.
+func smallRandIdxMap(rand *rand.Rand, size int) map[uint64]Index {
+	// The size suggested by quick (it hard-codes 50) is not useful here, so a
+	// reasonably small bound is used instead.
+	const maxSize = 10
+
+	n := rand.Intn(maxSize)
+	m := map[uint64]Index{}
+	// Take n distinct IDs from [0, 2n) and give each an index from [0, n).
+	for _, id := range rand.Perm(2 * n)[:n] {
+		m[uint64(id)] = Index(rand.Intn(n))
+	}
+	return m
+}
+
+// idxMap is a map from ID to commit index with its own Generate method, so
+// that quick fills it with small random maps.
+type idxMap map[uint64]Index
+
+// Generate returns a small random map of IDs to commit indexes.
+func (idxMap) Generate(rand *rand.Rand, size int) reflect.Value {
+	return reflect.ValueOf(smallRandIdxMap(rand, size))
+}
+
+// memberMap is a set of IDs with its own Generate method, so that quick
+// fills it with small random sets.
+type memberMap map[uint64]struct{}
+
+// Generate returns a small random set of IDs, namely the key set of a map
+// produced by smallRandIdxMap.
+func (memberMap) Generate(rand *rand.Rand, size int) reflect.Value {
+	members := map[uint64]struct{}{}
+	for id := range smallRandIdxMap(rand, size) {
+		members[id] = struct{}{}
+	}
+	return reflect.ValueOf(members)
+}
+
+// alternativeMajorityCommittedIndex is an alternative implementation of
+// (MajorityConfig).CommittedIndex(l): it counts, for every acked index, how
+// many voters have acked that index or a higher one, and returns the largest
+// index backed by a quorum (zero if there is none).
+func alternativeMajorityCommittedIndex(c MajorityConfig, l AckedIndexer) Index {
+	if len(c) == 0 {
+		return math.MaxUint64
+	}
+
+	// Acked index per voter, for those voters that have one.
+	acked := map[uint64]Index{}
+	for id := range c {
+		idx, ok := l.AckedIndex(id)
+		if ok {
+			acked[id] = idx
+		}
+	}
+
+	// Every distinct acked index is a candidate; start each count at zero.
+	support := map[Index]int{}
+	for _, idx := range acked {
+		support[idx] = 0
+	}
+
+	// A voter supports every candidate that is not above its own index.
+	for candidate := range support {
+		for _, idx := range acked {
+			if idx >= candidate {
+				support[candidate]++
+			}
+		}
+	}
+
+	// Pick the largest candidate that has reached quorum.
+	q := len(c)/2 + 1
+	var best Index
+	for candidate, n := range support {
+		if n < q {
+			continue
+		}
+		if candidate > best {
+			best = candidate
+		}
+	}
+
+	return best
+}

+ 57 - 0
raft/quorum/quorum.go

@@ -0,0 +1,57 @@
+// Copyright 2019 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package quorum
+
+import (
+	"math"
+	"strconv"
+)
+
+type Index uint64
+
+func (i Index) String() string {
+	if i == math.MaxUint64 {
+		return "∞"
+	}
+	return strconv.FormatUint(uint64(i), 10)
+}
+
+// AckedIndexer allows looking up a commit index for a given ID of a voter
+// from a corresponding MajorityConfig.
+type AckedIndexer interface {
+	AckedIndex(voterID uint64) (idx Index, found bool)
+}
+
+type mapAckIndexer map[uint64]Index
+
+func (m mapAckIndexer) AckedIndex(id uint64) (Index, bool) {
+	idx, ok := m[id]
+	return idx, ok
+}
+
+// VoteResult indicates the outcome of a vote.
+//
+//go:generate stringer -type=VoteResult
+type VoteResult uint8
+
+const (
+	// VotePending indicates that the decision of the vote depends on future
+	// votes, i.e. neither "yes" nor "no" has reached quorum yet.
+	VotePending VoteResult = 1 + iota
+	// VoteLost indicates that the quorum has voted "no".
+	VoteLost
+	// VoteWon indicates that the quorum has voted "yes".
+	VoteWon
+)

+ 481 - 0
raft/quorum/testdata/joint_commit.txt

@@ -0,0 +1,481 @@
+# No difference between a simple majority quorum and a simple majority quorum
+# joint with an empty majority quorum. (This is asserted for all datadriven tests
+# by the framework, so we don't dwell on it more).
+#
+# Note that by specifying cfgj explicitly we tell the test harness to treat the
+# input as a joint quorum and not a majority quorum. If we didn't specify
+# cfgj=zero the test would pass just the same, but it wouldn't be exercising the
+# joint quorum path.
+committed cfg=(1,2,3) cfgj=zero idx=(100,101,99)
+----
+       idx
+x>     100    (id=1)
+xx>    101    (id=2)
+>       99    (id=3)
+100
+
+# Joint nonoverlapping singleton quorums.
+
+committed cfg=(1) cfgj=(2) idx=(_,_)
+----
+      idx
+?       0    (id=1)
+?       0    (id=2)
+0
+
+# Voter 1 has 100 committed, 2 nothing. This means we definitely won't commit
+# past 100.
+committed cfg=(1) cfgj=(2) idx=(100,_)
+----
+      idx
+x>    100    (id=1)
+?       0    (id=2)
+0
+
+# Committed index collapses once both majorities do, to the lower index.
+committed cfg=(1) cfgj=(2) idx=(13, 100)
+----
+      idx
+>      13    (id=1)
+x>    100    (id=2)
+13
+
+# Joint overlapping (i.e. identical) singleton quorum.
+
+committed cfg=(1) cfgj=(1) idx=(_)
+----
+     idx
+?      0    (id=1)
+0
+
+committed cfg=(1) cfgj=(1) idx=(100)
+----
+     idx
+>    100    (id=1)
+100
+
+
+
+# Two-node config joint with non-overlapping single node config
+committed cfg=(1,3) cfgj=(2) idx=(_,_,_)
+----
+       idx
+?        0    (id=1)
+?        0    (id=2)
+?        0    (id=3)
+0
+
+committed cfg=(1,3) cfgj=(2) idx=(100,_,_)
+----
+       idx
+xx>    100    (id=1)
+?        0    (id=2)
+?        0    (id=3)
+0
+
+# 1 has 100 committed, 2 has 50 (collapsing half of the joint quorum to 50).
+committed cfg=(1,3) cfgj=(2) idx=(100,_,50)
+----
+       idx
+xx>    100    (id=1)
+x>      50    (id=2)
+?        0    (id=3)
+0
+
+# 3 reports 45, collapsing the other half (to 45).
+committed cfg=(1,3) cfgj=(2) idx=(100,45,50)
+----
+       idx
+xx>    100    (id=1)
+x>      50    (id=2)
+>       45    (id=3)
+45
+
+# Two-node config with overlapping single-node config.
+
+committed cfg=(1,2) cfgj=(2) idx=(_,_)
+----
+      idx
+?       0    (id=1)
+?       0    (id=2)
+0
+
+# 1 reports 100.
+committed cfg=(1,2) cfgj=(2) idx=(100,_)
+----
+      idx
+x>    100    (id=1)
+?       0    (id=2)
+0
+
+# 2 reports 100.
+committed cfg=(1,2) cfgj=(2) idx=(_,100)
+----
+      idx
+?       0    (id=1)
+x>    100    (id=2)
+0
+
+committed cfg=(1,2) cfgj=(2) idx=(50,100)
+----
+      idx
+>      50    (id=1)
+x>    100    (id=2)
+50
+
+committed cfg=(1,2) cfgj=(2) idx=(100,50)
+----
+      idx
+x>    100    (id=1)
+>      50    (id=2)
+50
+
+
+
+# Joint non-overlapping two-node configs.
+
+committed cfg=(1,2) cfgj=(3,4) idx=(50,_,_,_)
+----
+        idx
+xxx>     50    (id=1)
+?         0    (id=2)
+?         0    (id=3)
+?         0    (id=4)
+0
+
+committed cfg=(1,2) cfgj=(3,4) idx=(50,_,49,_)
+----
+        idx
+xxx>     50    (id=1)
+?         0    (id=2)
+xx>      49    (id=3)
+?         0    (id=4)
+0
+
+committed cfg=(1,2) cfgj=(3,4) idx=(50,48,49,_)
+----
+        idx
+xxx>     50    (id=1)
+x>       48    (id=2)
+xx>      49    (id=3)
+?         0    (id=4)
+0
+
+committed cfg=(1,2) cfgj=(3,4) idx=(50,48,49,47)
+----
+        idx
+xxx>     50    (id=1)
+x>       48    (id=2)
+xx>      49    (id=3)
+>        47    (id=4)
+47
+
+# Joint overlapping two-node configs.
+committed cfg=(1,2) cfgj=(2,3) idx=(_,_,_)
+----
+       idx
+?        0    (id=1)
+?        0    (id=2)
+?        0    (id=3)
+0
+
+committed cfg=(1,2) cfgj=(2,3) idx=(100,_,_)
+----
+       idx
+xx>    100    (id=1)
+?        0    (id=2)
+?        0    (id=3)
+0
+
+committed cfg=(1,2) cfgj=(2,3) idx=(_,100,_)
+----
+       idx
+?        0    (id=1)
+xx>    100    (id=2)
+?        0    (id=3)
+0
+
+committed cfg=(1,2) cfgj=(2,3) idx=(_,100,99)
+----
+       idx
+?        0    (id=1)
+xx>    100    (id=2)
+x>      99    (id=3)
+0
+
+committed cfg=(1,2) cfgj=(2,3) idx=(101,100,99)
+----
+       idx
+xx>    101    (id=1)
+x>     100    (id=2)
+>       99    (id=3)
+99
+
+# Joint identical two-node configs.
+committed cfg=(1,2) cfgj=(1,2) idx=(_,_)
+----
+      idx
+?       0    (id=1)
+?       0    (id=2)
+0
+
+committed cfg=(1,2) cfgj=(1,2) idx=(_,40)
+----
+      idx
+?       0    (id=1)
+x>     40    (id=2)
+0
+
+committed cfg=(1,2) cfgj=(1,2) idx=(41,40)
+----
+      idx
+x>     41    (id=1)
+>      40    (id=2)
+40
+
+
+
+# Joint disjoint three-node configs.
+
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(_,_,_,_,_,_)
+----
+          idx
+?           0    (id=1)
+?           0    (id=2)
+?           0    (id=3)
+?           0    (id=4)
+?           0    (id=5)
+?           0    (id=6)
+0
+
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,_,_,_,_,_)
+----
+          idx
+xxxxx>    100    (id=1)
+?           0    (id=2)
+?           0    (id=3)
+?           0    (id=4)
+?           0    (id=5)
+?           0    (id=6)
+0
+
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,_,_,90,_,_)
+----
+          idx
+xxxxx>    100    (id=1)
+?           0    (id=2)
+?           0    (id=3)
+xxxx>      90    (id=4)
+?           0    (id=5)
+?           0    (id=6)
+0
+
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(100,99,_,_,_,_)
+----
+          idx
+xxxxx>    100    (id=1)
+xxxx>      99    (id=2)
+?           0    (id=3)
+?           0    (id=4)
+?           0    (id=5)
+?           0    (id=6)
+0
+
+# First quorum <= 99, second one <= 97. Both quorums guarantee that 90 is
+# committed.
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(_,99,90,97,95,_)
+----
+          idx
+?           0    (id=1)
+xxxxx>     99    (id=2)
+xx>        90    (id=3)
+xxxx>      97    (id=4)
+xxx>       95    (id=5)
+?           0    (id=6)
+90
+
+# First quorum collapsed to 92. Second one already had at least 95 committed,
+# so the result also collapses.
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(92,99,90,97,95,_)
+----
+          idx
+xx>        92    (id=1)
+xxxxx>     99    (id=2)
+x>         90    (id=3)
+xxxx>      97    (id=4)
+xxx>       95    (id=5)
+?           0    (id=6)
+92
+
+# Second quorum collapses, but nothing changes in the output.
+committed cfg=(1,2,3) cfgj=(4,5,6) idx=(92,99,90,97,95,77)
+----
+          idx
+xx>        92    (id=1)
+xxxxx>     99    (id=2)
+x>         90    (id=3)
+xxxx>      97    (id=4)
+xxx>       95    (id=5)
+>          77    (id=6)
+92
+
+
+# Joint overlapping three-node configs.
+
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,_,_,_,_)
+----
+         idx
+?          0    (id=1)
+?          0    (id=2)
+?          0    (id=3)
+?          0    (id=4)
+?          0    (id=5)
+0
+
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,_,_,_,_)
+----
+         idx
+xxxx>    100    (id=1)
+?          0    (id=2)
+?          0    (id=3)
+?          0    (id=4)
+?          0    (id=5)
+0
+
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,101,_,_,_)
+----
+         idx
+xxx>     100    (id=1)
+xxxx>    101    (id=2)
+?          0    (id=3)
+?          0    (id=4)
+?          0    (id=5)
+0
+
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,101,100,_,_)
+----
+         idx
+xx>      100    (id=1)
+xxxx>    101    (id=2)
+>        100    (id=3)
+?          0    (id=4)
+?          0    (id=5)
+0
+
+# Second quorum could commit either 98 or 99, but first quorum is open.
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,100,_,99,98)
+----
+         idx
+?          0    (id=1)
+xxxx>    100    (id=2)
+?          0    (id=3)
+xxx>      99    (id=4)
+xx>       98    (id=5)
+0
+
+# Additionally, first quorum can commit either 100 or 99
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(_,100,99,99,98)
+----
+         idx
+?          0    (id=1)
+xxxx>    100    (id=2)
+xx>       99    (id=3)
+>         99    (id=4)
+x>        98    (id=5)
+98
+
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(1,100,99,99,98)
+----
+         idx
+>          1    (id=1)
+xxxx>    100    (id=2)
+xx>       99    (id=3)
+>         99    (id=4)
+x>        98    (id=5)
+98
+
+committed cfg=(1,2,3) cfgj=(1,4,5) idx=(100,100,99,99,98)
+----
+         idx
+xxx>     100    (id=1)
+>        100    (id=2)
+x>        99    (id=3)
+>         99    (id=4)
+>         98    (id=5)
+99
+
+
+# More overlap.
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(_,_,_,_)
+----
+        idx
+?         0    (id=1)
+?         0    (id=2)
+?         0    (id=3)
+?         0    (id=4)
+0
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(_,100,99,_)
+----
+        idx
+?         0    (id=1)
+xxx>    100    (id=2)
+xx>      99    (id=3)
+?         0    (id=4)
+99
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(98,100,99,_)
+----
+        idx
+x>       98    (id=1)
+xxx>    100    (id=2)
+xx>      99    (id=3)
+?         0    (id=4)
+99
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,100,99,_)
+----
+        idx
+xx>     100    (id=1)
+>       100    (id=2)
+x>       99    (id=3)
+?         0    (id=4)
+99
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,100,99,98)
+----
+        idx
+xx>     100    (id=1)
+>       100    (id=2)
+x>       99    (id=3)
+>        98    (id=4)
+99
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,_,_,101)
+----
+        idx
+xx>     100    (id=1)
+?         0    (id=2)
+?         0    (id=3)
+xxx>    101    (id=4)
+0
+
+committed cfg=(1,2,3) cfgj=(2,3,4) idx=(100,99,_,101)
+----
+        idx
+xx>     100    (id=1)
+x>       99    (id=2)
+?         0    (id=3)
+xxx>    101    (id=4)
+99
+
+# Identical. This is also exercised in the test harness, so it's listed here
+# only briefly.
+committed cfg=(1,2,3) cfgj=(1,2,3) idx=(50,45,_)
+----
+       idx
+xx>     50    (id=1)
+x>      45    (id=2)
+?        0    (id=3)
+45

+ 165 - 0
raft/quorum/testdata/joint_vote.txt

@@ -0,0 +1,165 @@
+# Empty joint config wins all votes. This isn't used in production. Note that
+# by specifying cfgj explicitly we tell the test harness to treat the input as
+# a joint quorum and not a majority quorum.
+vote cfgj=zero
+----
+VoteWon
+
+# More examples with close to trivial configs.
+
+vote cfg=(1) cfgj=zero votes=(_)
+----
+VotePending
+
+vote cfg=(1) cfgj=zero votes=(y)
+----
+VoteWon
+
+vote cfg=(1) cfgj=zero votes=(n)
+----
+VoteLost
+
+vote cfg=(1) cfgj=(1) votes=(_)
+----
+VotePending
+
+vote cfg=(1) cfgj=(1) votes=(y)
+----
+VoteWon
+
+vote cfg=(1) cfgj=(1) votes=(n)
+----
+VoteLost
+
+vote cfg=(1) cfgj=(2) votes=(_,_)
+----
+VotePending
+
+vote cfg=(1) cfgj=(2) votes=(y,_)
+----
+VotePending
+
+vote cfg=(1) cfgj=(2) votes=(y,y)
+----
+VoteWon
+
+vote cfg=(1) cfgj=(2) votes=(y,n)
+----
+VoteLost
+
+vote cfg=(1) cfgj=(2) votes=(n,_)
+----
+VoteLost
+
+vote cfg=(1) cfgj=(2) votes=(n,n)
+----
+VoteLost
+
+vote cfg=(1) cfgj=(2) votes=(n,y)
+----
+VoteLost
+
+# Two node configs.
+
+vote cfg=(1,2) cfgj=(3,4) votes=(_,_,_,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(3,4) votes=(y,_,_,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(3,4) votes=(y,y,_,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(3,4) votes=(y,y,n,_)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(3,4) votes=(y,y,n,n)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(3,4) votes=(y,y,y,n)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(3,4) votes=(y,y,y,y)
+----
+VoteWon
+
+vote cfg=(1,2) cfgj=(2,3) votes=(_,_,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(2,3) votes=(_,n,_)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(2,3) votes=(y,y,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(2,3) votes=(y,y,n)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(2,3) votes=(y,y,y)
+----
+VoteWon
+
+vote cfg=(1,2) cfgj=(1,2) votes=(_,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(1,2) votes=(y,_)
+----
+VotePending
+
+vote cfg=(1,2) cfgj=(1,2) votes=(y,n)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(1,2) votes=(n,_)
+----
+VoteLost
+
+vote cfg=(1,2) cfgj=(1,2) votes=(n,n)
+----
+VoteLost
+
+
+# Simple example for overlapping three node configs.
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,_,_,_)
+----
+VotePending
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,n,_,_)
+----
+VotePending
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,n,n,_)
+----
+VoteLost
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(_,y,y,_)
+----
+VoteWon
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,_,_)
+----
+VotePending
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,_)
+----
+VotePending
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,n)
+----
+VoteLost
+
+vote cfg=(1,2,3) cfgj=(2,3,4) votes=(y,y,n,y)
+----
+VoteWon

+ 153 - 0
raft/quorum/testdata/majority_commit.txt

@@ -0,0 +1,153 @@
+# The empty quorum commits "everything". This is useful for its use in joint
+# quorums.
+committed
+----
+<empty majority quorum>∞
+
+
+
+# A single voter quorum is not final when no index is known.
+committed cfg=(1) idx=(_)
+----
+     idx
+?      0    (id=1)
+0
+
+# When an index is known, that's the committed index, and that's final.
+committed cfg=(1) idx=(12)
+----
+     idx
+>     12    (id=1)
+12
+
+
+
+
+# With two nodes, start out similarly.
+committed cfg=(1, 2) idx=(_,_)
+----
+      idx
+?       0    (id=1)
+?       0    (id=2)
+0
+
+# The first committed index becomes known (for n1). Nothing changes in the
+# output because idx=12 is not known to be on a quorum (which is both nodes).
+committed cfg=(1, 2) idx=(12,_)
+----
+      idx
+x>     12    (id=1)
+?       0    (id=2)
+0
+
+# The second index comes in and finalizes the decision. The result will be the
+# smaller of the two indexes.
+committed cfg=(1,2) idx=(12,5)
+----
+      idx
+x>     12    (id=1)
+>       5    (id=2)
+5
+
+
+
+
+# No surprises for three nodes.
+committed cfg=(1,2,3) idx=(_,_,_)
+----
+       idx
+?        0    (id=1)
+?        0    (id=2)
+?        0    (id=3)
+0
+
+committed cfg=(1,2,3) idx=(12,_,_)
+----
+       idx
+xx>     12    (id=1)
+?        0    (id=2)
+?        0    (id=3)
+0
+
+# We see a committed index, but a higher committed index for the last pending
+# vote could change (increment) the outcome, so not final yet.
+committed cfg=(1,2,3) idx=(12,5,_)
+----
+       idx
+xx>     12    (id=1)
+x>       5    (id=2)
+?        0    (id=3)
+5
+
+# a) the case in which it does:
+committed cfg=(1,2,3) idx=(12,5,6)
+----
+       idx
+xx>     12    (id=1)
+>        5    (id=2)
+x>       6    (id=3)
+6
+
+# b) the case in which it does not:
+committed cfg=(1,2,3) idx=(12,5,4)
+----
+       idx
+xx>     12    (id=1)
+x>       5    (id=2)
+>        4    (id=3)
+5
+
+# c) a different case in which the last index is pending but it has no chance of
+# swaying the outcome (because nobody in the current quorum agrees on anything
+# higher than the candidate):
+committed cfg=(1,2,3) idx=(5,5,_)
+----
+       idx
+x>       5    (id=1)
+>        5    (id=2)
+?        0    (id=3)
+5
+
+# c) continued: Doesn't matter what shows up last. The result is final.
+committed cfg=(1,2,3) idx=(5,5,12)
+----
+       idx
+>        5    (id=1)
+>        5    (id=2)
+xx>     12    (id=3)
+5
+
+# With all committed idx known, the result is final.
+committed cfg=(1, 2, 3) idx=(100, 101, 103)
+----
+       idx
+>      100    (id=1)
+x>     101    (id=2)
+xx>    103    (id=3)
+101
+
+
+
+# Some more complicated examples. Similar to case c) above. The result is
+# already final because no index higher than 103 is one short of quorum.
+committed cfg=(1, 2, 3, 4, 5) idx=(101, 104, 103, 103,_)
+----
+         idx
+x>       101    (id=1)
+xxxx>    104    (id=2)
+xx>      103    (id=3)
+>        103    (id=4)
+?          0    (id=5)
+103
+
+# A similar case which is not final because another vote for >= 103 would change
+# the outcome.
+committed cfg=(1, 2, 3, 4, 5) idx=(101, 102, 103, 103,_)
+----
+         idx
+x>       101    (id=1)
+xx>      102    (id=2)
+xxx>     103    (id=3)
+>        103    (id=4)
+?          0    (id=5)
+102

+ 97 - 0
raft/quorum/testdata/majority_vote.txt

@@ -0,0 +1,97 @@
+# The empty config always announces a won vote.
+vote
+----
+VoteWon
+
+vote cfg=(1) votes=(_)
+----
+VotePending
+
+vote cfg=(1) votes=(n)
+----
+VoteLost
+
+vote cfg=(123) votes=(y)
+----
+VoteWon
+
+
+
+
+vote cfg=(4,8) votes=(_,_)
+----
+VotePending
+
+# With two voters, a single rejection loses the vote.
+vote cfg=(4,8) votes=(n,_)
+----
+VoteLost
+
+vote cfg=(4,8) votes=(y,_)
+----
+VotePending
+
+vote cfg=(4,8) votes=(n,y)
+----
+VoteLost
+
+vote cfg=(4,8) votes=(y,y)
+----
+VoteWon
+
+
+
+vote cfg=(2,4,7) votes=(_,_,_)
+----
+VotePending
+
+vote cfg=(2,4,7) votes=(n,_,_)
+----
+VotePending
+
+vote cfg=(2,4,7) votes=(y,_,_)
+----
+VotePending
+
+vote cfg=(2,4,7) votes=(n,n,_)
+----
+VoteLost
+
+vote cfg=(2,4,7) votes=(y,n,_)
+----
+VotePending
+
+vote cfg=(2,4,7) votes=(y,y,_)
+----
+VoteWon
+
+vote cfg=(2,4,7) votes=(y,y,n)
+----
+VoteWon
+
+vote cfg=(2,4,7) votes=(n,y,n)
+----
+VoteLost
+
+
+
+# Test some random examples with seven nodes (why not).
+vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,_,_,_)
+----
+VotePending
+
+vote cfg=(1,2,3,4,5,6,7) votes=(_,y,y,_,n,y,n)
+----
+VotePending
+
+vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,_,n,y)
+----
+VoteWon
+
+vote cfg=(1,2,3,4,5,6,7) votes=(y,y,_,n,y,n,n)
+----
+VotePending
+
+vote cfg=(1,2,3,4,5,6,7) votes=(y,y,n,y,n,n,n)
+----
+VoteLost

+ 26 - 0
raft/quorum/voteresult_string.go

@@ -0,0 +1,26 @@
+// Code generated by "stringer -type=VoteResult"; DO NOT EDIT.
+
+package quorum
+
+import "strconv"
+
+func _() {
+	// An "invalid array index" compiler error signifies that the constant values have changed.
+	// Re-run the stringer command to generate them again.
+	var x [1]struct{}
+	_ = x[VotePending-1]
+	_ = x[VoteLost-2]
+	_ = x[VoteWon-3]
+}
+
+const _VoteResult_name = "VotePendingVoteLostVoteWon"
+
+var _VoteResult_index = [...]uint8{0, 11, 19, 26}
+
+func (i VoteResult) String() string {
+	i -= 1
+	if i >= VoteResult(len(_VoteResult_index)-1) {
+		return "VoteResult(" + strconv.FormatInt(int64(i+1), 10) + ")"
+	}
+	return _VoteResult_name[_VoteResult_index[i]:_VoteResult_index[i+1]]
+}

+ 13 - 18
raft/raft.go

@@ -24,6 +24,7 @@ import (
 	"sync"
 	"sync"
 	"time"
 	"time"
 
 
+	"go.etcd.io/etcd/raft/quorum"
 	pb "go.etcd.io/etcd/raft/raftpb"
 	pb "go.etcd.io/etcd/raft/raftpb"
 )
 )
 
 
@@ -343,7 +344,7 @@ func newRaft(c *Config) *raft {
 		raftLog:                   raftlog,
 		raftLog:                   raftlog,
 		maxMsgSize:                c.MaxSizePerMsg,
 		maxMsgSize:                c.MaxSizePerMsg,
 		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
 		maxUncommittedSize:        c.MaxUncommittedEntriesSize,
-		prs:                       makePRS(c.MaxInflightMsgs),
+		prs:                       makeProgressTracker(c.MaxInflightMsgs),
 		electionTimeout:           c.ElectionTick,
 		electionTimeout:           c.ElectionTick,
 		heartbeatTimeout:          c.HeartbeatTick,
 		heartbeatTimeout:          c.HeartbeatTick,
 		logger:                    c.Logger,
 		logger:                    c.Logger,
@@ -744,7 +745,7 @@ func (r *raft) campaign(t CampaignType) {
 		voteMsg = pb.MsgVote
 		voteMsg = pb.MsgVote
 		term = r.Term
 		term = r.Term
 	}
 	}
-	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == electionWon {
+	if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
 		// We won the election after voting for ourselves (which must mean that
 		// We won the election after voting for ourselves (which must mean that
 		// this is a single-node cluster). Advance to the next state.
 		// this is a single-node cluster). Advance to the next state.
 		if t == campaignPreElection {
 		if t == campaignPreElection {
@@ -754,7 +755,7 @@ func (r *raft) campaign(t CampaignType) {
 		}
 		}
 		return
 		return
 	}
 	}
-	for id := range r.prs.nodes {
+	for id := range r.prs.voters.IDs() {
 		if id == r.id {
 		if id == r.id {
 			continue
 			continue
 		}
 		}
@@ -769,15 +770,7 @@ func (r *raft) campaign(t CampaignType) {
 	}
 	}
 }
 }
 
 
-type electionResult byte
-
-const (
-	electionIndeterminate electionResult = iota
-	electionLost
-	electionWon
-)
-
-func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result electionResult) {
+func (r *raft) poll(id uint64, t pb.MessageType, v bool) (granted int, rejected int, result quorum.VoteResult) {
 	if v {
 	if v {
 		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
 		r.logger.Infof("%x received %s from %x at term %d", r.id, t, id, r.Term)
 	} else {
 	} else {
@@ -999,7 +992,9 @@ func stepLeader(r *raft, m pb.Message) error {
 		r.bcastAppend()
 		r.bcastAppend()
 		return nil
 		return nil
 	case pb.MsgReadIndex:
 	case pb.MsgReadIndex:
-		if !r.prs.isSingleton() { // more than one voting member in cluster
+		// If more than the local vote is needed, go through a full broadcast,
+		// otherwise optimize.
+		if !r.prs.isSingleton() {
 			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
 			if r.raftLog.zeroTermOnErrCompacted(r.raftLog.term(r.raftLog.committed)) != r.Term {
 				// Reject read only request when this leader has not committed any log entry at its term.
 				// Reject read only request when this leader has not committed any log entry at its term.
 				return nil
 				return nil
@@ -1110,7 +1105,7 @@ func stepLeader(r *raft, m pb.Message) error {
 			return nil
 			return nil
 		}
 		}
 
 
-		if !r.prs.hasQuorum(r.readOnly.recvAck(m.From, m.Context)) {
+		if r.prs.voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
 			return nil
 			return nil
 		}
 		}
 
 
@@ -1210,14 +1205,14 @@ func stepCandidate(r *raft, m pb.Message) error {
 		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
 		gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
 		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
 		r.logger.Infof("%x has received %d %s votes and %d vote rejections", r.id, gr, m.Type, rj)
 		switch res {
 		switch res {
-		case electionWon:
+		case quorum.VoteWon:
 			if r.state == StatePreCandidate {
 			if r.state == StatePreCandidate {
 				r.campaign(campaignElection)
 				r.campaign(campaignElection)
 			} else {
 			} else {
 				r.becomeLeader()
 				r.becomeLeader()
 				r.bcastAppend()
 				r.bcastAppend()
 			}
 			}
-		case electionLost:
+		case quorum.VoteLost:
 			// pb.MsgPreVoteResp contains future term of pre-candidate
 			// pb.MsgPreVoteResp contains future term of pre-candidate
 			// m.Term > r.Term; reuse r.Term
 			// m.Term > r.Term; reuse r.Term
 			r.becomeFollower(r.Term, None)
 			r.becomeFollower(r.Term, None)
@@ -1346,7 +1341,7 @@ func (r *raft) restore(s pb.Snapshot) bool {
 		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
 		r.id, r.raftLog.committed, r.raftLog.lastIndex(), r.raftLog.lastTerm(), s.Metadata.Index, s.Metadata.Term)
 
 
 	r.raftLog.restore(s)
 	r.raftLog.restore(s)
-	r.prs = makePRS(r.prs.maxInflight)
+	r.prs = makeProgressTracker(r.prs.maxInflight)
 	r.restoreNode(s.Metadata.ConfState.Nodes, false)
 	r.restoreNode(s.Metadata.ConfState.Nodes, false)
 	r.restoreNode(s.Metadata.ConfState.Learners, true)
 	r.restoreNode(s.Metadata.ConfState.Learners, true)
 	return true
 	return true
@@ -1417,7 +1412,7 @@ func (r *raft) removeNode(id uint64) {
 	r.prs.removeAny(id)
 	r.prs.removeAny(id)
 
 
 	// Do not try to commit or abort transferring if the cluster is now empty.
 	// Do not try to commit or abort transferring if the cluster is now empty.
-	if len(r.prs.nodes) == 0 && len(r.prs.learners) == 0 {
+	if len(r.prs.voters[0]) == 0 && len(r.prs.learners) == 0 {
 		return
 		return
 	}
 	}
 
 

+ 3 - 3
raft/raft_flow_control_test.go

@@ -29,7 +29,7 @@ func TestMsgAppFlowControlFull(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	pr2 := r.prs.nodes[2]
+	pr2 := r.prs.prs[2]
 	// force the progress to be in replicate state
 	// force the progress to be in replicate state
 	pr2.becomeReplicate()
 	pr2.becomeReplicate()
 	// fill in the inflights window
 	// fill in the inflights window
@@ -65,7 +65,7 @@ func TestMsgAppFlowControlMoveForward(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	pr2 := r.prs.nodes[2]
+	pr2 := r.prs.prs[2]
 	// force the progress to be in replicate state
 	// force the progress to be in replicate state
 	pr2.becomeReplicate()
 	pr2.becomeReplicate()
 	// fill in the inflights window
 	// fill in the inflights window
@@ -110,7 +110,7 @@ func TestMsgAppFlowControlRecvHeartbeat(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	pr2 := r.prs.nodes[2]
+	pr2 := r.prs.prs[2]
 	// force the progress to be in replicate state
 	// force the progress to be in replicate state
 	pr2.becomeReplicate()
 	pr2.becomeReplicate()
 	// fill in the inflights window
 	// fill in the inflights window

+ 29 - 29
raft/raft_snap_test.go

@@ -40,11 +40,11 @@ func TestSendingSnapshotSetPendingSnapshot(t *testing.T) {
 
 
 	// force set the next of node 2, so that
 	// force set the next of node 2, so that
 	// node 2 needs a snapshot
 	// node 2 needs a snapshot
-	sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
+	sm.prs.prs[2].Next = sm.raftLog.firstIndex()
 
 
-	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
-	if sm.prs.nodes[2].PendingSnapshot != 11 {
-		t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.nodes[2].PendingSnapshot)
+	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
+	if sm.prs.prs[2].PendingSnapshot != 11 {
+		t.Fatalf("PendingSnapshot = %d, want 11", sm.prs.prs[2].PendingSnapshot)
 	}
 	}
 }
 }
 
 
@@ -56,7 +56,7 @@ func TestPendingSnapshotPauseReplication(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.nodes[2].becomeSnapshot(11)
+	sm.prs.prs[2].becomeSnapshot(11)
 
 
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
@@ -73,18 +73,18 @@ func TestSnapshotFailure(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.nodes[2].Next = 1
-	sm.prs.nodes[2].becomeSnapshot(11)
+	sm.prs.prs[2].Next = 1
+	sm.prs.prs[2].becomeSnapshot(11)
 
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: true})
-	if sm.prs.nodes[2].PendingSnapshot != 0 {
-		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
+	if sm.prs.prs[2].PendingSnapshot != 0 {
+		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
 	}
 	}
-	if sm.prs.nodes[2].Next != 1 {
-		t.Fatalf("Next = %d, want 1", sm.prs.nodes[2].Next)
+	if sm.prs.prs[2].Next != 1 {
+		t.Fatalf("Next = %d, want 1", sm.prs.prs[2].Next)
 	}
 	}
-	if !sm.prs.nodes[2].Paused {
-		t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
+	if !sm.prs.prs[2].Paused {
+		t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
 	}
 	}
 }
 }
 
 
@@ -96,18 +96,18 @@ func TestSnapshotSucceed(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.nodes[2].Next = 1
-	sm.prs.nodes[2].becomeSnapshot(11)
+	sm.prs.prs[2].Next = 1
+	sm.prs.prs[2].becomeSnapshot(11)
 
 
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgSnapStatus, Reject: false})
-	if sm.prs.nodes[2].PendingSnapshot != 0 {
-		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
+	if sm.prs.prs[2].PendingSnapshot != 0 {
+		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
 	}
 	}
-	if sm.prs.nodes[2].Next != 12 {
-		t.Fatalf("Next = %d, want 12", sm.prs.nodes[2].Next)
+	if sm.prs.prs[2].Next != 12 {
+		t.Fatalf("Next = %d, want 12", sm.prs.prs[2].Next)
 	}
 	}
-	if !sm.prs.nodes[2].Paused {
-		t.Errorf("Paused = %v, want true", sm.prs.nodes[2].Paused)
+	if !sm.prs.prs[2].Paused {
+		t.Errorf("Paused = %v, want true", sm.prs.prs[2].Paused)
 	}
 	}
 }
 }
 
 
@@ -206,7 +206,7 @@ func TestSnapshotSucceedViaAppResp(t *testing.T) {
 	mustSend(n2, n1, pb.MsgAppResp)
 	mustSend(n2, n1, pb.MsgAppResp)
 
 
 	// Leader has correct state for follower.
 	// Leader has correct state for follower.
-	pr := n1.prs.nodes[2]
+	pr := n1.prs.prs[2]
 	if pr.State != ProgressStateReplicate {
 	if pr.State != ProgressStateReplicate {
 		t.Fatalf("unexpected state %v", pr)
 		t.Fatalf("unexpected state %v", pr)
 	}
 	}
@@ -227,23 +227,23 @@ func TestSnapshotAbort(t *testing.T) {
 	sm.becomeCandidate()
 	sm.becomeCandidate()
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
-	sm.prs.nodes[2].Next = 1
-	sm.prs.nodes[2].becomeSnapshot(11)
+	sm.prs.prs[2].Next = 1
+	sm.prs.prs[2].becomeSnapshot(11)
 
 
 	// A successful msgAppResp that has a higher/equal index than the
 	// A successful msgAppResp that has a higher/equal index than the
 	// pending snapshot should abort the pending snapshot.
 	// pending snapshot should abort the pending snapshot.
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
 	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: 11})
-	if sm.prs.nodes[2].PendingSnapshot != 0 {
-		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.nodes[2].PendingSnapshot)
+	if sm.prs.prs[2].PendingSnapshot != 0 {
+		t.Fatalf("PendingSnapshot = %d, want 0", sm.prs.prs[2].PendingSnapshot)
 	}
 	}
 	// The follower entered ProgressStateReplicate and the leader send an append
 	// The follower entered ProgressStateReplicate and the leader send an append
 	// and optimistically updated the progress (so we see 13 instead of 12).
 	// and optimistically updated the progress (so we see 13 instead of 12).
 	// There is something to append because the leader appended an empty entry
 	// There is something to append because the leader appended an empty entry
 	// to the log at index 12 when it assumed leadership.
 	// to the log at index 12 when it assumed leadership.
-	if sm.prs.nodes[2].Next != 13 {
-		t.Fatalf("Next = %d, want 13", sm.prs.nodes[2].Next)
+	if sm.prs.prs[2].Next != 13 {
+		t.Fatalf("Next = %d, want 13", sm.prs.prs[2].Next)
 	}
 	}
-	if n := sm.prs.nodes[2].ins.count; n != 1 {
+	if n := sm.prs.prs[2].ins.count; n != 1 {
 		t.Fatalf("expected an inflight message, got %d", n)
 		t.Fatalf("expected an inflight message, got %d", n)
 	}
 	}
 }
 }

+ 61 - 56
raft/raft_test.go

@@ -271,12 +271,12 @@ func TestProgressLeader(t *testing.T) {
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r := newTestRaft(1, []uint64{1, 2}, 5, 1, NewMemoryStorage())
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
-	r.prs.nodes[2].becomeReplicate()
+	r.prs.prs[2].becomeReplicate()
 
 
 	// Send proposals to r1. The first 5 entries should be appended to the log.
 	// Send proposals to r1. The first 5 entries should be appended to the log.
 	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
 	propMsg := pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("foo")}}}
 	for i := 0; i < 5; i++ {
 	for i := 0; i < 5; i++ {
-		if pr := r.prs.nodes[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
+		if pr := r.prs.prs[r.id]; pr.State != ProgressStateReplicate || pr.Match != uint64(i+1) || pr.Next != pr.Match+1 {
 			t.Errorf("unexpected progress %v", pr)
 			t.Errorf("unexpected progress %v", pr)
 		}
 		}
 		if err := r.Step(propMsg); err != nil {
 		if err := r.Step(propMsg); err != nil {
@@ -291,17 +291,17 @@ func TestProgressResumeByHeartbeatResp(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 
 
-	r.prs.nodes[2].Paused = true
+	r.prs.prs[2].Paused = true
 
 
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 	r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
-	if !r.prs.nodes[2].Paused {
-		t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
+	if !r.prs.prs[2].Paused {
+		t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
 	}
 	}
 
 
-	r.prs.nodes[2].becomeReplicate()
+	r.prs.prs[2].becomeReplicate()
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgHeartbeatResp})
-	if r.prs.nodes[2].Paused {
-		t.Errorf("paused = %v, want false", r.prs.nodes[2].Paused)
+	if r.prs.prs[2].Paused {
+		t.Errorf("paused = %v, want false", r.prs.prs[2].Paused)
 	}
 	}
 }
 }
 
 
@@ -331,7 +331,7 @@ func TestProgressFlowControl(t *testing.T) {
 	r.readMessages()
 	r.readMessages()
 
 
 	// While node 2 is in probe state, propose a bunch of entries.
 	// While node 2 is in probe state, propose a bunch of entries.
-	r.prs.nodes[2].becomeProbe()
+	r.prs.prs[2].becomeProbe()
 	blob := []byte(strings.Repeat("a", 1000))
 	blob := []byte(strings.Repeat("a", 1000))
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
 		r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: blob}}})
@@ -409,8 +409,8 @@ func TestUncommittedEntryLimit(t *testing.T) {
 
 
 	// Set the two followers to the replicate state. Commit to tail of log.
 	// Set the two followers to the replicate state. Commit to tail of log.
 	const numFollowers = 2
 	const numFollowers = 2
-	r.prs.nodes[2].becomeReplicate()
-	r.prs.nodes[3].becomeReplicate()
+	r.prs.prs[2].becomeReplicate()
+	r.prs.prs[3].becomeReplicate()
 	r.uncommittedSize = 0
 	r.uncommittedSize = 0
 
 
 	// Send proposals to r1. The first 5 entries should be appended to the log.
 	// Send proposals to r1. The first 5 entries should be appended to the log.
@@ -2632,7 +2632,7 @@ func TestLeaderAppResp(t *testing.T) {
 		sm.readMessages()
 		sm.readMessages()
 		sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
 		sm.Step(pb.Message{From: 2, Type: pb.MsgAppResp, Index: tt.index, Term: sm.Term, Reject: tt.reject, RejectHint: tt.index})
 
 
-		p := sm.prs.nodes[2]
+		p := sm.prs.prs[2]
 		if p.Match != tt.wmatch {
 		if p.Match != tt.wmatch {
 			t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
 			t.Errorf("#%d match = %d, want %d", i, p.Match, tt.wmatch)
 		}
 		}
@@ -2679,9 +2679,9 @@ func TestBcastBeat(t *testing.T) {
 		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
 		mustAppendEntry(sm, pb.Entry{Index: uint64(i) + 1})
 	}
 	}
 	// slow follower
 	// slow follower
-	sm.prs.nodes[2].Match, sm.prs.nodes[2].Next = 5, 6
+	sm.prs.prs[2].Match, sm.prs.prs[2].Next = 5, 6
 	// normal follower
 	// normal follower
-	sm.prs.nodes[3].Match, sm.prs.nodes[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
+	sm.prs.prs[3].Match, sm.prs.prs[3].Next = sm.raftLog.lastIndex(), sm.raftLog.lastIndex()+1
 
 
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	sm.Step(pb.Message{Type: pb.MsgBeat})
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
@@ -2689,8 +2689,8 @@ func TestBcastBeat(t *testing.T) {
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 		t.Fatalf("len(msgs) = %v, want 2", len(msgs))
 	}
 	}
 	wantCommitMap := map[uint64]uint64{
 	wantCommitMap := map[uint64]uint64{
-		2: min(sm.raftLog.committed, sm.prs.nodes[2].Match),
-		3: min(sm.raftLog.committed, sm.prs.nodes[3].Match),
+		2: min(sm.raftLog.committed, sm.prs.prs[2].Match),
+		3: min(sm.raftLog.committed, sm.prs.prs[3].Match),
 	}
 	}
 	for i, m := range msgs {
 	for i, m := range msgs {
 		if m.Type != pb.MsgHeartbeat {
 		if m.Type != pb.MsgHeartbeat {
@@ -2776,11 +2776,11 @@ func TestLeaderIncreaseNext(t *testing.T) {
 		sm.raftLog.append(previousEnts...)
 		sm.raftLog.append(previousEnts...)
 		sm.becomeCandidate()
 		sm.becomeCandidate()
 		sm.becomeLeader()
 		sm.becomeLeader()
-		sm.prs.nodes[2].State = tt.state
-		sm.prs.nodes[2].Next = tt.next
+		sm.prs.prs[2].State = tt.state
+		sm.prs.prs[2].Next = tt.next
 		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 		sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 
 
-		p := sm.prs.nodes[2]
+		p := sm.prs.prs[2]
 		if p.Next != tt.wnext {
 		if p.Next != tt.wnext {
 			t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
 			t.Errorf("#%d next = %d, want %d", i, p.Next, tt.wnext)
 		}
 		}
@@ -2792,7 +2792,7 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
-	r.prs.nodes[2].becomeProbe()
+	r.prs.prs[2].becomeProbe()
 
 
 	// each round is a heartbeat
 	// each round is a heartbeat
 	for i := 0; i < 3; i++ {
 	for i := 0; i < 3; i++ {
@@ -2811,8 +2811,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 			}
 			}
 		}
 		}
 
 
-		if !r.prs.nodes[2].Paused {
-			t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
+		if !r.prs.prs[2].Paused {
+			t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
 		}
 		}
 		for j := 0; j < 10; j++ {
 		for j := 0; j < 10; j++ {
 			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 			mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2826,8 +2826,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 		for j := 0; j < r.heartbeatTimeout; j++ {
 		for j := 0; j < r.heartbeatTimeout; j++ {
 			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 			r.Step(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 		}
 		}
-		if !r.prs.nodes[2].Paused {
-			t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
+		if !r.prs.prs[2].Paused {
+			t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
 		}
 		}
 
 
 		// consume the heartbeat
 		// consume the heartbeat
@@ -2849,8 +2849,8 @@ func TestSendAppendForProgressProbe(t *testing.T) {
 	if msg[0].Index != 0 {
 	if msg[0].Index != 0 {
 		t.Errorf("index = %d, want %d", msg[0].Index, 0)
 		t.Errorf("index = %d, want %d", msg[0].Index, 0)
 	}
 	}
-	if !r.prs.nodes[2].Paused {
-		t.Errorf("paused = %v, want true", r.prs.nodes[2].Paused)
+	if !r.prs.prs[2].Paused {
+		t.Errorf("paused = %v, want true", r.prs.prs[2].Paused)
 	}
 	}
 }
 }
 
 
@@ -2859,7 +2859,7 @@ func TestSendAppendForProgressReplicate(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
-	r.prs.nodes[2].becomeReplicate()
+	r.prs.prs[2].becomeReplicate()
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2876,7 +2876,7 @@ func TestSendAppendForProgressSnapshot(t *testing.T) {
 	r.becomeCandidate()
 	r.becomeCandidate()
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
-	r.prs.nodes[2].becomeSnapshot(10)
+	r.prs.prs[2].becomeSnapshot(10)
 
 
 	for i := 0; i < 10; i++ {
 	for i := 0; i < 10; i++ {
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
 		mustAppendEntry(r, pb.Entry{Data: []byte("somedata")})
@@ -2897,17 +2897,17 @@ func TestRecvMsgUnreachable(t *testing.T) {
 	r.becomeLeader()
 	r.becomeLeader()
 	r.readMessages()
 	r.readMessages()
 	// set node 2 to state replicate
 	// set node 2 to state replicate
-	r.prs.nodes[2].Match = 3
-	r.prs.nodes[2].becomeReplicate()
-	r.prs.nodes[2].optimisticUpdate(5)
+	r.prs.prs[2].Match = 3
+	r.prs.prs[2].becomeReplicate()
+	r.prs.prs[2].optimisticUpdate(5)
 
 
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
 	r.Step(pb.Message{From: 2, To: 1, Type: pb.MsgUnreachable})
 
 
-	if r.prs.nodes[2].State != ProgressStateProbe {
-		t.Errorf("state = %s, want %s", r.prs.nodes[2].State, ProgressStateProbe)
+	if r.prs.prs[2].State != ProgressStateProbe {
+		t.Errorf("state = %s, want %s", r.prs.prs[2].State, ProgressStateProbe)
 	}
 	}
-	if wnext := r.prs.nodes[2].Match + 1; r.prs.nodes[2].Next != wnext {
-		t.Errorf("next = %d, want %d", r.prs.nodes[2].Next, wnext)
+	if wnext := r.prs.prs[2].Match + 1; r.prs.prs[2].Next != wnext {
+		t.Errorf("next = %d, want %d", r.prs.prs[2].Next, wnext)
 	}
 	}
 }
 }
 
 
@@ -2973,13 +2973,13 @@ func TestRestoreWithLearner(t *testing.T) {
 		t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
 		t.Errorf("sm.LearnerNodes = %+v, length not equal with %+v", sg, s.Metadata.ConfState.Learners)
 	}
 	}
 	for _, n := range s.Metadata.ConfState.Nodes {
 	for _, n := range s.Metadata.ConfState.Nodes {
-		if sm.prs.nodes[n].IsLearner {
-			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], false)
+		if sm.prs.prs[n].IsLearner {
+			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], false)
 		}
 		}
 	}
 	}
 	for _, n := range s.Metadata.ConfState.Learners {
 	for _, n := range s.Metadata.ConfState.Learners {
-		if !sm.prs.learners[n].IsLearner {
-			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.nodes[n], true)
+		if !sm.prs.prs[n].IsLearner {
+			t.Errorf("sm.Node %x isLearner = %s, want %t", n, sm.prs.prs[n], true)
 		}
 		}
 	}
 	}
 
 
@@ -3121,8 +3121,8 @@ func TestProvideSnap(t *testing.T) {
 	sm.becomeLeader()
 	sm.becomeLeader()
 
 
 	// force set the next of node 2, so that node 2 needs a snapshot
 	// force set the next of node 2, so that node 2 needs a snapshot
-	sm.prs.nodes[2].Next = sm.raftLog.firstIndex()
-	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.nodes[2].Next - 1, Reject: true})
+	sm.prs.prs[2].Next = sm.raftLog.firstIndex()
+	sm.Step(pb.Message{From: 2, To: 1, Type: pb.MsgAppResp, Index: sm.prs.prs[2].Next - 1, Reject: true})
 
 
 	msgs := sm.readMessages()
 	msgs := sm.readMessages()
 	if len(msgs) != 1 {
 	if len(msgs) != 1 {
@@ -3152,8 +3152,8 @@ func TestIgnoreProvidingSnap(t *testing.T) {
 
 
 	// force set the next of node 2, so that node 2 needs a snapshot
 	// force set the next of node 2, so that node 2 needs a snapshot
 	// change node 2 to be inactive, expect node 1 to skip sending a snapshot to 2
 	// change node 2 to be inactive, expect node 1 to skip sending a snapshot to 2
-	sm.prs.nodes[2].Next = sm.raftLog.firstIndex() - 1
-	sm.prs.nodes[2].RecentActive = false
+	sm.prs.prs[2].Next = sm.raftLog.firstIndex() - 1
+	sm.prs.prs[2].RecentActive = false
 
 
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 	sm.Step(pb.Message{From: 1, To: 1, Type: pb.MsgProp, Entries: []pb.Entry{{Data: []byte("somedata")}}})
 
 
@@ -3201,7 +3201,7 @@ func TestSlowNodeRestore(t *testing.T) {
 	// node 3 will only be considered as active when node 1 receives a reply from it.
 	// node 3 will only be considered as active when node 1 receives a reply from it.
 	for {
 	for {
 		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
 		nt.send(pb.Message{From: 1, To: 1, Type: pb.MsgBeat})
-		if lead.prs.nodes[3].RecentActive {
+		if lead.prs.prs[3].RecentActive {
 			break
 			break
 		}
 		}
 	}
 	}
@@ -3304,8 +3304,8 @@ func TestAddLearner(t *testing.T) {
 	if !reflect.DeepEqual(nodes, wnodes) {
 	if !reflect.DeepEqual(nodes, wnodes) {
 		t.Errorf("nodes = %v, want %v", nodes, wnodes)
 		t.Errorf("nodes = %v, want %v", nodes, wnodes)
 	}
 	}
-	if !r.prs.learners[2].IsLearner {
-		t.Errorf("node 2 is learner %t, want %t", r.prs.nodes[2].IsLearner, true)
+	if !r.prs.prs[2].IsLearner {
+		t.Errorf("node 2 is learner %t, want %t", r.prs.prs[2].IsLearner, true)
 	}
 	}
 }
 }
 
 
@@ -3619,8 +3619,8 @@ func TestLeaderTransferToSlowFollower(t *testing.T) {
 
 
 	nt.recover()
 	nt.recover()
 	lead := nt.peers[1].(*raft)
 	lead := nt.peers[1].(*raft)
-	if lead.prs.nodes[3].Match != 1 {
-		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1)
+	if lead.prs.prs[3].Match != 1 {
+		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
 	}
 	}
 
 
 	// Transfer leadership to 3 when node 3 is lacking log entries.
 	// Transfer leadership to 3 when node 3 is lacking log entries.
@@ -3642,8 +3642,8 @@ func TestLeaderTransferAfterSnapshot(t *testing.T) {
 	nt.storage[1].Compact(lead.raftLog.applied)
 	nt.storage[1].Compact(lead.raftLog.applied)
 
 
 	nt.recover()
 	nt.recover()
-	if lead.prs.nodes[3].Match != 1 {
-		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.nodes[3].Match, 1)
+	if lead.prs.prs[3].Match != 1 {
+		t.Fatalf("node 1 has match %x for node 3, want %x", lead.prs.prs[3].Match, 1)
 	}
 	}
 
 
 	// Transfer leadership to 3 when node 3 is lacking a snapshot.
 	// Transfer leadership to 3 when node 3 is lacking a snapshot.
@@ -3722,8 +3722,8 @@ func TestLeaderTransferIgnoreProposal(t *testing.T) {
 		t.Fatalf("should return drop proposal error while transferring")
 		t.Fatalf("should return drop proposal error while transferring")
 	}
 	}
 
 
-	if lead.prs.nodes[1].Match != 1 {
-		t.Fatalf("node 1 has match %x, want %x", lead.prs.nodes[1].Match, 1)
+	if lead.prs.prs[1].Match != 1 {
+		t.Fatalf("node 1 has match %x, want %x", lead.prs.prs[1].Match, 1)
 	}
 	}
 }
 }
 
 
@@ -4334,14 +4334,19 @@ func newNetworkWithConfig(configFunc func(*Config), peers ...stateMachine) *netw
 				learners[i] = true
 				learners[i] = true
 			}
 			}
 			v.id = id
 			v.id = id
-			v.prs.nodes = make(map[uint64]*Progress)
-			v.prs.learners = make(map[uint64]*Progress)
+			v.prs.voters[0] = make(map[uint64]struct{})
+			v.prs.voters[1] = make(map[uint64]struct{})
+			v.prs.learners = make(map[uint64]struct{})
+			v.prs.prs = make(map[uint64]*Progress)
 			for i := 0; i < size; i++ {
 			for i := 0; i < size; i++ {
+				pr := &Progress{}
 				if _, ok := learners[peerAddrs[i]]; ok {
 				if _, ok := learners[peerAddrs[i]]; ok {
-					v.prs.learners[peerAddrs[i]] = &Progress{IsLearner: true}
+					pr.IsLearner = true
+					v.prs.learners[peerAddrs[i]] = struct{}{}
 				} else {
 				} else {
-					v.prs.nodes[peerAddrs[i]] = &Progress{}
+					v.prs.voters[0][peerAddrs[i]] = struct{}{}
 				}
 				}
+				v.prs.prs[peerAddrs[i]] = pr
 			}
 			}
 			v.reset(v.Term)
 			v.reset(v.Term)
 			npeers[id] = v
 			npeers[id] = v

+ 8 - 4
raft/read_only.go

@@ -29,7 +29,11 @@ type ReadState struct {
 type readIndexStatus struct {
 type readIndexStatus struct {
 	req   pb.Message
 	req   pb.Message
 	index uint64
 	index uint64
-	acks  map[uint64]struct{}
+	// NB: this never records 'false', but it's more convenient to use this
+	// instead of a map[uint64]struct{} due to the API of quorum.VoteResult. If
+	// this becomes performance sensitive enough (doubtful), quorum.VoteResult
+	// can change to an API that is closer to that of CommittedIndex.
+	acks map[uint64]bool
 }
 }
 
 
 type readOnly struct {
 type readOnly struct {
@@ -54,20 +58,20 @@ func (ro *readOnly) addRequest(index uint64, m pb.Message) {
 	if _, ok := ro.pendingReadIndex[s]; ok {
 	if _, ok := ro.pendingReadIndex[s]; ok {
 		return
 		return
 	}
 	}
-	ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]struct{})}
+	ro.pendingReadIndex[s] = &readIndexStatus{index: index, req: m, acks: make(map[uint64]bool)}
 	ro.readIndexQueue = append(ro.readIndexQueue, s)
 	ro.readIndexQueue = append(ro.readIndexQueue, s)
 }
 }
 
 
 // recvAck notifies the readonly struct that the raft state machine received
 // recvAck notifies the readonly struct that the raft state machine received
 // an acknowledgment of the heartbeat that was attached with the read only request
 // an acknowledgment of the heartbeat that was attached with the read only request
 // context.
 // context.
-func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]struct{} {
+func (ro *readOnly) recvAck(id uint64, context []byte) map[uint64]bool {
 	rs, ok := ro.pendingReadIndex[string(context)]
 	rs, ok := ro.pendingReadIndex[string(context)]
 	if !ok {
 	if !ok {
 		return nil
 		return nil
 	}
 	}
 
 
-	rs.acks[id] = struct{}{}
+	rs.acks[id] = true
 	return rs.acks
 	return rs.acks
 }
 }
 
 

+ 201 - 0
vendor/github.com/cockroachdb/datadriven/LICENSE

@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.

+ 318 - 0
vendor/github.com/cockroachdb/datadriven/datadriven.go

@@ -0,0 +1,318 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package datadriven
+
+import (
+	"bufio"
+	"flag"
+	"fmt"
+	"io"
+	"io/ioutil"
+	"os"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"testing"
+)
+
+// rewriteTestFiles is the value of the -rewrite command-line flag. When it is
+// set, the expected results stored in the test files are not compared against;
+// instead the test files are regenerated from the results of the current run.
+var rewriteTestFiles = flag.Bool(
+	"rewrite",
+	false,
+	"ignore the expected results and rewrite the test files with the actual results from this run. "+
+		"Used to update tests when a change affects many cases; please verify the testfile diffs carefully!",
+)
+
+// RunTest invokes a data-driven test. The test cases are contained in a
+// separate test file and are dynamically loaded, parsed, and executed by this
+// testing framework. By convention, test files are typically located in a
+// sub-directory called "testdata". Each test file has the following format:
+//
+//   <command>[,<command>...] [arg | arg=val | arg=(val1, val2, ...)]...
+//   <input to the command>
+//   ----
+//   <expected results>
+//
+// The command input can contain blank lines. However, by default, the expected
+// results cannot contain blank lines. This alternate syntax allows the use of
+// blank lines:
+//
+//   <command>[,<command>...] [arg | arg=val | arg=(val1, val2, ...)]...
+//   <input to the command>
+//   ----
+//   ----
+//   <expected results>
+//
+//   <more expected results>
+//   ----
+//   ----
+//
+// To execute data-driven tests, pass the path of the test file as well as a
+// function which can interpret and execute whatever commands are present in
+// the test file. The framework invokes the function, passing it information
+// about the test case in a TestData struct. The function then returns the
+// actual results of the case, which this function compares with the expected
+// results, and either succeeds or fails the test.
+//
+// If the -rewrite flag is set, the expected results are not compared; the
+// file at path is instead overwritten with the actual results. The file is
+// opened for writing only in that case, so that a regular run also works on a
+// read-only copy of the test data.
+func RunTest(t *testing.T, path string, f func(d *TestData) string) {
+	t.Helper()
+
+	// Write access is needed only when the file is going to be rewritten in
+	// place; asking for it unconditionally makes the test fail on read-only
+	// source trees even though nothing would be written.
+	mode := os.O_RDONLY
+	if *rewriteTestFiles {
+		mode = os.O_RDWR
+	}
+	file, err := os.OpenFile(path, mode, 0644 /* irrelevant */)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		// The close error is deliberately ignored: rewritten contents are
+		// synced explicitly before this point is reached.
+		_ = file.Close()
+	}()
+
+	runTestInternal(t, path, file, f, *rewriteTestFiles)
+}
+
+// RunTestFromString is a version of RunTest which takes the contents of a test
+// directly.
+func RunTestFromString(t *testing.T, input string, f func(d *TestData) string) {
+	t.Helper()
+	runTestInternal(t, "<string>" /* optionalPath */, strings.NewReader(input), f, *rewriteTestFiles)
+}
+
+// runTestInternal runs every test case found in reader through f.
+//
+// sourceName is used as the prefix of the positions reported for the test
+// cases. When rewrite is false, the result of f is compared with the expected
+// result of each case and the test fails on the first mismatch. When rewrite
+// is true, no comparison takes place; the source is regenerated with the
+// actual results and, if reader is an *os.File, written back to that file
+// (otherwise the regenerated text is only logged).
+//
+// A non-empty result that does not end in a newline is treated as if it did.
+// Expected results are always stored as whole lines, so such a result could
+// never match, and writing it verbatim in rewrite mode would glue the
+// following separator or directive onto its last line.
+func runTestInternal(
+	t *testing.T, sourceName string, reader io.Reader, f func(d *TestData) string, rewrite bool,
+) {
+	t.Helper()
+
+	r := newTestDataReader(t, sourceName, reader, rewrite)
+	for r.Next(t) {
+		d := &r.data
+		actual := func() string {
+			defer func() {
+				// Report which test case was running, then let the panic
+				// continue to propagate.
+				if p := recover(); p != nil {
+					fmt.Printf("\npanic during %s:\n%s\n", d.Pos, d.Input)
+					panic(p)
+				}
+			}()
+			return f(d)
+		}()
+		if actual != "" && !strings.HasSuffix(actual, "\n") {
+			actual += "\n"
+		}
+
+		if r.rewrite != nil {
+			r.emit("----")
+			if hasBlankLine(actual) {
+				// Results containing blank lines need the double-separator
+				// syntax, which is closed by two more separator lines.
+				r.emit("----")
+				r.rewrite.WriteString(actual)
+				r.emit("----")
+				r.emit("----")
+			} else {
+				// emit appends a newline; together with the one that ends
+				// actual this yields the blank line closing the results.
+				r.emit(actual)
+			}
+		} else if d.Expected != actual {
+			t.Fatalf("\n%s: %s\nexpected:\n%s\nfound:\n%s", d.Pos, d.Input, d.Expected, actual)
+		} else if testing.Verbose() {
+			input := d.Input
+			if input == "" {
+				input = "<no input to command>"
+			}
+			// TODO(tbg): it's awkward to reproduce the args, but it would be helpful.
+			fmt.Printf("\n%s:\n%s [%d args]\n%s\n----\n%s", d.Pos, d.Cmd, len(d.CmdArgs), input, actual)
+		}
+	}
+
+	if r.rewrite != nil {
+		data := r.rewrite.Bytes()
+		// Collapse a doubled newline at the very end into a single one.
+		if l := len(data); l > 2 && data[l-1] == '\n' && data[l-2] == '\n' {
+			data = data[:l-1]
+		}
+		if dest, ok := reader.(*os.File); ok {
+			// Overwrite from the start, then cut off whatever is left of the
+			// previous, possibly longer, contents.
+			if _, err := dest.WriteAt(data, 0); err != nil {
+				t.Fatal(err)
+			}
+			if err := dest.Truncate(int64(len(data))); err != nil {
+				t.Fatal(err)
+			}
+			if err := dest.Sync(); err != nil {
+				t.Fatal(err)
+			}
+		} else {
+			t.Logf("input is not a file; rewritten output is:\n%s", data)
+		}
+	}
+}
+
+// Walk visits path recursively. Every directory level becomes a subtest named
+// after the directory entry, and f is invoked once for each entry that is not
+// a directory, with the testing.T of the innermost subtest and the path of
+// that entry.
+//
+// It is typically combined with RunTest:
+//
+//    datadriven.Walk(t, path, func(t *testing.T, path string) {
+//      // initialize per-test state
+//      datadriven.RunTest(t, path, func(d *datadriven.TestData) string {
+//        // ...
+//      })
+//    })
+//
+// Given the files
+//
+//     testdata/typing
+//     testdata/logprops/scan
+//     testdata/logprops/select
+//
+// the behavior depends on path as follows:
+//
+//   - "testdata/typing": f is called once and no subtests are created.
+//
+//   - "testdata/logprops": f is called twice, in the separate subtests /scan
+//     and /select.
+//
+//   - "testdata": f is called three times, in the subtest hierarchy /typing,
+//     /logprops/scan and /logprops/select.
+//
+func Walk(t *testing.T, path string, f func(t *testing.T, path string)) {
+	info, err := os.Stat(path)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if info.IsDir() {
+		entries, err := ioutil.ReadDir(path)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, entry := range entries {
+			name := entry.Name()
+			t.Run(name, func(t *testing.T) {
+				Walk(t, filepath.Join(path, name), f)
+			})
+		}
+		return
+	}
+	// A leaf: hand it to the callback.
+	f(t, path)
+}
+
+// TestData contains information about one data-driven test case that was
+// parsed from the test file.
+type TestData struct {
+	// Pos identifies where the test case was read from, formatted as
+	// "<source name>:<line number>".
+	Pos string // reader and line number
+
+	// Cmd is the first string on the directive line (up to the first whitespace).
+	Cmd string
+
+	// CmdArgs holds the remaining tokens of the directive line, in the order
+	// in which they appear.
+	CmdArgs []CmdArg
+
+	// Input is the text between the directive line and the "----" separator,
+	// with leading and trailing whitespace removed.
+	Input    string
+	// Expected is the expected result text that follows the separator.
+	Expected string
+}
+
+// ScanArgs finds the first CmdArg whose key equals key and scans its values,
+// in order, into dests. The test is failed fatally if there is no such
+// argument, if the number of destinations differs from the number of values,
+// or if a value cannot be stored in its destination.
+//
+// For example, for a TestData originating from
+//
+// cmd arg1=50 arg2=yoruba arg3=(50, 50, 50)
+//
+// the following would be valid:
+//
+// var i1, i2, i3, i4 int
+// var s string
+// td.ScanArgs(t, "arg1", &i1)
+// td.ScanArgs(t, "arg2", &s)
+// td.ScanArgs(t, "arg3", &i2, &i3, &i4)
+func (td *TestData) ScanArgs(t *testing.T, key string, dests ...interface{}) {
+	t.Helper()
+
+	var found CmdArg
+	for _, candidate := range td.CmdArgs {
+		if candidate.Key == key {
+			found = candidate
+			break
+		}
+	}
+	// found still has its zero value (empty key) if nothing matched.
+	if found.Key == "" {
+		t.Fatalf("missing argument: %s", key)
+	}
+	if nDests, nVals := len(dests), len(found.Vals); nDests != nVals {
+		t.Fatalf("%s: got %d destinations, but %d values", found.Key, nDests, nVals)
+	}
+
+	for idx, dest := range dests {
+		found.Scan(t, idx, dest)
+	}
+}
+
+// CmdArg contains information about an argument on the directive line. An
+// argument is specified in one of the following forms:
+//  - argument
+//  - argument=value
+//  - argument=(values, ...)
+type CmdArg struct {
+	// Key is the argument name, i.e. the part before any "=".
+	Key string
+	// Vals holds the argument's values: none for a bare argument, one for
+	// argument=value, and one per element for argument=(values, ...).
+	Vals []string
+}
+
+// String renders the argument in the directive-line form that corresponds to
+// its number of values.
+func (arg CmdArg) String() string {
+	switch len(arg.Vals) {
+	case 0:
+		return arg.Key
+
+	case 1:
+		return fmt.Sprintf("%s=%s", arg.Key, arg.Vals[0])
+
+	default:
+		return fmt.Sprintf("%s=(%s)", arg.Key, strings.Join(arg.Vals, ", "))
+	}
+}
+
+// Scan attempts to parse the value at index i into the dest, which must be a
+// *string, *int, *uint64 or *bool. The test is failed fatally if i is out of
+// range, if the value cannot be parsed as the destination's type, or if the
+// destination type is not supported.
+func (arg CmdArg) Scan(t *testing.T, i int, dest interface{}) {
+	// Attribute failures to the caller rather than to this helper.
+	t.Helper()
+	if i < 0 || i >= len(arg.Vals) {
+		t.Fatalf("cannot scan index %d of key %s", i, arg.Key)
+	}
+	val := arg.Vals[i]
+	switch dest := dest.(type) {
+	case *string:
+		*dest = val
+	case *int:
+		// A bit size of 0 makes ParseInt enforce the range of the platform's
+		// int, so an out-of-range value is reported instead of being
+		// silently truncated by the conversion below.
+		n, err := strconv.ParseInt(val, 10, 0)
+		if err != nil {
+			t.Fatal(err)
+		}
+		*dest = int(n)
+	case *uint64:
+		n, err := strconv.ParseUint(val, 10, 64)
+		if err != nil {
+			t.Fatal(err)
+		}
+		*dest = n
+	case *bool:
+		b, err := strconv.ParseBool(val)
+		if err != nil {
+			t.Fatal(err)
+		}
+		*dest = b
+	default:
+		t.Fatalf("unsupported type %T for destination #%d (might be easy to add it)", dest, i+1)
+	}
+}
+
+// Fatalf fails the test with the formatted message, prefixed with the position
+// of this test case in the test file so that the source of the failure is easy
+// to find.
+func (td TestData) Fatalf(tb testing.TB, format string, args ...interface{}) {
+	tb.Helper()
+	msg := fmt.Sprintf(format, args...)
+	tb.Fatalf("%s: %s", td.Pos, msg)
+}
+
+// hasBlankLine reports whether any line of s is empty or consists only of
+// whitespace.
+func hasBlankLine(s string) bool {
+	lines := bufio.NewScanner(strings.NewReader(s))
+	for {
+		if !lines.Scan() {
+			return false
+		}
+		if len(strings.TrimSpace(lines.Text())) == 0 {
+			return true
+		}
+	}
+}

+ 40 - 0
vendor/github.com/cockroachdb/datadriven/line_scanner.go

@@ -0,0 +1,40 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package datadriven
+
+import (
+	"bufio"
+	"io"
+)
+
+// lineScanner is a bufio.Scanner that additionally counts how many lines it
+// has produced so far.
+type lineScanner struct {
+	*bufio.Scanner
+	// line is the number of successful Scan calls, i.e. the 1-based number of
+	// the line most recently returned.
+	line int
+}
+
+// newLineScanner returns a lineScanner reading from r with its line count at
+// zero.
+func newLineScanner(r io.Reader) *lineScanner {
+	ls := new(lineScanner)
+	ls.Scanner = bufio.NewScanner(r)
+	return ls
+}
+
+// Scan advances the embedded scanner and bumps the line count whenever a new
+// line was obtained. It returns what the embedded scanner's Scan returned.
+func (l *lineScanner) Scan() bool {
+	if !l.Scanner.Scan() {
+		return false
+	}
+	l.line++
+	return true
+}

+ 202 - 0
vendor/github.com/cockroachdb/datadriven/test_data_reader.go

@@ -0,0 +1,202 @@
+// Copyright 2018 The Cockroach Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+// implied. See the License for the specific language governing
+// permissions and limitations under the License.
+
+package datadriven
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"regexp"
+	"strings"
+	"testing"
+)
+
+// testDataReader parses test cases out of a data-driven test source one at a
+// time and, when rewriting is enabled, accumulates the regenerated contents of
+// that source.
+type testDataReader struct {
+	sourceName string        // name used as the prefix of TestData.Pos
+	reader     io.Reader     // source that the scanner was created from
+	scanner    *lineScanner  // line-by-line view of the source; tracks the line number
+	data       TestData      // test case most recently parsed by Next
+	rewrite    *bytes.Buffer // regenerated contents; nil unless rewriting
+}
+
+func newTestDataReader(
+	t *testing.T, sourceName string, file io.Reader, record bool,
+) *testDataReader {
+	t.Helper()
+
+	var rewrite *bytes.Buffer
+	if record {
+		rewrite = &bytes.Buffer{}
+	}
+	return &testDataReader{
+		sourceName: sourceName,
+		reader:     file,
+		scanner:    newLineScanner(file),
+		rewrite:    rewrite,
+	}
+}
+
+// Next parses the next test case from the source into r.data. It returns true
+// if a test case was found and false once the source is exhausted.
+//
+// Every directive and input line that is consumed is also passed to emit, so
+// that it is carried over when the source is being rewritten. The "----"
+// separator and the expected results are not emitted here.
+func (r *testDataReader) Next(t *testing.T) bool {
+	t.Helper()
+
+	r.data = TestData{}
+	for r.scanner.Scan() {
+		line := r.scanner.Text()
+		r.emit(line)
+
+		line = strings.TrimSpace(line)
+		if strings.HasPrefix(line, "#") {
+			// Skip comment lines.
+			continue
+		}
+		// Support wrapping directive lines using \, for example:
+		//   build-scalar \
+		//   vars(int)
+		for strings.HasSuffix(line, `\`) && r.scanner.Scan() {
+			nextLine := r.scanner.Text()
+			r.emit(nextLine)
+			line = strings.TrimSuffix(line, `\`) + " " + strings.TrimSpace(nextLine)
+		}
+
+		fields := splitDirectives(t, line)
+		if len(fields) == 0 {
+			// Blank line: keep looking for a directive.
+			continue
+		}
+		cmd := fields[0]
+		// The recorded line number is that of the last line read so far, i.e.
+		// the final continuation line of a wrapped directive.
+		r.data.Pos = fmt.Sprintf("%s:%d", r.sourceName, r.scanner.line)
+		r.data.Cmd = cmd
+
+		// Turn each remaining token into a CmdArg.
+		for _, arg := range fields[1:] {
+			key := arg
+			var vals []string
+			if pos := strings.IndexByte(key, '='); pos >= 0 {
+				key = arg[:pos]
+				val := arg[pos+1:]
+
+				// A parenthesized value with something between the
+				// parentheses is a comma-separated list; anything else,
+				// including a bare "()", is kept as one literal value.
+				if len(val) > 2 && val[0] == '(' && val[len(val)-1] == ')' {
+					vals = strings.Split(val[1:len(val)-1], ",")
+					for i := range vals {
+						vals[i] = strings.TrimSpace(vals[i])
+					}
+				} else {
+					vals = []string{val}
+				}
+			}
+			r.data.CmdArgs = append(r.data.CmdArgs, CmdArg{Key: key, Vals: vals})
+		}
+
+		// Everything up to the separator line (or the end of the source) is
+		// the input of the command.
+		var buf bytes.Buffer
+		var separator bool
+		for r.scanner.Scan() {
+			line := r.scanner.Text()
+			if line == "----" {
+				separator = true
+				break
+			}
+
+			r.emit(line)
+			fmt.Fprintln(&buf, line)
+		}
+
+		r.data.Input = strings.TrimSpace(buf.String())
+
+		// Without a separator the source ended, so there are no expected
+		// results to read.
+		if separator {
+			r.readExpected()
+		}
+		return true
+	}
+	return false
+}
+
+// readExpected reads the expected results that follow a "----" separator and
+// stores them in r.data.Expected, one newline-terminated line per line read.
+//
+// If the first line after the separator is another "----", the results may
+// contain blank lines and extend up to two consecutive "----" lines (or the
+// end of the source). Otherwise they extend up to the first blank line (or the
+// end of the source).
+func (r *testDataReader) readExpected() {
+	const sep = "----"
+	var out bytes.Buffer
+
+	first, blankLinesAllowed := "", false
+	if r.scanner.Scan() {
+		first = r.scanner.Text()
+		blankLinesAllowed = first == sep
+	}
+
+	if !blankLinesAllowed {
+		// Simple form: collect lines, starting with the one already read,
+		// until a blank line or the end of the source.
+		for cur := first; strings.TrimSpace(cur) != ""; cur = r.scanner.Text() {
+			fmt.Fprintln(&out, cur)
+			if !r.scanner.Scan() {
+				break
+			}
+		}
+		r.data.Expected = out.String()
+		return
+	}
+
+	// Double-separator form: collect lines until two separator lines in a row.
+	for r.scanner.Scan() {
+		cur := r.scanner.Text()
+		if cur == sep && r.scanner.Scan() {
+			following := r.scanner.Text()
+			if following == sep {
+				break
+			}
+			// A lone separator line is part of the results.
+			fmt.Fprintln(&out, cur)
+			fmt.Fprintln(&out, following)
+			continue
+		}
+		fmt.Fprintln(&out, cur)
+	}
+	r.data.Expected = out.String()
+}
+
+// emit appends s followed by a newline to the rewrite buffer. It does nothing
+// when the reader has no rewrite buffer.
+func (r *testDataReader) emit(s string) {
+	if r.rewrite == nil {
+		return
+	}
+	r.rewrite.WriteString(s)
+	r.rewrite.WriteByte('\n')
+}
+
+// splitDirectivesRE matches one token at the start of a directive line:
+// optional leading spaces, a name made of letters, digits and the characters
+// "_", ",", "." and "-", an optional "=value" or "=(values, ...)" part, and
+// then either a single space or the end of the line. The "-" is placed last in
+// the first character class so that it is unmistakably a literal and not part
+// of a range.
+var splitDirectivesRE = regexp.MustCompile(`^ *[a-zA-Z0-9_,.-]+(|=[-a-zA-Z0-9_@]+|=\([^)]*\))( |$)`)
+
+// splitDirectives splits a directive line into tokens, where each token is
+// either:
+//  - a,list,of,things
+//  - argument
+//  - argument=value
+//  - argument=(values, ...)
+//
+// The tokens are returned with surrounding whitespace removed; an empty line
+// yields no tokens. The test is failed fatally if some part of the line cannot
+// be parsed as a token.
+func splitDirectives(t *testing.T, line string) []string {
+	// Attribute a parse failure to the caller rather than to this helper.
+	t.Helper()
+
+	var res []string
+
+	for line != "" {
+		str := splitDirectivesRE.FindString(line)
+		if len(str) == 0 {
+			t.Fatalf("cannot parse directive %s\n", line)
+		}
+		res = append(res, strings.TrimSpace(line[0:len(str)]))
+		line = line[len(str):]
+	}
+	return res
+}